diff --git a/gatherer/connections.go b/gatherer/connections.go index eb5dba8..868c5ce 100644 --- a/gatherer/connections.go +++ b/gatherer/connections.go @@ -9,14 +9,14 @@ import ( ) type AMQPConnection struct { - conn *amqp.Connection - channel *amqp.Channel - error_chan chan *amqp.Error - observers []chan<- bool + conn *amqp.Connection + channel *amqp.Channel + error_channel chan *amqp.Error + observers []chan<- bool } func (amqpConn *AMQPConnection) Init() { - amqpConn.error_chan = make(chan *amqp.Error) + amqpConn.error_channel = make(chan *amqp.Error) } func (amqpConn *AMQPConnection) Connect() error { @@ -33,17 +33,16 @@ func (amqpConn *AMQPConnection) Connect() error { break } } - amqpConn.conn.NotifyClose(amqpConn.error_chan) ch, err := amqpConn.conn.Channel() if err != nil { return err } amqpConn.channel = ch - amqpConn.channel.NotifyClose(amqpConn.error_chan) + amqpConn.channel.NotifyClose(amqpConn.error_channel) go func() { - for err := range amqpConn.error_chan { + for err := range amqpConn.error_channel { log.Printf("AMQP error: %s", err) amqpConn.Connect() log.Println("Reconnected to RabbitMQ") diff --git a/gatherer/piper.go b/gatherer/piper.go index aa9e7ba..0baca0b 100644 --- a/gatherer/piper.go +++ b/gatherer/piper.go @@ -5,40 +5,6 @@ import ( amqp "github.com/rabbitmq/amqp091-go" ) -func create_and_connect_to_queue(ch *amqp.Channel, name string, routing_key string) <-chan amqp.Delivery { - q, err := ch.QueueDeclare( - name, // name - false, // durable - false, // delete when unused - true, // exclusive - false, // no-wait - nil, // arguments - ) - failOnError(err, "Failed to declare a queue") - - err = ch.QueueBind( - q.Name, // queue name - routing_key, // routing key - "amq.topic", // exchange - false, - nil, - ) - failOnError(err, "Failed to bind a queue") - - msgs, err := ch.Consume( - q.Name, // queue - "", // consumer - true, // auto ack - false, // exclusive - false, // no local - false, // no wait - nil, // args - ) - failOnError(err, "Failed to register a consumer") - - return msgs -} - type Piper struct { influx *InfluxWriter delivery <-chan amqp.Delivery