diff --git a/gatherer/main.go b/gatherer/main.go index 9366c52..3a9e12d 100644 --- a/gatherer/main.go +++ b/gatherer/main.go @@ -53,12 +53,10 @@ func main() { var forever chan struct{} shellPipe := Piper{} - shellPipe.Init(&influx, &amqpConn, "shelly", "shellies.#") - go shellPipe.RunPipe(ShellyConverter) + shellPipe.Start(&influx, &amqpConn, "shelly", "shellies.#", ShellyConverter) tasmotaPipe := Piper{} - tasmotaPipe.Init(&influx, &amqpConn, "tasmota", "tele.#.SENSOR") - go tasmotaPipe.RunPipe(TasmotaConverter) + tasmotaPipe.Start(&influx, &amqpConn, "tasmota", "tele.#.SENSOR", TasmotaConverter) log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever diff --git a/gatherer/piper.go b/gatherer/piper.go index d560141..ea308c1 100644 --- a/gatherer/piper.go +++ b/gatherer/piper.go @@ -6,11 +6,17 @@ import ( ) type Piper struct { - influx *InfluxWriter - delivery <-chan amqp.Delivery + influx *InfluxWriter + delivery <-chan amqp.Delivery + conversion func(amqp.Delivery) ([]*write.Point, error) } -func (p *Piper) Init(writer *InfluxWriter, amqp *AMQPConnection, queue_name string, routing_key string) { +func (p *Piper) Start( + writer *InfluxWriter, + amqp *AMQPConnection, + queue_name string, routing_key string, + conversion func(amqp.Delivery) ([]*write.Point, error), +) { // Create a new client using an InfluxDB server base URL and an authentication token p.influx = writer p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key) @@ -20,16 +26,17 @@ func (p *Piper) Init(writer *InfluxWriter, amqp *AMQPConnection, queue_name stri for _ = range notify { p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key) println("Reconnected to queue") + go p.run_pipe() } }() + + go p.run_pipe() } -func (p *Piper) RunPipe( - conversion func(amqp.Delivery) ([]*write.Point, error), -) { +func (p *Piper) run_pipe() { // Create point using fluent style for msg := range p.delivery { - points, _ := conversion(msg) + points, _ := p.conversion(msg) // Write data for _, point := range points { p.influx.Write(point)