package main import ( "context" "encoding/json" "fmt" "io/ioutil" "log" "os" "strconv" "strings" "time" influxdb2 "github.com/influxdata/influxdb-client-go/v2" amqp "github.com/rabbitmq/amqp091-go" ) func failOnError(err error, msg string) { if err != nil { log.Panicf("%s: %s", msg, err) } } func create_and_connect_to_queue(ch *amqp.Channel, name string, routing_key string) <-chan amqp.Delivery { q, err := ch.QueueDeclare( name, // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") err = ch.QueueBind( q.Name, // queue name routing_key, // routing key "amq.topic", // exchange false, nil, ) failOnError(err, "Failed to bind a queue") msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") return msgs } func main() { // read token from /token/read_write_token.txt file token_file := "/tokens/read_write.txt" influx_token, err := ioutil.ReadFile(token_file) failOnError(err, "Failed to read token file") fmt.Println("InfluxDB token:") fmt.Println(strings.TrimSpace(string(influx_token))) // Create a new client using an InfluxDB server base URL and an authentication token client := influxdb2.NewClient( os.Getenv("INFLUX_URL"), strings.TrimSpace(string(influx_token)), ) // Use blocking write client for writes to desired bucket writeAPI := client.WriteAPIBlocking( os.Getenv("INFLUX_ORG"), os.Getenv("INFLUX_BUCKET"), ) var conn *amqp.Connection amqp_url := os.Getenv("AMQP_URL") for { conn, err = amqp.Dial(amqp_url) if err != nil { log.Printf("Failed to connect to RabbitMQ: %s", err) time.Sleep(5 * time.Second) } else { break } } defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "amq.topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") var forever chan struct{} shelly_msgs := create_and_connect_to_queue(ch, "shelly", "shellies.#") tasmota_msgs := create_and_connect_to_queue(ch, "tasmota", "tele.#.SENSOR") go func() { for d := range shelly_msgs { log.Printf(" [x] %s: %s", d.RoutingKey, d.Body) parts := strings.Split(d.RoutingKey, ".") field := parts[len(parts)-1] device := strings.Join(parts[1:len(parts)-1], ".") // try to cast to float64 float_value, err := strconv.ParseFloat(string(d.Body), 64) var values map[string]interface{} if err == nil { values = map[string]interface{}{ field: float_value, } } else { values = map[string]interface{}{ field: string(d.Body), } } err = writeAPI.WritePoint( context.Background(), influxdb2.NewPoint( "shelly", map[string]string{ "device": device, }, values, time.Now(), ), ) if err != nil { log.Printf("Failed to write point: %s", err) } } }() go func() { for d := range tasmota_msgs { log.Printf(" [ ] %s: %s", d.RoutingKey, d.Body) parts := strings.Split(d.RoutingKey, ".") device := strings.Join(parts[1:len(parts)-2], ".") // read body as json var values map[string]interface{} err := json.Unmarshal(d.Body, &values) if err != nil { log.Printf("Failed to parse json: %s", err) continue } var sensor_data map[string]interface{} = values["AM2301"].(map[string]interface{}) // get relevant values relevant_values := map[string]interface{}{ "Temperature": sensor_data["Temperature"], "Humidity": sensor_data["Humidity"], "DewPoint": sensor_data["DewPoint"], } err = writeAPI.WritePoint( context.Background(), influxdb2.NewPoint( "tasmota", map[string]string{ "device": device, }, relevant_values, time.Now(), ), ) if err != nil { log.Printf("Failed to write point: %s", err) } } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }