diff --git a/gatherer/converter.go b/gatherer/converter.go new file mode 100644 index 0000000..6b025ba --- /dev/null +++ b/gatherer/converter.go @@ -0,0 +1,74 @@ +package main + +import ( + "encoding/json" + "log" + "strconv" + "strings" + "time" + + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api/write" + amqp "github.com/rabbitmq/amqp091-go" +) + +func ShellyConverter(d amqp.Delivery) ([]*write.Point, error) { + log.Printf(" [x] %s: %s", d.RoutingKey, d.Body) + parts := strings.Split(d.RoutingKey, ".") + field := parts[len(parts)-1] + device := strings.Join(parts[1:len(parts)-1], ".") + + // try to cast to float64 + float_value, err := strconv.ParseFloat(string(d.Body), 64) + + var values map[string]interface{} + + if err == nil { + values = map[string]interface{}{ + field: float_value, + } + } else { + values = map[string]interface{}{ + field: string(d.Body), + } + } + + return []*write.Point{influxdb2.NewPoint( + "shelly", + map[string]string{ + "device": device, + }, + values, + time.Now(), + )}, nil +} + +func TasmotaConverter(d amqp.Delivery) ([]*write.Point, error) { + log.Printf(" [ ] %s: %s", d.RoutingKey, d.Body) + parts := strings.Split(d.RoutingKey, ".") + device := strings.Join(parts[1:len(parts)-1], ".") + + // read body as json + var values map[string]interface{} + err := json.Unmarshal(d.Body, &values) + if err != nil { + return nil, err + } + var sensor_data map[string]interface{} = values["AM2301"].(map[string]interface{}) + + // get relevant values + relevant_values := map[string]interface{}{ + "Temperature": sensor_data["Temperature"], + "Humidity": sensor_data["Humidity"], + "DewPoint": sensor_data["DewPoint"], + } + + return []*write.Point{influxdb2.NewPoint( + "tasmota", + map[string]string{ + "device": device, + }, + relevant_values, + time.Now(), + )}, nil +} diff --git a/gatherer/main.go b/gatherer/main.go index 9cbc62f..f0f02bc 100644 --- a/gatherer/main.go +++ b/gatherer/main.go @@ -1,17 +1,13 @@ package main import ( - "context" - "encoding/json" "fmt" "io/ioutil" "log" "os" - "strconv" "strings" "time" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" amqp "github.com/rabbitmq/amqp091-go" ) @@ -64,17 +60,6 @@ func main() { fmt.Println("InfluxDB token:") fmt.Println(strings.TrimSpace(string(influx_token))) - // Create a new client using an InfluxDB server base URL and an authentication token - client := influxdb2.NewClient( - os.Getenv("INFLUX_URL"), - strings.TrimSpace(string(influx_token)), - ) - // Use blocking write client for writes to desired bucket - writeAPI := client.WriteAPIBlocking( - os.Getenv("INFLUX_ORG"), - os.Getenv("INFLUX_BUCKET"), - ) - var conn *amqp.Connection amqp_url := os.Getenv("AMQP_URL") for { @@ -108,84 +93,23 @@ func main() { shelly_msgs := create_and_connect_to_queue(ch, "shelly", "shellies.#") tasmota_msgs := create_and_connect_to_queue(ch, "tasmota", "tele.#.SENSOR") - go func() { - for d := range shelly_msgs { - log.Printf(" [x] %s: %s", d.RoutingKey, d.Body) - parts := strings.Split(d.RoutingKey, ".") - field := parts[len(parts)-1] - device := strings.Join(parts[1:len(parts)-1], ".") + go RunPipe( + shelly_msgs, + os.Getenv("INFLUX_URL"), + strings.TrimSpace(string(influx_token)), + os.Getenv("INFLUX_ORG"), + os.Getenv("INFLUX_BUCKET"), + ShellyConverter, + ) - // try to cast to float64 - float_value, err := strconv.ParseFloat(string(d.Body), 64) - - var values map[string]interface{} - - if err == nil { - values = map[string]interface{}{ - field: float_value, - } - } else { - values = map[string]interface{}{ - field: string(d.Body), - } - } - - err = writeAPI.WritePoint( - context.Background(), - influxdb2.NewPoint( - "shelly", - map[string]string{ - "device": device, - }, - values, - time.Now(), - ), - ) - if err != nil { - log.Printf("Failed to write point: %s", err) - } - } - }() - - go func() { - for d := range tasmota_msgs { - log.Printf(" [ ] %s: %s", d.RoutingKey, d.Body) - parts := strings.Split(d.RoutingKey, ".") - device := strings.Join(parts[1:len(parts)-1], ".") - - // read body as json - var values map[string]interface{} - err := json.Unmarshal(d.Body, &values) - if err != nil { - log.Printf("Failed to parse json: %s", err) - continue - } - var sensor_data map[string]interface{} = values["AM2301"].(map[string]interface{}) - - // get relevant values - relevant_values := map[string]interface{}{ - "Temperature": sensor_data["Temperature"], - "Humidity": sensor_data["Humidity"], - "DewPoint": sensor_data["DewPoint"], - } - - err = writeAPI.WritePoint( - context.Background(), - influxdb2.NewPoint( - "tasmota", - map[string]string{ - "device": device, - }, - relevant_values, - time.Now(), - ), - ) - if err != nil { - log.Printf("Failed to write point: %s", err) - } - - } - }() + go RunPipe( + tasmota_msgs, + os.Getenv("INFLUX_URL"), + strings.TrimSpace(string(influx_token)), + os.Getenv("INFLUX_ORG"), + os.Getenv("INFLUX_BUCKET"), + TasmotaConverter, + ) log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever diff --git a/gatherer/piper.go b/gatherer/piper.go new file mode 100644 index 0000000..729c745 --- /dev/null +++ b/gatherer/piper.go @@ -0,0 +1,37 @@ +package main + +import ( + "context" + "fmt" + + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api/write" + amqp "github.com/rabbitmq/amqp091-go" +) + +func RunPipe( + channel <-chan amqp.Delivery, + url string, token string, + org string, bucket string, + conversion func(amqp.Delivery) ([]*write.Point, error), +) { + // Create a new client using an InfluxDB server base URL and an authentication token + client := influxdb2.NewClient( + url, token, + ) + // Get blocking write client + writeAPI := client.WriteAPIBlocking(org, bucket) + // Create point using fluent style + for msg := range channel { + points, _ := conversion(msg) + // Write data + err := writeAPI.WritePoint(context.Background(), points...) + if err != nil { + fmt.Printf("Write error: %s", err.Error()) + // reset writeAPI + writeAPI.Flush(context.Background()) + writeAPI = client.WriteAPIBlocking(org, bucket) + } + } + +}