From da634efa8c02c1b4d4fde1b7f127871665faf9e4 Mon Sep 17 00:00:00 2001 From: Niko Abeler Date: Thu, 16 Feb 2023 21:01:30 +0100 Subject: [PATCH] by ref --- gatherer/connections.go | 1 + gatherer/main.go | 4 ++-- gatherer/piper.go | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/gatherer/connections.go b/gatherer/connections.go index 868c5ce..ff2d944 100644 --- a/gatherer/connections.go +++ b/gatherer/connections.go @@ -99,6 +99,7 @@ func (amqpConn *AMQPConnection) ConnectToQueue(name string, routing_key string) func (amqpConn *AMQPConnection) NotifyError() <-chan bool { notify := make(chan bool) amqpConn.observers = append(amqpConn.observers, notify) + log.Println("Added observer") return notify } diff --git a/gatherer/main.go b/gatherer/main.go index 341a493..9366c52 100644 --- a/gatherer/main.go +++ b/gatherer/main.go @@ -53,11 +53,11 @@ func main() { var forever chan struct{} shellPipe := Piper{} - shellPipe.Init(&influx, amqpConn, "shelly", "shellies.#") + shellPipe.Init(&influx, &amqpConn, "shelly", "shellies.#") go shellPipe.RunPipe(ShellyConverter) tasmotaPipe := Piper{} - tasmotaPipe.Init(&influx, amqpConn, "tasmota", "tele.#.SENSOR") + tasmotaPipe.Init(&influx, &amqpConn, "tasmota", "tele.#.SENSOR") go tasmotaPipe.RunPipe(TasmotaConverter) log.Printf(" [*] Waiting for logs. To exit press CTRL+C") diff --git a/gatherer/piper.go b/gatherer/piper.go index 0baca0b..d560141 100644 --- a/gatherer/piper.go +++ b/gatherer/piper.go @@ -10,7 +10,7 @@ type Piper struct { delivery <-chan amqp.Delivery } -func (p *Piper) Init(writer *InfluxWriter, amqp AMQPConnection, queue_name string, routing_key string) { +func (p *Piper) Init(writer *InfluxWriter, amqp *AMQPConnection, queue_name string, routing_key string) { // Create a new client using an InfluxDB server base URL and an authentication token p.influx = writer p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key)