refactor 1

This commit is contained in:
Niko Abeler 2023-02-16 19:18:55 +01:00
parent d3930bef26
commit 9466a78a13
3 changed files with 127 additions and 92 deletions

74
gatherer/converter.go Normal file
View File

@ -0,0 +1,74 @@
package main
import (
"encoding/json"
"log"
"strconv"
"strings"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write"
amqp "github.com/rabbitmq/amqp091-go"
)
func ShellyConverter(d amqp.Delivery) ([]*write.Point, error) {
log.Printf(" [x] %s: %s", d.RoutingKey, d.Body)
parts := strings.Split(d.RoutingKey, ".")
field := parts[len(parts)-1]
device := strings.Join(parts[1:len(parts)-1], ".")
// try to cast to float64
float_value, err := strconv.ParseFloat(string(d.Body), 64)
var values map[string]interface{}
if err == nil {
values = map[string]interface{}{
field: float_value,
}
} else {
values = map[string]interface{}{
field: string(d.Body),
}
}
return []*write.Point{influxdb2.NewPoint(
"shelly",
map[string]string{
"device": device,
},
values,
time.Now(),
)}, nil
}
func TasmotaConverter(d amqp.Delivery) ([]*write.Point, error) {
log.Printf(" [ ] %s: %s", d.RoutingKey, d.Body)
parts := strings.Split(d.RoutingKey, ".")
device := strings.Join(parts[1:len(parts)-1], ".")
// read body as json
var values map[string]interface{}
err := json.Unmarshal(d.Body, &values)
if err != nil {
return nil, err
}
var sensor_data map[string]interface{} = values["AM2301"].(map[string]interface{})
// get relevant values
relevant_values := map[string]interface{}{
"Temperature": sensor_data["Temperature"],
"Humidity": sensor_data["Humidity"],
"DewPoint": sensor_data["DewPoint"],
}
return []*write.Point{influxdb2.NewPoint(
"tasmota",
map[string]string{
"device": device,
},
relevant_values,
time.Now(),
)}, nil
}

View File

@ -1,17 +1,13 @@
package main
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"strconv"
"strings"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
amqp "github.com/rabbitmq/amqp091-go"
)
@ -64,17 +60,6 @@ func main() {
fmt.Println("InfluxDB token:")
fmt.Println(strings.TrimSpace(string(influx_token)))
// Create a new client using an InfluxDB server base URL and an authentication token
client := influxdb2.NewClient(
os.Getenv("INFLUX_URL"),
strings.TrimSpace(string(influx_token)),
)
// Use blocking write client for writes to desired bucket
writeAPI := client.WriteAPIBlocking(
os.Getenv("INFLUX_ORG"),
os.Getenv("INFLUX_BUCKET"),
)
var conn *amqp.Connection
amqp_url := os.Getenv("AMQP_URL")
for {
@ -108,84 +93,23 @@ func main() {
shelly_msgs := create_and_connect_to_queue(ch, "shelly", "shellies.#")
tasmota_msgs := create_and_connect_to_queue(ch, "tasmota", "tele.#.SENSOR")
go func() {
for d := range shelly_msgs {
log.Printf(" [x] %s: %s", d.RoutingKey, d.Body)
parts := strings.Split(d.RoutingKey, ".")
field := parts[len(parts)-1]
device := strings.Join(parts[1:len(parts)-1], ".")
// try to cast to float64
float_value, err := strconv.ParseFloat(string(d.Body), 64)
var values map[string]interface{}
if err == nil {
values = map[string]interface{}{
field: float_value,
}
} else {
values = map[string]interface{}{
field: string(d.Body),
}
}
err = writeAPI.WritePoint(
context.Background(),
influxdb2.NewPoint(
"shelly",
map[string]string{
"device": device,
},
values,
time.Now(),
),
go RunPipe(
shelly_msgs,
os.Getenv("INFLUX_URL"),
strings.TrimSpace(string(influx_token)),
os.Getenv("INFLUX_ORG"),
os.Getenv("INFLUX_BUCKET"),
ShellyConverter,
)
if err != nil {
log.Printf("Failed to write point: %s", err)
}
}
}()
go func() {
for d := range tasmota_msgs {
log.Printf(" [ ] %s: %s", d.RoutingKey, d.Body)
parts := strings.Split(d.RoutingKey, ".")
device := strings.Join(parts[1:len(parts)-1], ".")
// read body as json
var values map[string]interface{}
err := json.Unmarshal(d.Body, &values)
if err != nil {
log.Printf("Failed to parse json: %s", err)
continue
}
var sensor_data map[string]interface{} = values["AM2301"].(map[string]interface{})
// get relevant values
relevant_values := map[string]interface{}{
"Temperature": sensor_data["Temperature"],
"Humidity": sensor_data["Humidity"],
"DewPoint": sensor_data["DewPoint"],
}
err = writeAPI.WritePoint(
context.Background(),
influxdb2.NewPoint(
"tasmota",
map[string]string{
"device": device,
},
relevant_values,
time.Now(),
),
go RunPipe(
tasmota_msgs,
os.Getenv("INFLUX_URL"),
strings.TrimSpace(string(influx_token)),
os.Getenv("INFLUX_ORG"),
os.Getenv("INFLUX_BUCKET"),
TasmotaConverter,
)
if err != nil {
log.Printf("Failed to write point: %s", err)
}
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever

37
gatherer/piper.go Normal file
View File

@ -0,0 +1,37 @@
package main
import (
"context"
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write"
amqp "github.com/rabbitmq/amqp091-go"
)
func RunPipe(
channel <-chan amqp.Delivery,
url string, token string,
org string, bucket string,
conversion func(amqp.Delivery) ([]*write.Point, error),
) {
// Create a new client using an InfluxDB server base URL and an authentication token
client := influxdb2.NewClient(
url, token,
)
// Get blocking write client
writeAPI := client.WriteAPIBlocking(org, bucket)
// Create point using fluent style
for msg := range channel {
points, _ := conversion(msg)
// Write data
err := writeAPI.WritePoint(context.Background(), points...)
if err != nil {
fmt.Printf("Write error: %s", err.Error())
// reset writeAPI
writeAPI.Flush(context.Background())
writeAPI = client.WriteAPIBlocking(org, bucket)
}
}
}