home_data/gatherer/piper.go

48 lines
1.0 KiB
Go
Raw Normal View History

2023-02-16 18:18:55 +00:00
package main
import (
"github.com/influxdata/influxdb-client-go/v2/api/write"
amqp "github.com/rabbitmq/amqp091-go"
)
2023-02-16 19:22:28 +00:00
type Piper struct {
2023-02-16 20:06:24 +00:00
influx *InfluxWriter
delivery <-chan amqp.Delivery
conversion func(amqp.Delivery) ([]*write.Point, error)
2023-02-16 19:22:28 +00:00
}
2023-02-16 20:06:24 +00:00
func (p *Piper) Start(
writer *InfluxWriter,
amqp *AMQPConnection,
queue_name string, routing_key string,
conversion func(amqp.Delivery) ([]*write.Point, error),
) {
2023-02-16 19:22:28 +00:00
// Create a new client using an InfluxDB server base URL and an authentication token
p.influx = writer
p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key)
2023-02-16 20:14:50 +00:00
p.conversion = conversion
2023-02-16 19:22:28 +00:00
notify := amqp.NotifyError()
go func() {
for _ = range notify {
p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key)
println("Reconnected to queue")
2023-02-16 20:06:24 +00:00
go p.run_pipe()
2023-02-16 19:22:28 +00:00
}
}()
2023-02-16 20:06:24 +00:00
go p.run_pipe()
2023-02-16 19:22:28 +00:00
}
2023-02-16 20:06:24 +00:00
func (p *Piper) run_pipe() {
2023-02-16 18:18:55 +00:00
// Create point using fluent style
2023-02-16 19:22:28 +00:00
for msg := range p.delivery {
2023-02-16 20:06:24 +00:00
points, _ := p.conversion(msg)
2023-02-16 18:18:55 +00:00
// Write data
2023-02-16 19:22:28 +00:00
for _, point := range points {
p.influx.Write(point)
2023-02-16 18:18:55 +00:00
}
}
}