From 1bf0087d107790a9693f59a32c1115bde6562483 Mon Sep 17 00:00:00 2001 From: Niko Abeler Date: Thu, 16 Feb 2023 20:22:28 +0100 Subject: [PATCH] refactoring 2 --- gatherer/connections.go | 117 ++++++++++++++++++++++++++++++++++++++++ gatherer/influx.go | 40 ++++++++++++++ gatherer/main.go | 97 ++++++++------------------------- gatherer/piper.go | 78 +++++++++++++++++++-------- 4 files changed, 237 insertions(+), 95 deletions(-) create mode 100644 gatherer/connections.go create mode 100644 gatherer/influx.go diff --git a/gatherer/connections.go b/gatherer/connections.go new file mode 100644 index 0000000..eb5dba8 --- /dev/null +++ b/gatherer/connections.go @@ -0,0 +1,117 @@ +package main + +import ( + "log" + "os" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type AMQPConnection struct { + conn *amqp.Connection + channel *amqp.Channel + error_chan chan *amqp.Error + observers []chan<- bool +} + +func (amqpConn *AMQPConnection) Init() { + amqpConn.error_chan = make(chan *amqp.Error) +} + +func (amqpConn *AMQPConnection) Connect() error { + amqpConn.Close() + + amqp_url := os.Getenv("AMQP_URL") + for { + conn, err := amqp.Dial(amqp_url) + if err != nil { + log.Printf("Failed to connect to RabbitMQ: %s", err) + time.Sleep(5 * time.Second) + } else { + amqpConn.conn = conn + break + } + } + amqpConn.conn.NotifyClose(amqpConn.error_chan) + + ch, err := amqpConn.conn.Channel() + if err != nil { + return err + } + amqpConn.channel = ch + amqpConn.channel.NotifyClose(amqpConn.error_chan) + + go func() { + for err := range amqpConn.error_chan { + log.Printf("AMQP error: %s", err) + amqpConn.Connect() + log.Println("Reconnected to RabbitMQ") + for _, observer := range amqpConn.observers { + observer <- true + log.Println("Notified observer") + } + } + }() + + return nil +} + +func (amqpConn *AMQPConnection) ConnectToQueue(name string, routing_key string) (<-chan amqp.Delivery, error) { + q, err := amqpConn.channel.QueueDeclare( + name, // name + false, // durable + false, // delete when unused + true, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + return nil, err + } + + err = amqpConn.channel.QueueBind( + q.Name, // queue name + routing_key, // routing key + "amq.topic", // exchange + false, + nil, + ) + if err != nil { + return nil, err + } + + msgs, err := amqpConn.channel.Consume( + q.Name, // queue + "", // consumer + true, // auto ack + false, // exclusive + false, // no local + false, // no wait + nil, // args + ) + if err != nil { + return nil, err + } + + return msgs, nil +} + +func (amqpConn *AMQPConnection) NotifyError() <-chan bool { + notify := make(chan bool) + amqpConn.observers = append(amqpConn.observers, notify) + return notify +} + +func (amqpConn *AMQPConnection) Close() { + if amqpConn.channel != nil { + amqpConn.channel.Close() + } + + if amqpConn.conn != nil { + amqpConn.conn.Close() + } + + amqpConn.channel = nil + amqpConn.conn = nil +} diff --git a/gatherer/influx.go b/gatherer/influx.go new file mode 100644 index 0000000..fdefa45 --- /dev/null +++ b/gatherer/influx.go @@ -0,0 +1,40 @@ +package main + +import ( + "context" + "fmt" + + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/influxdata/influxdb-client-go/v2/api/write" +) + +type InfluxWriter struct { + client influxdb2.Client + write_api api.WriteAPIBlocking + channel chan *write.Point +} + +func (w *InfluxWriter) Write(point *write.Point) { + w.channel <- point +} + +func (w *InfluxWriter) Init( + url string, + token string, + org string, + bucket string, +) { + w.channel = make(chan *write.Point) + w.client = influxdb2.NewClient(url, token) + w.write_api = w.client.WriteAPIBlocking(org, bucket) + + go func() { + for point := range w.channel { + err := w.write_api.WritePoint(context.Background(), point) + if err != nil { + fmt.Printf("Error writing to InfluxDB: %s", err) + } + } + }() +} diff --git a/gatherer/main.go b/gatherer/main.go index f0f02bc..341a493 100644 --- a/gatherer/main.go +++ b/gatherer/main.go @@ -6,9 +6,6 @@ import ( "log" "os" "strings" - "time" - - amqp "github.com/rabbitmq/amqp091-go" ) func failOnError(err error, msg string) { @@ -17,67 +14,32 @@ func failOnError(err error, msg string) { } } -func create_and_connect_to_queue(ch *amqp.Channel, name string, routing_key string) <-chan amqp.Delivery { - q, err := ch.QueueDeclare( - name, // name - false, // durable - false, // delete when unused - true, // exclusive - false, // no-wait - nil, // arguments - ) - failOnError(err, "Failed to declare a queue") - - err = ch.QueueBind( - q.Name, // queue name - routing_key, // routing key - "amq.topic", // exchange - false, - nil, - ) - failOnError(err, "Failed to bind a queue") - - msgs, err := ch.Consume( - q.Name, // queue - "", // consumer - true, // auto ack - false, // exclusive - false, // no local - false, // no wait - nil, // args - ) - failOnError(err, "Failed to register a consumer") - - return msgs -} - func main() { + amqpConn := AMQPConnection{} + amqpConn.Init() + err := amqpConn.Connect() + defer amqpConn.Close() + failOnError(err, "Failed to connect to RabbitMQ") + // read token from /token/read_write_token.txt file token_file := "/tokens/read_write.txt" - influx_token, err := ioutil.ReadFile(token_file) + token_bytes, err := ioutil.ReadFile(token_file) failOnError(err, "Failed to read token file") + influx_token := strings.TrimSpace(string(token_bytes)) fmt.Println("InfluxDB token:") fmt.Println(strings.TrimSpace(string(influx_token))) - var conn *amqp.Connection - amqp_url := os.Getenv("AMQP_URL") - for { - conn, err = amqp.Dial(amqp_url) - if err != nil { - log.Printf("Failed to connect to RabbitMQ: %s", err) - time.Sleep(5 * time.Second) - } else { - break - } - } - defer conn.Close() + influx := InfluxWriter{} + influx.Init( + os.Getenv("INFLUX_URL"), + strings.TrimSpace(string(influx_token)), + os.Getenv("INFLUX_ORG"), + os.Getenv("INFLUX_BUCKET"), + ) - ch, err := conn.Channel() - failOnError(err, "Failed to open a channel") - defer ch.Close() - - err = ch.ExchangeDeclare( + // create exchange + err = amqpConn.channel.ExchangeDeclare( "amq.topic", // name "topic", // type true, // durable @@ -90,26 +52,13 @@ func main() { var forever chan struct{} - shelly_msgs := create_and_connect_to_queue(ch, "shelly", "shellies.#") - tasmota_msgs := create_and_connect_to_queue(ch, "tasmota", "tele.#.SENSOR") + shellPipe := Piper{} + shellPipe.Init(&influx, amqpConn, "shelly", "shellies.#") + go shellPipe.RunPipe(ShellyConverter) - go RunPipe( - shelly_msgs, - os.Getenv("INFLUX_URL"), - strings.TrimSpace(string(influx_token)), - os.Getenv("INFLUX_ORG"), - os.Getenv("INFLUX_BUCKET"), - ShellyConverter, - ) - - go RunPipe( - tasmota_msgs, - os.Getenv("INFLUX_URL"), - strings.TrimSpace(string(influx_token)), - os.Getenv("INFLUX_ORG"), - os.Getenv("INFLUX_BUCKET"), - TasmotaConverter, - ) + tasmotaPipe := Piper{} + tasmotaPipe.Init(&influx, amqpConn, "tasmota", "tele.#.SENSOR") + go tasmotaPipe.RunPipe(TasmotaConverter) log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever diff --git a/gatherer/piper.go b/gatherer/piper.go index 729c745..aa9e7ba 100644 --- a/gatherer/piper.go +++ b/gatherer/piper.go @@ -1,36 +1,72 @@ package main import ( - "context" - "fmt" - - influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/influxdata/influxdb-client-go/v2/api/write" amqp "github.com/rabbitmq/amqp091-go" ) -func RunPipe( - channel <-chan amqp.Delivery, - url string, token string, - org string, bucket string, +func create_and_connect_to_queue(ch *amqp.Channel, name string, routing_key string) <-chan amqp.Delivery { + q, err := ch.QueueDeclare( + name, // name + false, // durable + false, // delete when unused + true, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + err = ch.QueueBind( + q.Name, // queue name + routing_key, // routing key + "amq.topic", // exchange + false, + nil, + ) + failOnError(err, "Failed to bind a queue") + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto ack + false, // exclusive + false, // no local + false, // no wait + nil, // args + ) + failOnError(err, "Failed to register a consumer") + + return msgs +} + +type Piper struct { + influx *InfluxWriter + delivery <-chan amqp.Delivery +} + +func (p *Piper) Init(writer *InfluxWriter, amqp AMQPConnection, queue_name string, routing_key string) { + // Create a new client using an InfluxDB server base URL and an authentication token + p.influx = writer + p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key) + + notify := amqp.NotifyError() + go func() { + for _ = range notify { + p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key) + println("Reconnected to queue") + } + }() +} + +func (p *Piper) RunPipe( conversion func(amqp.Delivery) ([]*write.Point, error), ) { - // Create a new client using an InfluxDB server base URL and an authentication token - client := influxdb2.NewClient( - url, token, - ) - // Get blocking write client - writeAPI := client.WriteAPIBlocking(org, bucket) // Create point using fluent style - for msg := range channel { + for msg := range p.delivery { points, _ := conversion(msg) // Write data - err := writeAPI.WritePoint(context.Background(), points...) - if err != nil { - fmt.Printf("Write error: %s", err.Error()) - // reset writeAPI - writeAPI.Flush(context.Background()) - writeAPI = client.WriteAPIBlocking(org, bucket) + for _, point := range points { + p.influx.Write(point) } }