Using a Low-Level RabbitMQ Client in Golang

RabbitMQ is a capable message broker with solid Go support. It works well as a pub-sub system, and pub-sub has become a staple communication architecture in microservices. At my current day job, we use RabbitMQ to push hundreds of millions of social media posts through our Go services daily.

In this post, we’re going to go over how to use the low-level, open-source amqp package. If you instead want a higher-level abstraction that provides reasonable defaults, queue boilerplate, and automatic reconnections, check out my new post, Using a High-Level RabbitMQ Client in Golang.

Brief Overview On Rabbit

The main entities to be aware of with Rabbit are exchanges, routing keys, and queues. A service publishes a message (JSON in our case) to an exchange under a routing key. RabbitMQ then copies that message into each queue whose binding on that exchange matches the routing key.

[Figure: exchanges, bindings, and routing keys]

The subscribing service (the consumer) can pull messages off of a queue one at a time. It’s worthwhile to note that a queue can also receive messages from multiple routing keys, but we won’t be diving into that here.
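
To make the routing concrete, here is a tiny in-memory model of the behavior described above. It is only a sketch and never talks to RabbitMQ: the exchange type and its bind and publish methods are hypothetical names invented for illustration, and it models exact-match routing keys only.

```go
package main

// exchange is a hypothetical in-memory stand-in for a RabbitMQ exchange
// that routes on exact key matches. It is not part of any AMQP library.
type exchange struct {
	// bindings maps a routing key to the names of the queues bound to it.
	bindings map[string][]string
	// queues maps a queue name to the messages waiting in it.
	queues map[string][]string
}

func newExchange() *exchange {
	return &exchange{
		bindings: map[string][]string{},
		queues:   map[string][]string{},
	}
}

// bind subscribes a queue to a routing key. A queue may be bound to
// several keys, and a key may feed several queues.
func (e *exchange) bind(queue, routingKey string) {
	e.bindings[routingKey] = append(e.bindings[routingKey], queue)
}

// publish copies msg into every queue bound to routingKey and returns
// how many queues received it. With no matching binding the message is
// simply dropped.
func (e *exchange) publish(routingKey, msg string) int {
	bound := e.bindings[routingKey]
	for _, q := range bound {
		e.queues[q] = append(e.queues[q], msg)
	}
	return len(bound)
}
```

Binding two queues to the same key and publishing once leaves a copy of the message in each queue, which is the fan-out behavior that makes pub-sub useful between services.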

Connecting With Go

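First things first, there’s no reason to reinvent the wheel. We’ll use the amqp package provided by streadway (github.com/streadway/amqp) to handle the nitty-gritty of the connection details. Note that the RabbitMQ team now maintains a fork of it, github.com/rabbitmq/amqp091-go, with a largely compatible API, so the code in this post should need little more than a changed import to run against it.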

In most of my projects, I build a small rabbit package in the internal folder of the project. It exposes only the rabbit functionality that our project cares about.

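package rabbit

import "github.com/streadway/amqp"

// Conn wraps the AMQP channel that the rest of the package publishes and consumes on.
type Conn struct {
	Channel *amqp.Channel
}

// GetConn dials the RabbitMQ server at rabbitURL and opens a channel on the connection.
func GetConn(rabbitURL string) (Conn, error) {
	conn, err := amqp.Dial(rabbitURL)
	if err != nil {
		return Conn{}, err
	}

	ch, err := conn.Channel()
	if err != nil {
		// don't leak the connection if the channel can't be opened
		conn.Close()
		return Conn{}, err
	}
	return Conn{Channel: ch}, nil
}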

The Conn struct just holds a channel opened on a connection to the RabbitMQ server. We’ll also expose a function that creates a new Conn from just a connection URI of the form amqp://username:password@host:port.
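
Because a malformed URI only shows up as a dial error at runtime, it can help to sanity-check it first. The sketch below uses only the standard library; checkRabbitURL is a hypothetical helper and not part of the amqp package (which has its own amqp.ParseURI), and the checks it performs are an assumption about what is worth validating.

```go
package main

import (
	"fmt"
	"net/url"
)

// checkRabbitURL is a hypothetical helper that performs a basic sanity
// check on a connection URI before it is handed to GetConn. It returns
// the host[:port] part on success.
func checkRabbitURL(rabbitURL string) (string, error) {
	u, err := url.Parse(rabbitURL)
	if err != nil {
		return "", fmt.Errorf("parse rabbit URL: %w", err)
	}
	// amqp is the plain scheme, amqps is AMQP over TLS
	if u.Scheme != "amqp" && u.Scheme != "amqps" {
		return "", fmt.Errorf("unexpected scheme %q, want amqp or amqps", u.Scheme)
	}
	if u.Hostname() == "" {
		return "", fmt.Errorf("rabbit URL %q has no host", rabbitURL)
	}
	return u.Host, nil
}
```

Calling it from main before GetConn turns a typo in an environment variable into a clear startup error instead of a failed dial.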

Publishing

Publishing is easy and is thread-safe out-of-the-box. We just need to expose one more function that publishes using the connection. The calling code provides a routing key and a payload.

// Publish sends data to the "events" exchange under the given routing key.
func (conn Conn) Publish(routingKey string, data []byte) error {
	return conn.Channel.Publish(
		// exchange - yours may be different
		"events",
		routingKey,
		// mandatory - we don't care if there is no queue
		false,
		// immediate - we don't care if there is no consumer on the queue
		false,
		amqp.Publishing{
			ContentType:  "application/json",
			Body:         data,
			DeliveryMode: amqp.Persistent,
		})
}

The purpose of this small internal package is to set some defaults for the more powerful AMQP package and control which functionality is exposed to our app. For example, if we know that our app will always use the events exchange, or we want the mandatory or immediate flags set, we can just do so here.
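
One way to build on that idea is to have calling code depend on a small interface rather than on Conn directly. The sketch below is an assumption about how you might structure that, not something from the amqp package: publisher, publishJSON, and fakePublisher are hypothetical names, and Conn satisfies publisher through the Publish method shown above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// publisher is the only behavior calling code needs. The Conn type from
// this post satisfies it through its Publish method.
type publisher interface {
	Publish(routingKey string, data []byte) error
}

// publishJSON is a hypothetical helper that marshals v to JSON and
// publishes it under routingKey.
func publishJSON(p publisher, routingKey string, v interface{}) error {
	body, err := json.Marshal(v)
	if err != nil {
		return fmt.Errorf("marshal payload for %q: %w", routingKey, err)
	}
	return p.Publish(routingKey, body)
}

// fakePublisher records messages in memory so that calling code can be
// unit tested without a running broker.
type fakePublisher struct {
	sent map[string][][]byte
}

func (f *fakePublisher) Publish(routingKey string, data []byte) error {
	if f.sent == nil {
		f.sent = map[string][][]byte{}
	}
	f.sent[routingKey] = append(f.sent[routingKey], data)
	return nil
}
```

In production you would pass the real Conn to publishJSON; in tests you pass a fakePublisher and inspect what was recorded.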

Consuming

Consuming is a bit trickier than publishing. We’ll use a simple pattern here where we have the app supply a handler function, a queue, the routing key that the queue binds to, and the maximum number of goroutines we should spin up to handle messages.

// StartConsumer declares queueName, binds it to routingKey on the "events"
// exchange, and starts concurrency goroutines that pass each delivery to handler.
// It needs "fmt" and "os" added to the package's import block.
func (conn Conn) StartConsumer(
	queueName, routingKey string,
	handler func(d amqp.Delivery) bool,
	concurrency int,
) error {
	// create the queue if it doesn't already exist
	// (durable, not auto-deleted, not exclusive, wait for the server's reply)
	_, err := conn.Channel.QueueDeclare(queueName, true, false, false, false, nil)
	if err != nil {
		return err
	}

	// bind the queue to the routing key
	err = conn.Channel.QueueBind(queueName, routingKey, "events", false, nil)
	if err != nil {
		return err
	}

	// prefetch 4x as many messages as we can handle at once
	prefetchCount := concurrency * 4
	err = conn.Channel.Qos(prefetchCount, 0, false)
	if err != nil {
		return err
	}

	msgs, err := conn.Channel.Consume(
		queueName, // queue
		"",        // consumer
		false,     // auto-ack
		false,     // exclusive
		false,     // no-local
		false,     // no-wait
		nil,       // args
	)
	if err != nil {
		return err
	}

	// start one goroutine per requested worker
	for i := 0; i < concurrency; i++ {
		fmt.Printf("Processing messages on thread %v...\n", i)
		go func() {
			for msg := range msgs {
				// if the handler returns true then ACK, else NACK
				// the message back into the rabbit queue for
				// another round of processing
				if handler(msg) {
					msg.Ack(false)
				} else {
					msg.Nack(false, true)
				}
			}
			// msgs is closed when the channel or connection goes away
			fmt.Println("Rabbit consumer closed - critical Error")
			os.Exit(1)
		}()
	}
	return nil
}

If you’re concerned with speed, don’t be afraid to run with a concurrency of 100 or more, depending on how computationally and memory intensive your handler is. Assuming your handler is written in a thread-safe way, this is a good way to ensure that your app uses all of its available CPU without being bottlenecked by I/O.

If your app’s handler is very fast (perhaps no network or disk involved), you may need to change the prefetch multiplier from 4 to something higher. The prefetch count tells the Rabbit server how many unacknowledged messages it may deliver to the consumer at once. The higher the number, the less time your workers spend waiting on the network for the next message.
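
The worker pattern itself can be tried out without a broker. The following sketch is a simplified model, not the amqp package: runWorkers and prefetchFor are hypothetical names, plain strings stand in for deliveries, and counters stand in for the ACK and NACK calls.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// runWorkers is a hypothetical, broker-free model of the consumer loop:
// concurrency goroutines drain one shared channel and call handler on
// each message. It returns how many messages were "acked" (handler
// returned true) and "nacked" (handler returned false).
func runWorkers(msgs <-chan string, concurrency int, handler func(string) bool) (acked, nacked int64) {
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range msgs {
				if handler(msg) {
					atomic.AddInt64(&acked, 1)
				} else {
					atomic.AddInt64(&nacked, 1)
				}
			}
		}()
	}
	// Wait returns once msgs has been closed and fully drained.
	wg.Wait()
	return acked, nacked
}

// prefetchFor mirrors the rule of thumb from this post: allow the broker
// to keep multiplier unacknowledged messages in flight per worker.
func prefetchFor(concurrency, multiplier int) int {
	return concurrency * multiplier
}
```

With a concurrency of 100 and the multiplier of 4 used above, prefetchFor gives 400, meaning up to 400 deliveries can be waiting on the client while the workers are busy.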

Most of the programs I work on are ephemeral; I don’t mind if they just restart from time to time when bad things happen. For this reason, if the rabbit consumer’s delivery channel closes for any reason, I just call os.Exit(1). My logs pick it up and we just restart the program. If this doesn’t work for your use case, you may want to handle that more elegantly.
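
If exiting is too blunt for your use case, one alternative is to retry the connection with a growing delay. The sketch below shows only that retry loop, under stated assumptions: retryWithBackoff and errExhausted are hypothetical names, and connect stands for whatever your app does to (re)establish the consumer, such as calling GetConn followed by StartConsumer. The consumer goroutines would also need to report the closure instead of calling os.Exit(1), which is not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errExhausted is wrapped into the error returned when every attempt fails.
var errExhausted = errors.New("retries exhausted")

// retryWithBackoff is a hypothetical helper. It calls connect until it
// succeeds or maxAttempts is reached, doubling the wait after each
// failed attempt.
func retryWithBackoff(connect func() error, maxAttempts int, initial time.Duration) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	wait := initial
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if lastErr = connect(); lastErr == nil {
			return nil
		}
		// no point sleeping after the final attempt
		if attempt < maxAttempts {
			time.Sleep(wait)
			wait *= 2
		}
	}
	return fmt.Errorf("%w after %d attempts: %v", errExhausted, maxAttempts, lastErr)
}
```

A caller could still fall back to exiting when retryWithBackoff returns an error, so the restart-on-failure behavior remains the last resort rather than the first.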

Testing The Package

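package main

import (
	"fmt"
	"time"

	"github.com/streadway/amqp"

	// hypothetical import path - point this at your own internal rabbit package
	"example.com/yourapp/internal/rabbit"
)

func main() {
	// assumption: a local broker using RabbitMQ's default guest account;
	// replace the URI with your own
	conn, err := rabbit.GetConn("amqp://guest:guest@localhost:5672/")
	if err != nil {
		panic(err)
	}

	// publish a test message every second
	go func() {
		for {
			time.Sleep(time.Second)
			err := conn.Publish("test-key", []byte(`{"message":"test"}`))
			if err != nil {
				fmt.Println("publish failed:", err)
			}
		}
	}()

	// the "events" exchange must already exist on the broker
	err = conn.StartConsumer("test-queue", "test-key", handler, 2)
	if err != nil {
		panic(err)
	}

	// block forever while the consumer goroutines work
	forever := make(chan bool)
	<-forever
}

// handler prints the message body and returns true so the message is ACKed
func handler(d amqp.Delivery) bool {
	if d.Body == nil {
		fmt.Println("Error, no message body!")
		return false
	}
	fmt.Println(string(d.Body))
	return true
}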
