refactoring 2

This commit is contained in:
Niko Abeler 2023-02-16 20:22:28 +01:00
parent 9466a78a13
commit 1bf0087d10
4 changed files with 237 additions and 95 deletions

117
gatherer/connections.go Normal file
View File

@ -0,0 +1,117 @@
package main
import (
"log"
"os"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type AMQPConnection struct {
conn *amqp.Connection
channel *amqp.Channel
error_chan chan *amqp.Error
observers []chan<- bool
}
func (amqpConn *AMQPConnection) Init() {
amqpConn.error_chan = make(chan *amqp.Error)
}
func (amqpConn *AMQPConnection) Connect() error {
amqpConn.Close()
amqp_url := os.Getenv("AMQP_URL")
for {
conn, err := amqp.Dial(amqp_url)
if err != nil {
log.Printf("Failed to connect to RabbitMQ: %s", err)
time.Sleep(5 * time.Second)
} else {
amqpConn.conn = conn
break
}
}
amqpConn.conn.NotifyClose(amqpConn.error_chan)
ch, err := amqpConn.conn.Channel()
if err != nil {
return err
}
amqpConn.channel = ch
amqpConn.channel.NotifyClose(amqpConn.error_chan)
go func() {
for err := range amqpConn.error_chan {
log.Printf("AMQP error: %s", err)
amqpConn.Connect()
log.Println("Reconnected to RabbitMQ")
for _, observer := range amqpConn.observers {
observer <- true
log.Println("Notified observer")
}
}
}()
return nil
}
func (amqpConn *AMQPConnection) ConnectToQueue(name string, routing_key string) (<-chan amqp.Delivery, error) {
q, err := amqpConn.channel.QueueDeclare(
name, // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
err = amqpConn.channel.QueueBind(
q.Name, // queue name
routing_key, // routing key
"amq.topic", // exchange
false,
nil,
)
if err != nil {
return nil, err
}
msgs, err := amqpConn.channel.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
if err != nil {
return nil, err
}
return msgs, nil
}
func (amqpConn *AMQPConnection) NotifyError() <-chan bool {
notify := make(chan bool)
amqpConn.observers = append(amqpConn.observers, notify)
return notify
}
func (amqpConn *AMQPConnection) Close() {
if amqpConn.channel != nil {
amqpConn.channel.Close()
}
if amqpConn.conn != nil {
amqpConn.conn.Close()
}
amqpConn.channel = nil
amqpConn.conn = nil
}

40
gatherer/influx.go Normal file
View File

@ -0,0 +1,40 @@
package main
import (
"context"
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
)
type InfluxWriter struct {
client influxdb2.Client
write_api api.WriteAPIBlocking
channel chan *write.Point
}
func (w *InfluxWriter) Write(point *write.Point) {
w.channel <- point
}
func (w *InfluxWriter) Init(
url string,
token string,
org string,
bucket string,
) {
w.channel = make(chan *write.Point)
w.client = influxdb2.NewClient(url, token)
w.write_api = w.client.WriteAPIBlocking(org, bucket)
go func() {
for point := range w.channel {
err := w.write_api.WritePoint(context.Background(), point)
if err != nil {
fmt.Printf("Error writing to InfluxDB: %s", err)
}
}
}()
}

View File

@ -6,9 +6,6 @@ import (
"log" "log"
"os" "os"
"strings" "strings"
"time"
amqp "github.com/rabbitmq/amqp091-go"
) )
func failOnError(err error, msg string) { func failOnError(err error, msg string) {
@ -17,67 +14,32 @@ func failOnError(err error, msg string) {
} }
} }
func create_and_connect_to_queue(ch *amqp.Channel, name string, routing_key string) <-chan amqp.Delivery {
q, err := ch.QueueDeclare(
name, // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
q.Name, // queue name
routing_key, // routing key
"amq.topic", // exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
return msgs
}
func main() { func main() {
amqpConn := AMQPConnection{}
amqpConn.Init()
err := amqpConn.Connect()
defer amqpConn.Close()
failOnError(err, "Failed to connect to RabbitMQ")
// read token from /token/read_write_token.txt file // read token from /token/read_write_token.txt file
token_file := "/tokens/read_write.txt" token_file := "/tokens/read_write.txt"
influx_token, err := ioutil.ReadFile(token_file) token_bytes, err := ioutil.ReadFile(token_file)
failOnError(err, "Failed to read token file") failOnError(err, "Failed to read token file")
influx_token := strings.TrimSpace(string(token_bytes))
fmt.Println("InfluxDB token:") fmt.Println("InfluxDB token:")
fmt.Println(strings.TrimSpace(string(influx_token))) fmt.Println(strings.TrimSpace(string(influx_token)))
var conn *amqp.Connection influx := InfluxWriter{}
amqp_url := os.Getenv("AMQP_URL") influx.Init(
for { os.Getenv("INFLUX_URL"),
conn, err = amqp.Dial(amqp_url) strings.TrimSpace(string(influx_token)),
if err != nil { os.Getenv("INFLUX_ORG"),
log.Printf("Failed to connect to RabbitMQ: %s", err) os.Getenv("INFLUX_BUCKET"),
time.Sleep(5 * time.Second) )
} else {
break
}
}
defer conn.Close()
ch, err := conn.Channel() // create exchange
failOnError(err, "Failed to open a channel") err = amqpConn.channel.ExchangeDeclare(
defer ch.Close()
err = ch.ExchangeDeclare(
"amq.topic", // name "amq.topic", // name
"topic", // type "topic", // type
true, // durable true, // durable
@ -90,26 +52,13 @@ func main() {
var forever chan struct{} var forever chan struct{}
shelly_msgs := create_and_connect_to_queue(ch, "shelly", "shellies.#") shellPipe := Piper{}
tasmota_msgs := create_and_connect_to_queue(ch, "tasmota", "tele.#.SENSOR") shellPipe.Init(&influx, amqpConn, "shelly", "shellies.#")
go shellPipe.RunPipe(ShellyConverter)
go RunPipe( tasmotaPipe := Piper{}
shelly_msgs, tasmotaPipe.Init(&influx, amqpConn, "tasmota", "tele.#.SENSOR")
os.Getenv("INFLUX_URL"), go tasmotaPipe.RunPipe(TasmotaConverter)
strings.TrimSpace(string(influx_token)),
os.Getenv("INFLUX_ORG"),
os.Getenv("INFLUX_BUCKET"),
ShellyConverter,
)
go RunPipe(
tasmota_msgs,
os.Getenv("INFLUX_URL"),
strings.TrimSpace(string(influx_token)),
os.Getenv("INFLUX_ORG"),
os.Getenv("INFLUX_BUCKET"),
TasmotaConverter,
)
log.Printf(" [*] Waiting for logs. To exit press CTRL+C") log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever <-forever

View File

@ -1,36 +1,72 @@
package main package main
import ( import (
"context"
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/influxdata/influxdb-client-go/v2/api/write"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
) )
func RunPipe( func create_and_connect_to_queue(ch *amqp.Channel, name string, routing_key string) <-chan amqp.Delivery {
channel <-chan amqp.Delivery, q, err := ch.QueueDeclare(
url string, token string, name, // name
org string, bucket string, false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
q.Name, // queue name
routing_key, // routing key
"amq.topic", // exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
return msgs
}
type Piper struct {
influx *InfluxWriter
delivery <-chan amqp.Delivery
}
func (p *Piper) Init(writer *InfluxWriter, amqp AMQPConnection, queue_name string, routing_key string) {
// Create a new client using an InfluxDB server base URL and an authentication token
p.influx = writer
p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key)
notify := amqp.NotifyError()
go func() {
for _ = range notify {
p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key)
println("Reconnected to queue")
}
}()
}
func (p *Piper) RunPipe(
conversion func(amqp.Delivery) ([]*write.Point, error), conversion func(amqp.Delivery) ([]*write.Point, error),
) { ) {
// Create a new client using an InfluxDB server base URL and an authentication token
client := influxdb2.NewClient(
url, token,
)
// Get blocking write client
writeAPI := client.WriteAPIBlocking(org, bucket)
// Create point using fluent style // Create point using fluent style
for msg := range channel { for msg := range p.delivery {
points, _ := conversion(msg) points, _ := conversion(msg)
// Write data // Write data
err := writeAPI.WritePoint(context.Background(), points...) for _, point := range points {
if err != nil { p.influx.Write(point)
fmt.Printf("Write error: %s", err.Error())
// reset writeAPI
writeAPI.Flush(context.Background())
writeAPI = client.WriteAPIBlocking(org, bucket)
} }
} }