package main

import (
	"errors"
	"log"
	"os"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// amqpReconnectDelay is the pause between consecutive attempts to
// (re)establish the RabbitMQ connection or channel.
const amqpReconnectDelay = 5 * time.Second

// AMQPConnection bundles a RabbitMQ connection with a single channel opened on
// it, and re-establishes both when the channel is closed with an error.
//
// The type performs no locking of its own: the reconnect goroutine started by
// Connect writes conn, channel and error_channel and reads observers, so
// callers must not use the value concurrently with a reconnect unless they
// synchronize externally.
type AMQPConnection struct {
	conn    *amqp.Connection
	channel *amqp.Channel
	// error_channel is the close-notification channel registered on the
	// current AMQP channel. Connect replaces it on every successful call.
	error_channel chan *amqp.Error
	// observers receive `true` after each successful automatic reconnect.
	observers []chan<- bool
}

// Init allocates the initial error channel. It is kept for backward
// compatibility; Connect creates its own notification channel and therefore
// no longer depends on Init having been called.
func (c *AMQPConnection) Init() {
	c.error_channel = make(chan *amqp.Error)
}

// Connect closes any existing connection, dials the broker named by the
// AMQP_URL environment variable (retrying every amqpReconnectDelay until the
// dial succeeds), opens a channel and starts a goroutine that reconnects when
// that channel is closed with an error.
//
// It returns an error only when opening the channel fails; in that case the
// freshly dialed connection is left in place and is released by the next call
// to Connect or Close.
func (c *AMQPConnection) Connect() error {
	c.Close()

	amqpURL := os.Getenv("AMQP_URL")
	for {
		conn, err := amqp.Dial(amqpURL)
		if err == nil {
			c.conn = conn
			break
		}
		log.Printf("Failed to connect to RabbitMQ: %s", err)
		time.Sleep(amqpReconnectDelay)
	}

	ch, err := c.conn.Channel()
	if err != nil {
		return err
	}
	c.channel = ch

	// Register a fresh notification channel for every AMQP channel. The
	// library closes the registered Go channel once the AMQP channel is
	// closed, so re-registering the same one would lead to a send on (or a
	// second close of) a closed channel. The buffer of one lets the library
	// deliver the error without waiting for the watcher to be scheduled.
	closed := ch.NotifyClose(make(chan *amqp.Error, 1))
	c.error_channel = closed
	go c.watchClose(closed)

	return nil
}

// watchClose waits for the close notification of a single AMQP channel. When
// the channel was closed with an error it reconnects, retrying until Connect
// succeeds, and then signals every registered observer. When the notification
// channel is closed without an error (graceful close) it simply returns.
//
// Each successful Connect starts its own watcher, so this goroutine handles
// at most one closure and then exits.
func (c *AMQPConnection) watchClose(closed <-chan *amqp.Error) {
	amqpErr, ok := <-closed
	if !ok {
		return
	}
	log.Printf("AMQP error: %s", amqpErr)

	for {
		err := c.Connect()
		if err == nil {
			break
		}
		log.Printf("Failed to reopen AMQP channel: %s", err)
		time.Sleep(amqpReconnectDelay)
	}
	log.Println("Reconnected to RabbitMQ")

	// Sends are blocking: an observer that never receives stalls the
	// notification of the observers registered after it.
	for _, observer := range c.observers {
		observer <- true
		log.Println("Notified observer")
	}
}

// ConnectToQueue declares a non-durable, exclusive queue called name, binds
// it to the "amq.topic" exchange with routing_key and starts an auto-ack
// consumer on it. It returns the delivery channel of that consumer.
//
// An error is returned when no channel is open (Connect has not succeeded or
// Close was called) or when any of the broker operations fails.
func (c *AMQPConnection) ConnectToQueue(name string, routingKey string) (<-chan amqp.Delivery, error) {
	if c.channel == nil {
		return nil, errors.New("amqp: no open channel, call Connect first")
	}

	q, err := c.channel.QueueDeclare(
		name,  // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		return nil, err
	}

	err = c.channel.QueueBind(
		q.Name,      // queue name
		routingKey,  // routing key
		"amq.topic", // exchange
		false,       // no-wait
		nil,         // arguments
	)
	if err != nil {
		return nil, err
	}

	return c.channel.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto ack
		false,  // exclusive
		false,  // no local
		false,  // no wait
		nil,    // args
	)
}

// NotifyError registers and returns a new unbuffered channel that receives
// `true` after every successful automatic reconnect. Because the send is
// blocking, the caller must keep receiving from the returned channel.
func (c *AMQPConnection) NotifyError() <-chan bool {
	notify := make(chan bool)
	c.observers = append(c.observers, notify)
	log.Println("Added observer")
	return notify
}

// Close closes the channel and the connection, if present, and clears both
// fields. It is safe to call on a value that was never connected.
func (c *AMQPConnection) Close() {
	// Close errors are deliberately ignored: Close is also called on the
	// reconnect path, where the channel and connection are typically already
	// closed, and there is nothing useful to do with the error.
	if c.channel != nil {
		_ = c.channel.Close()
	}
	if c.conn != nil {
		_ = c.conn.Close()
	}
	c.channel = nil
	c.conn = nil
}