2023-02-11 20:32:51 +00:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"io/ioutil"
|
|
|
|
"log"
|
|
|
|
"os"
|
|
|
|
"strconv"
|
|
|
|
"strings"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
|
|
)
|
|
|
|
|
|
|
|
func failOnError(err error, msg string) {
|
|
|
|
if err != nil {
|
|
|
|
log.Panicf("%s: %s", msg, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func create_and_connect_to_queue(ch *amqp.Channel, name string, routing_key string) <-chan amqp.Delivery {
|
|
|
|
q, err := ch.QueueDeclare(
|
|
|
|
name, // name
|
|
|
|
false, // durable
|
|
|
|
false, // delete when unused
|
|
|
|
true, // exclusive
|
|
|
|
false, // no-wait
|
|
|
|
nil, // arguments
|
|
|
|
)
|
|
|
|
failOnError(err, "Failed to declare a queue")
|
|
|
|
|
|
|
|
err = ch.QueueBind(
|
|
|
|
q.Name, // queue name
|
|
|
|
routing_key, // routing key
|
|
|
|
"amq.topic", // exchange
|
|
|
|
false,
|
|
|
|
nil,
|
|
|
|
)
|
|
|
|
failOnError(err, "Failed to bind a queue")
|
|
|
|
|
|
|
|
msgs, err := ch.Consume(
|
|
|
|
q.Name, // queue
|
|
|
|
"", // consumer
|
|
|
|
true, // auto ack
|
|
|
|
false, // exclusive
|
|
|
|
false, // no local
|
|
|
|
false, // no wait
|
|
|
|
nil, // args
|
|
|
|
)
|
|
|
|
failOnError(err, "Failed to register a consumer")
|
|
|
|
|
|
|
|
return msgs
|
|
|
|
}
|
|
|
|
|
|
|
|
func main() {
|
|
|
|
|
|
|
|
// read token from /token/read_write_token.txt file
|
|
|
|
token_file := "/tokens/read_write.txt"
|
|
|
|
influx_token, err := ioutil.ReadFile(token_file)
|
|
|
|
failOnError(err, "Failed to read token file")
|
|
|
|
fmt.Println("InfluxDB token:")
|
|
|
|
fmt.Println(strings.TrimSpace(string(influx_token)))
|
|
|
|
|
|
|
|
// Create a new client using an InfluxDB server base URL and an authentication token
|
|
|
|
client := influxdb2.NewClient(
|
|
|
|
os.Getenv("INFLUX_URL"),
|
|
|
|
strings.TrimSpace(string(influx_token)),
|
|
|
|
)
|
|
|
|
// Use blocking write client for writes to desired bucket
|
|
|
|
writeAPI := client.WriteAPIBlocking(
|
|
|
|
os.Getenv("INFLUX_ORG"),
|
|
|
|
os.Getenv("INFLUX_BUCKET"),
|
|
|
|
)
|
|
|
|
|
|
|
|
var conn *amqp.Connection
|
|
|
|
amqp_url := os.Getenv("AMQP_URL")
|
|
|
|
for {
|
|
|
|
conn, err = amqp.Dial(amqp_url)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("Failed to connect to RabbitMQ: %s", err)
|
|
|
|
time.Sleep(5 * time.Second)
|
|
|
|
} else {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
defer conn.Close()
|
|
|
|
|
|
|
|
ch, err := conn.Channel()
|
|
|
|
failOnError(err, "Failed to open a channel")
|
|
|
|
defer ch.Close()
|
|
|
|
|
|
|
|
err = ch.ExchangeDeclare(
|
|
|
|
"amq.topic", // name
|
|
|
|
"topic", // type
|
|
|
|
true, // durable
|
|
|
|
false, // auto-deleted
|
|
|
|
false, // internal
|
|
|
|
false, // no-wait
|
|
|
|
nil, // arguments
|
|
|
|
)
|
|
|
|
failOnError(err, "Failed to declare an exchange")
|
|
|
|
|
|
|
|
var forever chan struct{}
|
|
|
|
|
|
|
|
shelly_msgs := create_and_connect_to_queue(ch, "shelly", "shellies.#")
|
|
|
|
tasmota_msgs := create_and_connect_to_queue(ch, "tasmota", "tele.#.SENSOR")
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
for d := range shelly_msgs {
|
|
|
|
log.Printf(" [x] %s: %s", d.RoutingKey, d.Body)
|
|
|
|
parts := strings.Split(d.RoutingKey, ".")
|
|
|
|
field := parts[len(parts)-1]
|
|
|
|
device := strings.Join(parts[1:len(parts)-1], ".")
|
|
|
|
|
|
|
|
// try to cast to float64
|
|
|
|
float_value, err := strconv.ParseFloat(string(d.Body), 64)
|
|
|
|
|
|
|
|
var values map[string]interface{}
|
|
|
|
|
|
|
|
if err == nil {
|
|
|
|
values = map[string]interface{}{
|
|
|
|
field: float_value,
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
values = map[string]interface{}{
|
|
|
|
field: string(d.Body),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
err = writeAPI.WritePoint(
|
|
|
|
context.Background(),
|
|
|
|
influxdb2.NewPoint(
|
|
|
|
"shelly",
|
|
|
|
map[string]string{
|
|
|
|
"device": device,
|
|
|
|
},
|
|
|
|
values,
|
|
|
|
time.Now(),
|
|
|
|
),
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("Failed to write point: %s", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
for d := range tasmota_msgs {
|
|
|
|
log.Printf(" [ ] %s: %s", d.RoutingKey, d.Body)
|
|
|
|
parts := strings.Split(d.RoutingKey, ".")
|
2023-02-11 20:55:49 +00:00
|
|
|
device := strings.Join(parts[1:len(parts)-1], ".")
|
2023-02-11 20:32:51 +00:00
|
|
|
|
|
|
|
// read body as json
|
|
|
|
var values map[string]interface{}
|
|
|
|
err := json.Unmarshal(d.Body, &values)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("Failed to parse json: %s", err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
var sensor_data map[string]interface{} = values["AM2301"].(map[string]interface{})
|
|
|
|
|
|
|
|
// get relevant values
|
|
|
|
relevant_values := map[string]interface{}{
|
|
|
|
"Temperature": sensor_data["Temperature"],
|
|
|
|
"Humidity": sensor_data["Humidity"],
|
|
|
|
"DewPoint": sensor_data["DewPoint"],
|
|
|
|
}
|
|
|
|
|
|
|
|
err = writeAPI.WritePoint(
|
|
|
|
context.Background(),
|
|
|
|
influxdb2.NewPoint(
|
|
|
|
"tasmota",
|
|
|
|
map[string]string{
|
|
|
|
"device": device,
|
|
|
|
},
|
|
|
|
relevant_values,
|
|
|
|
time.Now(),
|
|
|
|
),
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("Failed to write point: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
|
|
|
|
<-forever
|
|
|
|
|
|
|
|
}
|