run pipe on new queue

This commit is contained in:
Niko Abeler 2023-02-16 21:06:24 +01:00
parent da634efa8c
commit e09096e8e3
2 changed files with 16 additions and 11 deletions

View File

@ -53,12 +53,10 @@ func main() {
var forever chan struct{} var forever chan struct{}
shellPipe := Piper{} shellPipe := Piper{}
shellPipe.Init(&influx, &amqpConn, "shelly", "shellies.#") shellPipe.Start(&influx, &amqpConn, "shelly", "shellies.#", ShellyConverter)
go shellPipe.RunPipe(ShellyConverter)
tasmotaPipe := Piper{} tasmotaPipe := Piper{}
tasmotaPipe.Init(&influx, &amqpConn, "tasmota", "tele.#.SENSOR") tasmotaPipe.Start(&influx, &amqpConn, "tasmota", "tele.#.SENSOR", TasmotaConverter)
go tasmotaPipe.RunPipe(TasmotaConverter)
log.Printf(" [*] Waiting for logs. To exit press CTRL+C") log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever <-forever

View File

@ -6,11 +6,17 @@ import (
) )
type Piper struct { type Piper struct {
influx *InfluxWriter influx *InfluxWriter
delivery <-chan amqp.Delivery delivery <-chan amqp.Delivery
conversion func(amqp.Delivery) ([]*write.Point, error)
} }
func (p *Piper) Init(writer *InfluxWriter, amqp *AMQPConnection, queue_name string, routing_key string) { func (p *Piper) Start(
writer *InfluxWriter,
amqp *AMQPConnection,
queue_name string, routing_key string,
conversion func(amqp.Delivery) ([]*write.Point, error),
) {
// Create a new client using an InfluxDB server base URL and an authentication token // Create a new client using an InfluxDB server base URL and an authentication token
p.influx = writer p.influx = writer
p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key) p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key)
@ -20,16 +26,17 @@ func (p *Piper) Init(writer *InfluxWriter, amqp *AMQPConnection, queue_name stri
for _ = range notify { for _ = range notify {
p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key) p.delivery, _ = amqp.ConnectToQueue(queue_name, routing_key)
println("Reconnected to queue") println("Reconnected to queue")
go p.run_pipe()
} }
}() }()
go p.run_pipe()
} }
func (p *Piper) RunPipe( func (p *Piper) run_pipe() {
conversion func(amqp.Delivery) ([]*write.Point, error),
) {
// Create point using fluent style // Create point using fluent style
for msg := range p.delivery { for msg := range p.delivery {
points, _ := conversion(msg) points, _ := p.conversion(msg)
// Write data // Write data
for _, point := range points { for _, point := range points {
p.influx.Write(point) p.influx.Write(point)