ex_rabbit_pool open source AMQP connection pool
- Erlang Solutions Team
- 2nd Apr 2019
- 26 min of reading time
A couple of months ago we started writing an open-source continuous delivery system which we called Buildex.
Buildex is implemented as a collection of microservices.
Buildex was born of the frustrations and limitations we faced when attempting to debug production failures in other build systems. Buildex was also influenced by security, privacy, and pricing concerns we encountered while using some of the popular commercial SaaS offerings.
The principal components of Buildex are: the Poller, which uses the excellent Tentacat GitHub API client to poll GitHub for new tags; the Builder, which checks out and builds the project inside Docker containers; and the Datastore, which uses ueberauth to delegate authorization to GitHub via OAuth and allows users to configure projects for the continuous integration pipeline.
The following diagram provides a high-level overview of the Buildex micro-services and supporting infrastructure.
Although we could have used distributed Erlang for everything, we wanted operations staff and developers to be able to monitor inter-service communication easily and to define message replication as they saw fit, for example for logging or for integration with third-party services (email, auditing, manually triggering builds, etc.).
We decided to use RabbitMQ for inter-service communication, which in turn led to our need for a higher-level connection handling and channel management library.
As part of the work to support these services, we created a RabbitMQ connection management and pooling library called ex_rabbit_pool. ex_rabbit_pool is built upon the Elixir library AMQP, an Elixir wrapper for the RabbitMQ Erlang client (rabbitmq-erlang-client), and the Erlang resource pooling library poolboy. In this post, we will take you through the lessons learnt and the steps taken to implement ex_rabbit_pool, but first, a quick explanation of channels and connections.
Connections to RabbitMQ (that is, TCP connections) are expensive to create and are a finite system resource. Channels are a lightweight abstraction over connections.
Quoting the RabbitMQ documentation, “AMQP 0-9-1 connections are multiplexed with channels that can be thought of as ‘lightweight connections that share a single TCP connection’”.
In the Erlang world you will typically assign a channel per worker process; on other platforms you would assign a channel per thread.
Open a connection
{:ok, connection} = AMQP.Connection.open(host: "localhost", port: 5672)
Open a channel with that connection
{:ok, channel} = AMQP.Channel.open(connection)
Declare a queue and an exchange, then bind the queue to the exchange
{:ok, _} = AMQP.Queue.declare(channel, "test_queue")
:ok = AMQP.Exchange.declare(channel, "test_exchange", :direct)
:ok = AMQP.Queue.bind(channel, "test_queue", "test_exchange")
(Listening) Subscribe to the queue
AMQP.Queue.subscribe(channel, "test_queue", fn payload, _meta -> IO.puts("Received: #{payload}") end)
(Sending) Publish a message to the exchange, which routes it to the bound queue
:ok = AMQP.Basic.publish(channel, "test_exchange", "", "Hello, World!")
A supervision tree diagram is a good way to get an overview of a BEAM system, so let’s start by examining the supervision tree of ex_rabbit_pool.
The principal components are the PoolSupervisor and RabbitConnection. We will examine the implementation of both components over the course of the following sections.
First, we define a top-level supervisor, PoolSupervisor, which will be responsible for managing the connection pool. PoolSupervisor is intended to be started within an application so we leave the start-up management to the application developer (here’s how we manage the pool supervisor in Builder).
PoolSupervisor exports a start_link/1 function, which takes a single keyword list containing both the RabbitMQ connection parameters and the connection pool configuration.
defmodule ExRabbitPool.PoolSupervisor do
  use Supervisor

  alias ExRabbitPool.Worker.SetupQueue

  @type config :: [rabbitmq_config: keyword(), rabbitmq_conn_pool: keyword()]

  @spec start_link(config()) :: Supervisor.on_start()
  def start_link(config) do
    Supervisor.start_link(__MODULE__, config)
  end

  @impl true
  def init(config) do
    children = []
    opts = [strategy: :one_for_one]
    Supervisor.init(children, opts)
  end
end
If you are not familiar with poolboy, you can read a good introduction over at Elixir School. Continuing on, here is the pool configuration that we will use:
[
  rabbitmq_conn_pool: [
    name: {:local, :connection_pool},
    worker_module: ExRabbitPool.Worker.RabbitConnection,
    size: 2,
    max_overflow: 0
  ]
]
Note the worker_module attribute: it names a GenServer module whose instances poolboy manages as the pooled resource. In this case, RabbitConnection is the GenServer in charge of connecting to RabbitMQ.
We now extract the RabbitMQ and pool configuration from the start_link/1 parameters. AMQP provides helpful defaults for managing its connections, so we only need to pass the RabbitMQ configuration through to AMQP. We then configure poolboy to manage our connection pool:
rabbitmq_conn_pool = Keyword.get(config, :rabbitmq_conn_pool)
rabbitmq_config = Keyword.get(config, :rabbitmq_config, [])
{_, pool_id} = Keyword.fetch!(rabbitmq_conn_pool, :name)
:poolboy.child_spec(pool_id, rabbitmq_conn_pool, rabbitmq_config)
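To make the shape of this configuration concrete, the sketch below performs the same extraction as a pure function and returns the three values instead of building a poolboy child spec, so it runs without poolboy installed. ConfigSketch.split/1 is a hypothetical name used only for illustration, and the [channels: 10] worker argument is an example value.

```elixir
defmodule ConfigSketch do
  # Hypothetical helper mirroring the extraction in PoolSupervisor.init/1:
  # returns the three arguments that would be passed to :poolboy.child_spec/3.
  @spec split(keyword()) :: {atom(), keyword(), keyword()}
  def split(config) do
    # poolboy options: name, worker_module, size, max_overflow
    rabbitmq_conn_pool = Keyword.get(config, :rabbitmq_conn_pool)
    # worker arguments, handed to each RabbitConnection.start_link/1 call
    rabbitmq_config = Keyword.get(config, :rabbitmq_config, [])
    # {:local, name} registers the pool locally under `name`
    {_scope, pool_id} = Keyword.fetch!(rabbitmq_conn_pool, :name)
    {pool_id, rabbitmq_conn_pool, rabbitmq_config}
  end
end

config = [
  rabbitmq_config: [channels: 10],
  rabbitmq_conn_pool: [
    name: {:local, :connection_pool},
    worker_module: ExRabbitPool.Worker.RabbitConnection,
    size: 2,
    max_overflow: 0
  ]
]

{pool_id, _pool_args, worker_args} = ConfigSketch.split(config)
IO.inspect({pool_id, worker_args})
# prints {:connection_pool, [channels: 10]}
```

When :rabbitmq_config is omitted, the workers simply receive an empty keyword list and rely on AMQP's defaults.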
Let’s take a look at the full implementation:
defmodule ExRabbitPool.PoolSupervisor do
  use Supervisor

  alias ExRabbitPool.Worker.SetupQueue

  @type config :: [rabbitmq_config: keyword(), rabbitmq_conn_pool: keyword()]

  @spec start_link(config()) :: Supervisor.on_start()
  def start_link(config) do
    Supervisor.start_link(__MODULE__, config)
  end

  @impl true
  def init(config) do
    rabbitmq_conn_pool = Keyword.get(config, :rabbitmq_conn_pool)
    rabbitmq_config = Keyword.get(config, :rabbitmq_config, [])
    {_, pool_id} = Keyword.fetch!(rabbitmq_conn_pool, :name)

    children = [
      :poolboy.child_spec(pool_id, rabbitmq_conn_pool, rabbitmq_config)
    ]

    opts = [strategy: :one_for_one]
    Supervisor.init(children, opts)
  end
end
We continue by defining the worker_module responsible for handling the connection to RabbitMQ. The worker holds the connection, a list of channels multiplexed over it, a corresponding list of monitors (we explain their purpose later in this post), an adapter (so we can plug in a stub implementation for testing) and the configuration, so that some parts of our application can be configured dynamically.
With these attributes in place, we create a State module with an associated struct to represent the internal state of our RabbitConnection GenServer.
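defmodule ExRabbitPool.Worker.RabbitConnection do
  use GenServer
  # Logger is used by the channel helpers and :EXIT handlers below
  require Logger

  defmodule State do
    @type config :: keyword()
    @enforce_keys [:config]
    @type t :: %__MODULE__{
            adapter: module(),
            connection: AMQP.Connection.t() | nil,
            channels: list(AMQP.Channel.t()),
            # {monitor reference of the client, channel it has checked out}
            monitors: [{reference(), AMQP.Channel.t()}],
            config: config()
          }
    # the field list must start on the same line as defstruct
    defstruct adapter: ExRabbitPool.RabbitMQ,
              connection: nil,
              channels: [],
              config: nil,
              monitors: []
  end

  def start_link(config) do
    GenServer.start_link(__MODULE__, config, [])
  end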
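@enforce_keys [:config] means a State can only be built when a config is supplied, which init/1 always does, while every other field falls back to its defstruct default. The small standalone struct below demonstrates that behaviour; EnforceKeysDemo is a hypothetical module that copies the shape of State, with an atom standing in for the adapter module.

```elixir
defmodule EnforceKeysDemo do
  # Hypothetical struct with the same fields as RabbitConnection.State:
  # :config must be given whenever the struct is built.
  @enforce_keys [:config]
  defstruct adapter: :stub_adapter, connection: nil, channels: [], monitors: [], config: nil
end

# Supplying :config succeeds; the remaining fields take their defaults.
state = struct!(EnforceKeysDemo, config: [channels: 10])
IO.inspect(state.channels)
# prints []

# Omitting :config raises an ArgumentError at runtime.
missing_config =
  try do
    struct!(EnforceKeysDemo, [])
  rescue
    ArgumentError -> :missing_config
  end

IO.inspect(missing_config)
# prints :missing_config
```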
In the init/1 callback, we send an asynchronous :connect message so that the connection to RabbitMQ is initialised separately, without blocking the GenServer start-up phase (supervisors start their child processes sequentially and expect each one to start quickly). We also trap exits, so that when a linked connection or channel crashes this worker receives a message and can restart it instead of crashing too. Take a look at our default adapter for the full implementation.
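  @impl true
  def init(config) do
    # receive {:EXIT, pid, reason} messages instead of dying with linked processes
    Process.flag(:trap_exit, true)
    # connect asynchronously so init/1 returns immediately
    send(self(), :connect)
    # separate our :adapter option from the AMQP connection config
    {opts, amqp_config} = Keyword.split(config, [:adapter])
    adapter = Keyword.get(opts, :adapter, ExRabbitPool.RabbitMQ)
    {:ok, %State{adapter: adapter, config: amqp_config}}
  end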
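The "connect after init" pattern can be tried out without a broker. The following is a minimal, self-contained sketch in which RabbitMQ is replaced by an injected connect function; LazyConnector and its functions are hypothetical, and unlike RabbitConnection it neither traps exits nor opens channels. The 10 ms retry interval in the usage is chosen only so the example finishes quickly (RabbitConnection retries after 1000 ms).

```elixir
defmodule LazyConnector do
  # Hypothetical GenServer: init/1 returns at once and the real connection
  # attempt happens in handle_info(:connect, ...), retried on failure.
  use GenServer

  def start_link(connect_fun, retry_ms \\ 50) do
    GenServer.start_link(__MODULE__, {connect_fun, retry_ms})
  end

  def connection(pid), do: GenServer.call(pid, :connection)

  @impl true
  def init({connect_fun, retry_ms}) do
    # Do not block the supervisor: schedule the first attempt instead.
    send(self(), :connect)
    {:ok, %{connect_fun: connect_fun, retry_ms: retry_ms, connection: nil}}
  end

  @impl true
  def handle_info(:connect, %{connect_fun: connect_fun, retry_ms: retry_ms} = state) do
    case connect_fun.() do
      {:ok, conn} ->
        {:noreply, %{state | connection: conn}}

      {:error, _reason} ->
        # Try again later; the process stays responsive in the meantime.
        Process.send_after(self(), :connect, retry_ms)
        {:noreply, state}
    end
  end

  @impl true
  def handle_call(:connection, _from, state), do: {:reply, state.connection, state}
end

# Usage: a connect function that fails twice before succeeding.
{:ok, attempts} = Agent.start_link(fn -> 0 end)

flaky_connect = fn ->
  attempt = Agent.get_and_update(attempts, fn n -> {n + 1, n + 1} end)
  if attempt < 3, do: {:error, :econnrefused}, else: {:ok, :fake_connection}
end

{:ok, pid} = LazyConnector.start_link(flaky_connect, 10)
```

While the retries are pending, LazyConnector.connection/1 simply returns nil, which is the same situation the checkout handler later reports as {:error, :disconnected}.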
Now we implement our first GenServer callback, handle_info/2, to handle the :connect message. Within the handler, we open the connection to RabbitMQ using the adapter.
If opening the connection fails, we can either schedule another attempt or stop the GenServer. We choose to retry, scheduling another asynchronous connection attempt with Process.send_after(self(), :connect, 1000).
If the connection is established successfully, we link it to our process, then create and start the required RabbitMQ channels, which form a second pool inside our worker, and link each of them to our process as well.
We link the channels to our process so that if a client closes a channel or a channel crashes, we can respond by creating another one and keep the channel pool at the same size. Finally, we store the connection and the channels in the RabbitConnection state for later reuse.
  @impl true
  def handle_info(:connect, %{adapter: adapter, config: config} = state) do
    case adapter.open_connection(config) do
      {:error, _reason} ->
        # retry asynchronously in one second
        Process.send_after(self(), :connect, 1000)
        {:noreply, state}

      {:ok, %{pid: pid} = connection} ->
        # link to the connection so we receive {:EXIT, pid, reason} if it dies
        true = Process.link(pid)
        num_channels = Keyword.get(config, :channels, 1000)

        channels =
          do_times(num_channels, 0, fn ->
            {:ok, channel} = start_channel(adapter, connection)
            true = Process.link(channel.pid)
            channel
          end)

        {:noreply, %State{state | connection: connection, channels: channels}}
    end
  end

  @spec do_times(non_neg_integer(), non_neg_integer(), (-> any())) :: [any()]
  defp do_times(limit, counter, _function) when counter >= limit, do: []

  defp do_times(limit, counter, function) do
    [function.() | do_times(limit, 1 + counter, function)]
  end
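do_times/3 calls a zero-arity function a given number of times and collects the results in order. The sketch below puts a copy of it next to an equivalent built from Stream.repeatedly/1 and Enum.take/2, which also returns an empty list for a count of zero; DoTimesSketch and times/2 are hypothetical names, not part of ex_rabbit_pool.

```elixir
defmodule DoTimesSketch do
  # Copy of do_times/3: call `function` (limit - counter) times, in order.
  @spec do_times(non_neg_integer(), non_neg_integer(), (-> any())) :: [any()]
  def do_times(limit, counter, _function) when counter >= limit, do: []
  def do_times(limit, counter, function), do: [function.() | do_times(limit, counter + 1, function)]

  # Standard-library equivalent: a lazy stream of repeated calls,
  # truncated to the first `n` results.
  @spec times(non_neg_integer(), (-> any())) :: [any()]
  def times(n, function), do: function |> Stream.repeatedly() |> Enum.take(n)
end

IO.inspect(DoTimesSketch.do_times(3, 0, fn -> :channel end))
# prints [:channel, :channel, :channel]
```

For the default of 1000 channels the body recursion of do_times/3 is harmless; the Stream-based version merely avoids writing the recursion by hand.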
When creating a channel, we first check that the connection process is still alive, returning {:error, :closing} otherwise; we then open the channel using our client adapter and return its result:
  @spec start_channel(module(), AMQP.Connection.t()) :: {:ok, AMQP.Channel.t()} | {:error, any()}
  defp start_channel(client, connection) do
    if Process.alive?(connection.pid) do
      case client.open_channel(connection) do
        {:ok, _channel} = result ->
          Logger.info("[Rabbit] channel connected")
          result

        {:error, reason} = error ->
          Logger.error("[Rabbit] error starting channel reason: #{inspect(reason)}")
          error

        error ->
          Logger.error("[Rabbit] error starting channel reason: #{inspect(error)}")
          {:error, error}
      end
    else
      {:error, :closing}
    end
  end
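Because the worker talks to RabbitMQ only through its adapter, tests can swap in a stub. The module below is a hypothetical stub, not ex_rabbit_pool's own test adapter, and it implements only the three functions this post calls: open_connection/1, open_channel/1 and close_channel/1. Its "connections" and "channels" are maps holding the pid of an idle process, so start_channel/2's Process.alive?/1 check and the worker's Process.link/1 calls work on them unchanged.

```elixir
defmodule StubAdapter do
  # Hypothetical test adapter. Connections and channels are plain maps
  # holding the pid of an idle process, so linking, Process.alive?/1 and
  # :EXIT handling behave as they would with real pids.

  def open_connection(_config) do
    {:ok, %{pid: spawn(&idle/0)}}
  end

  # Refuse to open a channel on a dead connection.
  def open_channel(%{pid: conn_pid} = connection) do
    if Process.alive?(conn_pid) do
      {:ok, %{pid: spawn(&idle/0), conn: connection}}
    else
      {:error, :closing}
    end
  end

  # Ask the channel process to finish normally.
  def close_channel(%{pid: pid}) do
    send(pid, :stop)
    :ok
  end

  # Stand-in for a connection or channel process: waits until told to stop.
  # Unlike real AMQP channels, these do not die when their connection dies.
  defp idle do
    receive do
      :stop -> :ok
    end
  end
end
```

Since init/1 splits the :adapter option out of its configuration, a test should be able to start a worker with ExRabbitPool.Worker.RabbitConnection.start_link(adapter: StubAdapter, channels: 2) and exercise checkout and check-in without a running broker.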
We now have a pool of connections to RabbitMQ, and each connection has a pool of channels that clients can check out and check back in. We have not yet implemented checking out and checking in, so let’s do that now.
For the checkout_channel handler, we also need to handle some edge cases.
Firstly, the case where we are still unable to connect to RabbitMQ. In this situation we tell the client to retry later by returning {:error, :disconnected}.
Secondly, the case where there are no channels left in the pool, which can happen when all channels are already checked out by other clients. Here we have a couple of options: we could defer the reply (returning {:noreply, state} and replying later with GenServer.reply/2) to block the client for a period of time until a new channel is available, or we could return {:error, :out_of_channels}. We chose the latter, which is simpler and pushes the retry decision to the user, who can retry later or fail immediately.
Once the edge cases are covered, we can implement the actual checkout, which does the following: it monitors the client claiming the channel, so that if the client crashes we can reclaim the channel for the pool; it returns {:ok, channel}; and it saves the monitor reference together with the assigned channel in the RabbitConnection state for safe keeping.
  @impl true
  def handle_call(:checkout_channel, _from, %State{connection: nil} = state) do
    {:reply, {:error, :disconnected}, state}
  end

  def handle_call(:checkout_channel, _from, %{channels: []} = state) do
    {:reply, {:error, :out_of_channels}, state}
  end

  def handle_call(
        :checkout_channel,
        {from_pid, _ref},
        %{channels: [channel | rest], monitors: monitors} = state
      ) do
    # monitor the client so we can reclaim the channel if it crashes
    monitor_ref = Process.monitor(from_pid)

    {:reply, {:ok, channel},
     %State{state | channels: rest, monitors: [{monitor_ref, channel} | monitors]}}
  end
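To see the three clauses in isolation, here is a hypothetical pure-function version, CheckoutSketch.checkout/2, which returns {reply, new_state} instead of a GenServer reply tuple. The state is a plain map rather than the State struct and channels are plain maps; only Process.monitor/1 touches a real process.

```elixir
defmodule CheckoutSketch do
  # Hypothetical pure version of the three checkout_channel clauses: it takes
  # the worker state and the client's pid and returns {reply, new_state}.
  def checkout(%{connection: nil} = state, _client), do: {{:error, :disconnected}, state}
  def checkout(%{channels: []} = state, _client), do: {{:error, :out_of_channels}, state}

  def checkout(%{channels: [channel | rest], monitors: monitors} = state, client) do
    # Monitor the client so that a crash later arrives as a :DOWN message.
    ref = Process.monitor(client)
    {{:ok, channel}, %{state | channels: rest, monitors: [{ref, channel} | monitors]}}
  end
end

# A client process that simply waits; the only channel is a plain map.
client =
  spawn(fn ->
    receive do
      :done -> :ok
    end
  end)

state = %{connection: %{pid: self()}, channels: [%{pid: :chan_a}], monitors: []}
{{:ok, %{pid: :chan_a}}, state} = CheckoutSketch.checkout(state, client)
{{:error, :out_of_channels}, state} = CheckoutSketch.checkout(state, client)
```

Clause order matters here just as in the GenServer: the disconnected check runs first, so a worker with no connection never hands out a channel from a stale list.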
We now implement the functionality to return a channel to the pool. Doing so requires the following steps:
- check that the channel is still tracked by the worker, either idle in the pool or checked out by a client
- remove the channel from the pool and remove the client from the RabbitConnection state list of monitored processes
- unlink and close the old channel
- open a new channel, link it to the worker and add it to the pool
You may notice that we close the old channel and create a new one every single time a channel is returned to the pool. This is because a client can change a channel’s state while using it; in other words, channels are stateful. Replacing the channel avoids strange errors that could appear if we reused channels.
  @impl true
  def handle_cast(
        {:checkin_channel, %{pid: pid} = old_channel},
        %{connection: conn, adapter: adapter, channels: channels, monitors: monitors} = state
      ) do
    # only start a new channel when checking in a channel that hasn't been removed yet;
    # a channel can crash or be closed while a client holds it, in which case we
    # receive both an `:EXIT` message and a `:checkin_channel` message, in either order
    if find_channel(pid, channels, monitors) do
      new_channels = remove_channel(channels, pid)
      new_monitors = remove_monitor(monitors, pid)

      case replace_channel(old_channel, adapter, conn) do
        {:ok, channel} ->
          {:noreply, %State{state | channels: [channel | new_channels], monitors: new_monitors}}

        {:error, :closing} ->
          # RabbitMQ connection is closed: nothing to do, wait for reconnection
          {:noreply, %State{state | channels: new_channels, monitors: new_monitors}}
      end
    else
      {:noreply, state}
    end
  end

  defp find_channel(channel_pid, channels, monitors) do
    Enum.find(channels, &(&1.pid == channel_pid)) ||
      Enum.find(monitors, fn {_ref, %{pid: pid}} ->
        pid == channel_pid
      end)
  end

  defp replace_channel(old_channel, adapter, conn) do
    true = Process.unlink(old_channel.pid)
    # ignore the result of closing the old channel
    adapter.close_channel(old_channel)

    case start_channel(adapter, conn) do
      {:ok, channel} = result ->
        true = Process.link(channel.pid)
        result

      {:error, _reason} = error ->
        error
    end
  end
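The find_channel/3 guard matters because an :EXIT message and a :checkin_channel message can arrive in either order for the same channel; without it, both handlers would start a replacement and the pool would grow by one channel. The hypothetical model below keeps only that guard and counts replacements, skipping the real side effects (demonitor, unlink, close, open).

```elixir
defmodule ChannelRaceSketch do
  # Hypothetical model of the guard shared by the :checkin_channel and
  # :EXIT handlers: only a channel that is still tracked gets replaced.
  def handle(state, {kind, chan_pid}) when kind in [:checkin, :exit] do
    if tracked?(state, chan_pid) do
      n = state.replacements + 1
      # Drop the old channel and add a freshly "opened" replacement.
      remaining = Enum.reject(state.channels, &(&1.pid == chan_pid))

      %{
        state
        | channels: [%{pid: {:replacement, n}} | remaining],
          monitors: Enum.reject(state.monitors, fn {_ref, chan} -> chan.pid == chan_pid end),
          replacements: n
      }
    else
      # Already handled by whichever message arrived first: do nothing.
      state
    end
  end

  # Still idle in the pool, or checked out by a monitored client?
  defp tracked?(%{channels: channels, monitors: monitors}, chan_pid) do
    Enum.any?(channels, &(&1.pid == chan_pid)) or
      Enum.any?(monitors, fn {_ref, chan} -> chan.pid == chan_pid end)
  end
end

# One channel, currently checked out by a client.
state = %{channels: [], monitors: [{make_ref(), %{pid: :chan_a}}], replacements: 0}

in_order =
  state
  |> ChannelRaceSketch.handle({:checkin, :chan_a})
  |> ChannelRaceSketch.handle({:exit, :chan_a})

reversed =
  state
  |> ChannelRaceSketch.handle({:exit, :chan_a})
  |> ChannelRaceSketch.handle({:checkin, :chan_a})
```

In both orders exactly one replacement is created, which is the invariant the real handlers preserve.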
We now have a reasonably complete connection worker, but we still need to implement error handling for crashing connections, channels crashing/closing and exceptions raised within the clients.
As our worker process is already linked to the RabbitMQ connection process, it will receive an {:EXIT, pid, reason} message if the connection process terminates. We pattern match to ensure that the pid of the failing process is the connection’s pid, discard the connection and schedule a reconnection attempt in the background using Process.send_after/3.
  def handle_info({:EXIT, pid, _reason}, %{connection: %{pid: pid}} = state) do
    # the connection died: forget it and reconnect in the background
    Process.send_after(self(), :connect, 1000)
    {:noreply, %State{state | connection: nil}}
  end
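Everything in this section relies on one BEAM mechanism: with :trap_exit set, the death of a linked process arrives as an ordinary {:EXIT, pid, reason} message instead of taking the receiver down with it. The hypothetical helper below demonstrates just that; it runs inside a Task because it changes the trap_exit flag of whichever process calls it.

```elixir
defmodule TrapExitSketch do
  # Hypothetical helper: trap exits, link to a process that exits with
  # `reason`, and report the {:EXIT, pid, reason} message we receive.
  def exit_message_for(reason) do
    Process.flag(:trap_exit, true)
    pid = spawn_link(fn -> exit(reason) end)

    receive do
      {:EXIT, ^pid, exit_reason} -> {:exit_received, exit_reason}
    after
      1_000 -> :timeout
    end
  end
end

crashed = Task.async(fn -> TrapExitSketch.exit_message_for(:connection_closed) end) |> Task.await()
IO.inspect(crashed)
# prints {:exit_received, :connection_closed}
```

Because a dying connection and a dying channel produce messages of the same shape, the clause that matches %{connection: %{pid: pid}} has to come before the generic channel clause: function clauses are tried in the order they are defined.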
If the connection crashes while channels are linked to our worker, we will also receive :EXIT messages for the crashed channels.
We must therefore handle channel :EXIT messages in two situations: first, where the connection has already crashed and is now nil; and second, where the connection remains active but a channel crashed or was closed.
Let’s implement the first, where the connection has already crashed. We pattern match on the nil connection, then remove the crashed pid from the channels list and remove any monitor associated with that process identifier. Done.
  def handle_info(
        {:EXIT, pid, reason},
        %{connection: nil, channels: channels, monitors: monitors} = state
      ) do
    Logger.error("[Rabbit] connection lost, removing channel reason: #{inspect(reason)}")
    new_channels = remove_channel(channels, pid)
    new_monitors = remove_monitor(monitors, pid)
    {:noreply, %State{state | channels: new_channels, monitors: new_monitors}}
  end

  defp remove_channel(channels, channel_pid) do
    Enum.filter(channels, fn %{pid: pid} ->
      channel_pid != pid
    end)
  end

  defp remove_monitor(monitors, channel_pid) when is_pid(channel_pid) do
    monitors
    |> Enum.find(fn {_ref, %{pid: pid}} ->
      channel_pid == pid
    end)
    |> case do
      # nil means the :DOWN message was already handled and the monitor removed
      nil ->
        monitors

      {ref, _} = returned ->
        true = Process.demonitor(ref)
        List.delete(monitors, returned)
    end
  end
Handling the second case when the connection remains open but a channel crashed is the same as handling the case of a crashed connection with the additional requirement that we need to create another channel, link it to our worker, and add the channel to the pool.
  @impl true
  def handle_info(
        {:EXIT, pid, reason},
        %{channels: channels, connection: conn, adapter: adapter, monitors: monitors} = state
      ) do
    Logger.warn("[Rabbit] channel lost reason: #{inspect(reason)}")
    # don't start a new channel if the crashed channel no longer belongs to the pool;
    # a channel can crash or be closed while a client holds it, in which case we
    # receive both an `:EXIT` message and a `:checkin_channel` message, in either order
    if find_channel(pid, channels, monitors) do
      new_channels = remove_channel(channels, pid)
      new_monitors = remove_monitor(monitors, pid)

      case start_channel(adapter, conn) do
        {:ok, channel} ->
          true = Process.link(channel.pid)
          {:noreply, %State{state | channels: [channel | new_channels], monitors: new_monitors}}

        {:error, :closing} ->
          # RabbitMQ connection is closed: nothing to do, wait for reconnection
          {:noreply, %State{state | channels: new_channels, monitors: new_monitors}}
      end
    else
      {:noreply, state}
    end
  end
Now we have covered connection and channel errors, crashes and closes, but we still need to handle the case where a monitored client crashes without returning its channel to the pool. In this case we remove the client from the monitors list and, as with a regular check-in, replace its channel with a fresh one in the list of active channels.
  @impl true
  def handle_info(
        {:DOWN, down_ref, :process, _, _},
        %{channels: channels, monitors: monitors, adapter: adapter, connection: conn} = state
      ) do
    monitors
    |> Enum.find(fn {ref, _chan} -> down_ref == ref end)
    |> case do
      nil ->
        {:noreply, state}

      {_ref, old_channel} = returned ->
        new_monitors = List.delete(monitors, returned)

        case replace_channel(old_channel, adapter, conn) do
          {:ok, channel} ->
            {:noreply, %State{state | channels: [channel | channels], monitors: new_monitors}}

          {:error, :closing} ->
            # RabbitMQ connection is closed: nothing to do, wait for reconnection
            {:noreply, %State{state | channels: channels, monitors: new_monitors}}
        end
    end
  end
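The :DOWN clause relies on the monitor reference stored at checkout time to find the channel a crashed client was holding. The hypothetical helper below reproduces only that lookup: it monitors a client, makes it crash and uses the reference from the :DOWN message to find the held channel. It runs in a Task so that unrelated :DOWN messages in the caller's mailbox cannot interfere.

```elixir
defmodule DownSketch do
  # Hypothetical helper mirroring the :DOWN clause's lookup by monitor ref.
  def reclaim_after_crash(channel) do
    client =
      spawn(fn ->
        receive do
          :crash -> exit(:client_crashed)
        end
      end)

    # As at checkout: remember {monitor_ref, channel} for this client.
    monitors = [{Process.monitor(client), channel}]
    send(client, :crash)

    receive do
      {:DOWN, down_ref, :process, _pid, reason} ->
        case Enum.find(monitors, fn {ref, _chan} -> ref == down_ref end) do
          nil -> {:unknown_monitor, reason}
          {_ref, held_channel} = entry -> {held_channel, reason, List.delete(monitors, entry)}
        end
    after
      1_000 -> :timeout
    end
  end
end

result = Task.async(fn -> DownSketch.reclaim_after_crash(%{pid: :chan_a}) end) |> Task.await()
IO.inspect(result)
# prints {%{pid: :chan_a}, :client_crashed, []}
```

The client is made to wait for a :crash message before exiting so that the monitor is always in place first; monitoring an already dead process would yield a :DOWN message with reason :noproc instead.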
Now that we have covered everything related to connection and channel handling, we need to implement an API for our library. This requires adding some convenience functions to our GenServer and creating an API layer that gets a connection worker out of the connection pool and executes a function in the context of a channel for us.
Firstly, we define functions for checking channels in and out of the pool:
  def checkout_channel(pid) do
    GenServer.call(pid, :checkout_channel)
  end

  def checkin_channel(pid, channel) do
    GenServer.cast(pid, {:checkin_channel, channel})
  end
end
Having done so, we create our library API, defining functions to retrieve a connection worker from the pool and to execute a function in the context of a channel. When checking out a connection worker we don’t care about isolating access to each process: we use a pool purely to spread load (pool config strategy :fifo), so the worker is checked back in immediately.
The supplied function will receive either {:ok, channel} or {:error, reason}, where reason is :disconnected or :out_of_channels.
We also define basic functions for manually checking channels out of and back into a connection worker.
defmodule ExRabbitPool do
  alias ExRabbitPool.Worker.RabbitConnection, as: Conn

  @type f :: ({:ok, AMQP.Channel.t()} | {:error, :disconnected | :out_of_channels} -> any())

  @spec get_connection_worker(atom()) :: pid()
  def get_connection_worker(pool_id) do
    # check the worker straight back in: the pool is only used to spread load
    conn_worker = :poolboy.checkout(pool_id)
    :ok = :poolboy.checkin(pool_id, conn_worker)
    conn_worker
  end

  @spec with_channel(atom(), f()) :: any()
  def with_channel(pool_id, fun) do
    pool_id
    |> get_connection_worker()
    |> do_with_conn(fun)
  end

  def checkout_channel(conn_worker) do
    Conn.checkout_channel(conn_worker)
  end

  def checkin_channel(conn_worker, channel) do
    Conn.checkin_channel(conn_worker, channel)
  end

  defp do_with_conn(conn_worker, fun) do
    case checkout_channel(conn_worker) do
      {:ok, channel} = ok_chan ->
        try do
          fun.(ok_chan)
        after
          # always return the channel, even if fun raises
          :ok = checkin_channel(conn_worker, channel)
        end

      {:error, _} = error ->
        fun.(error)
    end
  end
end
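The key property of do_with_conn/2 is that its after block returns the channel even when the supplied function raises. The sketch below checks that property without RabbitMQ or poolboy: a hypothetical WithChannelSketch module uses an Agent holding a list of idle "channels" as a stand-in for a connection worker and repeats the same case/try/after shape.

```elixir
defmodule WithChannelSketch do
  # Hypothetical stand-in for a connection worker: an Agent holding a list
  # of idle "channels", reproducing only the checkout/checkin contract.
  def start_worker(channels), do: Agent.start_link(fn -> channels end)

  def checkout_channel(worker) do
    Agent.get_and_update(worker, fn
      [] -> {{:error, :out_of_channels}, []}
      [channel | rest] -> {{:ok, channel}, rest}
    end)
  end

  # Asynchronous, like the GenServer.cast in RabbitConnection; returns :ok.
  def checkin_channel(worker, channel), do: Agent.cast(worker, &[channel | &1])

  # Same shape as do_with_conn/2.
  def with_channel(worker, fun) do
    case checkout_channel(worker) do
      {:ok, channel} = ok_chan ->
        try do
          fun.(ok_chan)
        after
          # runs whether fun returns or raises
          :ok = checkin_channel(worker, channel)
        end

      {:error, _reason} = error ->
        fun.(error)
    end
  end
end

{:ok, worker} = WithChannelSketch.start_worker([:chan_a])

result =
  try do
    WithChannelSketch.with_channel(worker, fn {:ok, _channel} -> raise "boom" end)
  rescue
    RuntimeError -> :raised
  end
```

Because errors are handed to the function rather than raised, a callback that only matches {:ok, channel} will fail with a FunctionClauseError when the worker is disconnected or out of channels; callers that want to retry should match the {:error, reason} case as well.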
With this code implemented, we want to quickly verify it in IEx, the interactive Elixir shell, but before we do so we need access to a running RabbitMQ instance. Docker makes this trivial.
Let’s work our way through the required steps.
First, we pull the RabbitMQ Docker image from the Docker hub:
docker pull rabbitmq:3.7.7-management
In another terminal, we then run the RabbitMQ image in the foreground, mapping both its web management interface (port 15672) and its AMQP port (5672) to the host loopback interface:
docker run --rm --hostname bugs-bunny --name roger_rabbit -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3.7.7-management
After waiting for RabbitMQ initialization to complete, we proceed to copy the following code into the Elixir console in order to verify that everything works as expected:
First the configuration:
rabbitmq_config = [channels: 1]

rabbitmq_conn_pool = [
  name: {:local, :connection_pool},
  worker_module: ExRabbitPool.Worker.RabbitConnection,
  size: 1,
  max_overflow: 0
]
Then we create an instance of the PoolSupervisor:
{:ok, pid} =
  ExRabbitPool.PoolSupervisor.start_link(
    rabbitmq_config: rabbitmq_config,
    rabbitmq_conn_pool: rabbitmq_conn_pool
  )
And finally, we verify that everything works by publishing a message through an exchange bound to the AMQP queue “ex_rabbit_pool”, then fetching it from the queue via the same channel we published it on:
ExRabbitPool.with_channel(:connection_pool, fn {:ok, channel} ->
  queue = "ex_rabbit_pool"
  exchange = "my_exchange"
  routing_key = "example"
  {:ok, _} = AMQP.Queue.declare(channel, queue, auto_delete: true, exclusive: true)
  :ok = AMQP.Exchange.declare(channel, exchange, :direct, auto_delete: true, exclusive: true)
  :ok = AMQP.Queue.bind(channel, queue, exchange, routing_key: routing_key)
  :ok = AMQP.Basic.publish(channel, exchange, routing_key, "Hello World!")
  {:ok, msg, _} = AMQP.Basic.get(channel, queue, no_ack: true)
  IO.puts(msg)
end)
The message “Hello World!” is printed to the console. Tada! The library works.
If you want to see an example of a real-life consumer, take a look at the implementation of the Buildex Jobs Consumer.
There are some nice enhancements we would like to make, and we would be very happy to accept quality contributions. All feedback is welcome; this would be a really nice Elixir project for your first open-source contribution!
Feel free to explore the source code of this blog post here and give it a try!
Special thanks to Bryan Hunt for his help editing this blog and reviewing all the code that was produced while constructing this library, also special thanks to our RabbitMQ experts for reviewing our code and ensuring we were following the best practices when handling connections and channels inside our library.