There are various places in Tuleap where message queues are needed, but the primary use is running jobs in the background.
Historically, this was for all the system-related stuff (creating users, git/svn repositories, etc.) that required special Unix permissions we were not eager to grant to a web app.
Back in time, we built a simple queue in the database, with a cron job that consumes events every minute. I can see your eyes rolling, but 10 years ago it was not that bad. As it was for system-related stuff, we called it SystemEvents. It was fun, and we improved that part by giving it more queues (one dedicated to git, for instance).
RabbitMQ as a dedicated queue system
More recently, we had a need to share events across servers, for distributed Tuleap setups for instance. Our good old system was no longer able to deal with that because we needed real queue management that works across servers. We chose RabbitMQ because we were looking for a queue system, its PHP support was quite decent, and the tutorials were good.
However, we had to set up a rather complicated queue system. We needed to guarantee that each and every event is consumed only once per worker. To make a long story short, we built an Exchange to Exchange Binding.
Exchange to Exchange binding topology (from http://skillachie.com)
We made the following PHP implementation (simplified here).
In PersistentQueue we manage the connection to RabbitMQ and deal with events (pushing them and listening to them).
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class PersistentQueue
{
    private $queue_prefix;
    /**
     * @var AMQPStreamConnection
     */
    private $connection;
    /**
     * @var AMQPChannel
     */
    private $channel;

    public function __construct($queue_prefix)
    {
        $this->queue_prefix = $queue_prefix;
    }

    public function pushSinglePersistentMessage($topic, $content)
    {
        $this->connect();
        $queue = new ExchangeToExchangeBindings($this->channel, $this->queue_prefix);
        $queue->publish($topic, $content);
        $this->close();
    }

    public function listen($queue_id, $topic, $callback)
    {
        $this->connect();
        $queue = new ExchangeToExchangeBindings($this->channel, $this->queue_prefix);
        $queue->addListener($queue_id, $topic, $callback);
        $this->wait();
    }

    private function connect()
    {
        $this->connection = new AMQPStreamConnection(...);
        $this->channel = $this->connection->channel();
        return $this->channel;
    }

    public function wait()
    {
        try {
            // Block for as long as at least one consumer callback is registered
            while (count($this->channel->callbacks)) {
                $this->channel->wait();
            }
        } catch (Exception $exception) {
            // logging
        }
        $this->close();
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}
In ExchangeToExchangeBindings we create the right exchanges and queues and wire everything together.
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

class ExchangeToExchangeBindings
{
    /**
     * @var AMQPChannel
     */
    private $channel;
    private $publish_exchange_name;
    private $internal_exchange_name;
    private $queue_prefix;

    public function __construct(AMQPChannel $channel, $queue_prefix)
    {
        $this->channel = $channel;
        $this->queue_prefix = $queue_prefix;
        $this->publish_exchange_name = $queue_prefix.'_gateway';
        $this->internal_exchange_name = $queue_prefix.'_distributor';
    }

    public function publish($topic, $content)
    {
        $this->wireExchangesForQueuePersistence();
        $message = new AMQPMessage($content, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
        $this->channel->basic_publish($message, $this->publish_exchange_name, $topic);
    }

    public function addListener($queue_id, $topic, $callback)
    {
        $this->wireExchangesForQueuePersistence();
        $this->addConsumer($queue_id, $topic, $callback);
    }

    private function addConsumer($queue_id, $topic, $callback)
    {
        $queue_name = $this->getConsumerQueue($queue_id);
        $this->attachQueueToInternalExchange($queue_name, $topic);
        // Arguments: queue, consumer tag, no_local, no_ack, exclusive, nowait, callback
        $this->channel->basic_consume($queue_name, '', false, false, false, false, $callback);
    }

    private function getConsumerQueue($queue_id)
    {
        return $this->queue_prefix.'_'.$queue_id;
    }

    private function attachQueueToInternalExchange($queue_name, $topic)
    {
        $queue_is_persistent = true;
        // Arguments: queue, passive, durable, exclusive, auto_delete
        $this->channel->queue_declare($queue_name, false, $queue_is_persistent, false, false);
        $this->channel->queue_bind($queue_name, $this->internal_exchange_name, $topic);
    }

    private function wireExchangesForQueuePersistence()
    {
        // Arguments: exchange, type, passive, durable, auto_delete
        $this->channel->exchange_declare($this->publish_exchange_name, 'fanout', false, true, false);
        $this->channel->exchange_declare($this->internal_exchange_name, 'topic', false, true, false);
        // Everything published to the gateway is forwarded to the distributor
        $this->channel->exchange_bind($this->internal_exchange_name, $this->publish_exchange_name);
    }
}
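To make the wiring above easier to picture without a broker, here is a minimal in-memory sketch of the same idea: a gateway that forwards everything to a distributor, which copies each message into every queue bound to its topic. The class InMemoryTopology, the queue names and the topics are all hypothetical and only there for illustration; it is not RabbitMQ, and it only does exact topic matching (no '*' or '#' wildcards).

```php
<?php
// Hypothetical in-memory model of the gateway/distributor topology.
// None of these names come from php-amqplib or from Tuleap.
final class InMemoryTopology
{
    /** @var array<string, string[]> queue name => topics it is bound to */
    private $bindings = [];
    /** @var array<string, string[]> queue name => messages waiting in it */
    private $queues = [];

    // Same role as attachQueueToInternalExchange(): declare a queue and bind it to a topic.
    public function bindQueue(string $queue_name, string $topic): void
    {
        $this->bindings[$queue_name][] = $topic;
        if (!isset($this->queues[$queue_name])) {
            $this->queues[$queue_name] = [];
        }
    }

    // Publishing targets the fanout gateway, which forwards everything to the distributor.
    public function publish(string $topic, string $content): void
    {
        $this->distribute($topic, $content);
    }

    // The distributor puts one copy of the message in each queue bound to the topic.
    // Simplification: exact match only.
    private function distribute(string $topic, string $content): void
    {
        foreach ($this->bindings as $queue_name => $topics) {
            if (in_array($topic, $topics, true)) {
                $this->queues[$queue_name][] = $content;
            }
        }
    }

    public function messagesFor(string $queue_name): array
    {
        return $this->queues[$queue_name] ?? [];
    }
}

$topology = new InMemoryTopology();
$topology->bindQueue('tuleap_worker-1', 'git.push');
$topology->bindQueue('tuleap_worker-2', 'git.push');
$topology->bindQueue('tuleap_worker-2', 'tracker.update');

$topology->publish('git.push', 'push on repository foo');
$topology->publish('tracker.update', 'artifact 42 updated');

$worker_1_messages = $topology->messagesFor('tuleap_worker-1');
$worker_2_messages = $topology->messagesFor('tuleap_worker-2');
```

Each worker queue ends up with exactly one copy of every event it asked for, which is the guarantee we were after. The real setup adds persistence on top of that: durable exchanges and queues, and persistent messages.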
It worked really well for 10 full months, and when we built our new Tracker Notifications Worker, we first built it on top of that.
In terms of lines of code that’s not too much, but the concepts are rather complex (you need to know what an Exchange, a routing_key, a topic and a binding are). To be fair, each and every time we had to touch this code, we had to go back to the initial schema.
Enter Redis
In the meantime, we learned about the Redis RPOPLPUSH command and it changed everything.
OK, it didn’t change everything at first, because we didn’t scroll down the page and we were like “hmm, how do you pronounce that?”. The real thing is a little further down the page: “Pattern: Reliable queue”.
Applied to Tuleap, it works like this:
- one or several workers are waiting (BRPOPLPUSH) for events in event_queue
- an agent sends an event to event_queue with some payload
- one of the workers gets the event and pushes it automatically into a processing_queue (one queue per worker)
- when the worker has completed its work, the event is removed from processing_queue
  - if the worker crashes while doing the work, the processing_queue keeps the event
  - when the worker restarts, all events are re-queued from processing_queue back to event_queue
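The steps above can be traced without a Redis server. The following is a minimal sketch where plain PHP arrays stand in for Redis lists; the class InMemoryLists, the queue name processing_queue-1 and the event names are hypothetical and only mimic the behaviour of LPUSH, RPOPLPUSH and LREM.

```php
<?php
// Hypothetical in-memory stand-in for the Redis list commands used by the pattern.
// It is not part of Tuleap or phpredis.
final class InMemoryLists
{
    /** @var array<string, string[]> index 0 is the head (left side) of each list */
    private $lists = [];

    // Same idea as LPUSH: add the value at the head of the list.
    public function lPush(string $key, string $value): void
    {
        if (!isset($this->lists[$key])) {
            $this->lists[$key] = [];
        }
        array_unshift($this->lists[$key], $value);
    }

    // Same idea as RPOPLPUSH: move the tail of $source to the head of $destination.
    // Returns false when $source is empty.
    public function rpoplpush(string $source, string $destination)
    {
        if (empty($this->lists[$source])) {
            return false;
        }
        $value = array_pop($this->lists[$source]);
        $this->lPush($destination, $value);
        return $value;
    }

    // Same idea as LREM with a count of 1: remove the first occurrence of $value.
    public function lRemOne(string $key, string $value): void
    {
        $index = array_search($value, $this->lists[$key] ?? [], true);
        if ($index !== false) {
            array_splice($this->lists[$key], $index, 1);
        }
    }

    public function all(string $key): array
    {
        return $this->lists[$key] ?? [];
    }
}

$lists = new InMemoryLists();
$lists->lPush('event_queue', 'event-1');
$lists->lPush('event_queue', 'event-2');

// The worker takes the oldest event, then "crashes" before removing it
// from its processing queue.
$taken = $lists->rpoplpush('event_queue', 'processing_queue-1');

// On restart, the worker first re-queues whatever it left behind...
while ($lists->rpoplpush('processing_queue-1', 'event_queue') !== false) {
    // nothing else to do, the move is the whole point
}

// ...then processes events normally, removing each one once the work is done.
$processed = [];
while (($event = $lists->rpoplpush('event_queue', 'processing_queue-1')) !== false) {
    $processed[] = $event;
    $lists->lRemOne('processing_queue-1', $event);
}
```

In this sketch no event is lost by the crash, but the re-queued event-1 ends up behind event-2: RPOPLPUSH pushes at the head of the destination list, while workers pop from the tail. A real worker would block on BRPOPLPUSH instead of stopping when the queue is empty.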
And everything can be wrapped in one single class:
class RedisPersistentQueue
{
    /**
     * @var Redis
     */
    private $redis;
    private $event_queue_name;

    public function __construct($event_queue_name)
    {
        $this->event_queue_name = $event_queue_name;
    }

    public function listen($worker_id, $topic, $callback)
    {
        $reconnect = false;
        // One processing queue per worker
        $processing_queue = $this->event_queue_name.'-processing-'.$worker_id;
        do {
            try {
                $this->connect();
                $this->queuePastEvents($processing_queue);
                $this->waitForEvents($processing_queue, $callback);
            } catch (RedisException $e) {
                // we get that due to default_socket_timeout
                if (strtolower($e->getMessage()) === 'read error on connection') {
                    $reconnect = true;
                } else {
                    throw $e;
                }
            }
        } while ($reconnect === true);
    }

    /**
     * In case of a crash of the worker, the processing queue might contain events not processed yet.
     *
     * This ensures events are re-queued on the main event queue before going further
     */
    private function queuePastEvents($processing_queue)
    {
        do {
            $value = $this->redis->rpoplpush($processing_queue, $this->event_queue_name);
        } while ($value !== false);
    }

    private function waitForEvents($processing_queue, callable $callback)
    {
        while (true) {
            // A timeout of 0 means: block until an event is available
            $value = $this->redis->brpoplpush($this->event_queue_name, $processing_queue, 0);
            $callback($value);
            // The work is done, the event can leave the processing queue
            $this->redis->lRem($processing_queue, $value, 1);
        }
    }

    private function connect()
    {
        $this->redis = new Redis();
        $this->redis->connect(...);
    }

    public function pushSinglePersistentMessage($topic, $content)
    {
        $this->connect();
        $this->redis->lPush(
            $this->event_queue_name,
            json_encode(
                [
                    'event_name' => $topic,
                    'payload' => $content,
                ]
            )
        );
    }
}
As you can see, the implementation is rather straightforward. The main pitfall we hit is that, by default, the PHP socket timeout (default_socket_timeout) is 60s, and when it expires while the worker is waiting for events, a RedisException is raised. We chose to keep the default timeout and to catch the exception in order to reconnect.
Conclusion
While RabbitMQ did the job and did it well, we decided to convert all our RabbitMQ queues to Redis. We traded completeness for simplicity and versatility (Redis is obviously also useful for caching and key/value purposes).
The number of events that can be managed is not a limiting factor (we are far from the limits, whichever system is used).
In addition to that, it’s far easier to install and administer a Redis server than a RabbitMQ one. As most Tuleap installations are not under our control, that’s also a big plus.