diff --git a/gatherer/main.go b/gatherer/main.go index 38ccd26..77ba3ae 100644 --- a/gatherer/main.go +++ b/gatherer/main.go @@ -16,7 +16,7 @@ func failOnError(err error, msg string) { func main() { - amqpConn := AMQPConnection{} + amqpConn := new(AMQPConnection) amqpConn.Init() err := amqpConn.Connect() defer amqpConn.Close() @@ -30,7 +30,7 @@ func main() { fmt.Println("InfluxDB token:") fmt.Println(strings.TrimSpace(string(influx_token))) - influx := InfluxWriter{} + influx := new(InfluxWriter) influx.Init( os.Getenv("INFLUX_URL"), strings.TrimSpace(string(influx_token)), @@ -53,10 +53,10 @@ func main() { var forever chan struct{} shellPipe := new(Piper) - shellPipe.Start(&influx, &amqpConn, "shelly", "shellies.#", ShellyConverter) + shellPipe.Start(influx, amqpConn, "shelly", "shellies.#", ShellyConverter) tasmotaPipe := new(Piper) - tasmotaPipe.Start(&influx, &amqpConn, "tasmota", "tele.#.SENSOR", TasmotaConverter) + tasmotaPipe.Start(influx, amqpConn, "tasmota", "tele.#.SENSOR", TasmotaConverter) log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever diff --git a/gatherer/piper.go b/gatherer/piper.go index ea308c1..71dc6ef 100644 --- a/gatherer/piper.go +++ b/gatherer/piper.go @@ -20,6 +20,7 @@ func (p *Piper) Start( // Create a new client using an InfluxDB server base URL and an authentication token p.influx = writer p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key) + p.conversion = conversion notify := amqp.NotifyError() go func() {