package main import ( "github.com/influxdata/influxdb-client-go/v2/api/write" amqp "github.com/rabbitmq/amqp091-go" ) type Piper struct { influx *InfluxWriter delivery <-chan amqp.Delivery conversion func(amqp.Delivery) ([]*write.Point, error) } func (p *Piper) Start( writer *InfluxWriter, amqp *AMQPConnection, queue_name string, routing_key string, conversion func(amqp.Delivery) ([]*write.Point, error), ) { // Create a new client using an InfluxDB server base URL and an authentication token p.influx = writer p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key) p.conversion = conversion notify := amqp.NotifyError() go func() { for _ = range notify { p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key) println("Reconnected to queue") go p.run_pipe() } }() go p.run_pipe() } func (p *Piper) run_pipe() { // Create point using fluent style for msg := range p.delivery { points, _ := p.conversion(msg) // Write data for _, point := range points { p.influx.Write(point) } } }