2023-02-16 18:18:55 +00:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"github.com/influxdata/influxdb-client-go/v2/api/write"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
|
|
)
|
|
|
|
|
2023-02-16 19:22:28 +00:00
|
|
|
type Piper struct {
|
|
|
|
influx *InfluxWriter
|
|
|
|
delivery <-chan amqp.Delivery
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *Piper) Init(writer *InfluxWriter, amqp AMQPConnection, queue_name string, routing_key string) {
|
|
|
|
// Create a new client using an InfluxDB server base URL and an authentication token
|
|
|
|
p.influx = writer
|
|
|
|
p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key)
|
|
|
|
|
|
|
|
notify := amqp.NotifyError()
|
|
|
|
go func() {
|
|
|
|
for _ = range notify {
|
|
|
|
p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key)
|
|
|
|
println("Reconnected to queue")
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *Piper) RunPipe(
|
2023-02-16 18:18:55 +00:00
|
|
|
conversion func(amqp.Delivery) ([]*write.Point, error),
|
|
|
|
) {
|
|
|
|
// Create point using fluent style
|
2023-02-16 19:22:28 +00:00
|
|
|
for msg := range p.delivery {
|
2023-02-16 18:18:55 +00:00
|
|
|
points, _ := conversion(msg)
|
|
|
|
// Write data
|
2023-02-16 19:22:28 +00:00
|
|
|
for _, point := range points {
|
|
|
|
p.influx.Write(point)
|
2023-02-16 18:18:55 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|