Last year we introduced RabbitMQ into our stack at Waarneembemiddeling.nl. We were in desperate need of a worker queue and after fiddling around with Gearman, Beanstalkd and RabbitMQ we made our choice: RabbitMQ it will be.
Now there’s quite some information to be found on RabbitMQ and how to use it, but a lot of things you have to find out yourself. Questions like:
- what happens to messages on the queue after a service restart?
- what happens to messages on the queue after a reboot?
- how do we notice that a worker crashed?
- what happens to my message when the consumer dies while processing?
- etc.
Using RabbitMQ and Symfony2 (or php in general) is quite easy. There is a bundle for Symfony2 called OldSoundRabbitMqBundle and a php library called php-amqplib which work very well. Both are from the same author, you should probably thank him for that 🙂 .
First try: pure php consumers
We’re running a fairly common setup. Because we’ve been warned that php consumer die out every now and then, we’re using Supervisor to start new consumers when needed. There is a lot of information out there on this subject so I won’t go in there.
Despite the warnings we started with pure php consumers powered by the commands in OldSoundRabbitMqBundle. The first workers were started like this:
1 |
$ app/console rabbitmq:consumer async_event -e=prod |
This means we’re consuming from the async_event queue without any limit to the messages. Basically this means it will run forever, or better said: until php crashes. Or worse: your consumer ends up in non-response state. Which means it doesn’t process any message any more and Supervisor thinks all is fine because you still have a running process. This happened once to our mail queue. I can assure you it’s better to prevent these kind of things.
Second try: pure php consumers with limited messages
So after the mail-gate I was searching for a quick way to make our setup more error proof. The OldSoundRabbitMqBundle supports limiting the messages to process. So I limited our workers so that they got restarted a couple of times a day:
1 |
$ app/console rabbitmq:consumer async_event -e=prod -m 10 |
After that things got running more smoothly and it took a while before we encountered new problems. While spitting trough the logs I notices some consumers produced some errors. A brief summary:
- General error: 2006 MySQL server has gone away
- Warning: Error while sending QUERY packet.
Because the consumer is one process that keeps running, that also means that the service container and stuff keeps existing in memory. When you’ve done some queries the database connection keeps open in the background. And if it’s quiet on our queue, it may take some time before we reach the message limit. If that time exceeds the connect_timeout
of your MySQL server, you’ll run into the warnings and errors about lost connections.
Of course we should close the connection after the message is processed or could try catch for Doctrine DBAL connection exceptions or increase the connect_timeout
setting but thats just denying the real problem. Running consumers with a booted Symfony2 kernel just doesn’t work so well.
A final resort could be to strip down the consumers and don’t use the Symfony2 kernel and container but we don’t like that. Our messages are most of the time serialized events which get dispatched again after the consumer picks them up. At the application level we don’t want to know wether we are in a RabbitMQ consumer or in a normal HTTP request.
Real solution: rabbitmq-cli-consumer
So it took a couple of months to learn the hard way we needed some different solution for our consumers. I found this interesting blog post about the same problem. He solved it with Java and Ruby consumers. We all learned java in college right, but I don’t like to run the memory eating jvm on our servers. The Ruby consumer unfortunately misses some good documenten for me as Ruby virgin. So I got a bit lost there.
That was the point where Go got in. Go is a kind of improved C with not real OO but a lot of cool stuff in it. I wrote a application that makes it possible to consume messages from RabbitMQ queue and pipe them into an command line application. I called it: rabbitmq-cli-consumer.
The main advantages for using rabbitmq-cli-consumer are:
- no more stability issues to deal with
- lightweight and fast
- no need to restart your workers after a fresh deployment
We still use supervisor to start and stop the consumers because it’s the right tool for it. An example of how we start a consumer:
1 2 3 4 5 6 7 8 9 |
$ rabbitmq-cli-consumer -c example.conf -e "/home/vagrant/current/app/console wb:consumer:mail --env=prod" -V 2015/01/09 20:57:52 Setting QoS... 2015/01/09 20:57:52 Succeeded setting QoS. 2015/01/09 20:57:52 Registering consumer... 2015/01/09 20:57:52 Succeeded registering consumer. 2015/01/09 20:57:52 Waiting for messages... 2015/01/09 20:58:07 Processing message... 2015/01/09 20:58:13 Processed! ... |
An example of a Symfony2 command we use:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
namespace Wb\Bundle\CoreBundle\Command; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; class MailConsumerCommand extends ContainerAwareCommand { protected function configure() { $this ->setName('wb:consumer:mail') ->addArgument('message', InputArgument::REQUIRED) ; } protected function execute(InputInterface $input, OutputInterface $output) { $message = $input->getArgument('message'); if (! $message = unserialize(base64_decode($message))) { throw new \InvalidArgumentException('Invalid input received'); } $container = $this->getContainer(); $mailer = $container->get('swiftmailer.mailer.mandrill'); $transport = $container->get('swiftmailer.mailer.mandrill.transport.real'); $logger = $container->get('rabbitmq_mail_logger'); $logger->info(sprintf( '[%s] [%s] Received message from queue', $message->getSubject(), implode(', ', array_keys($message->getTo())) )); $spool = $mailer->getTransport()->getSpool(); $mailer->send($message); $spool->flushQueue($transport); $logger->info(sprintf( '[%s] [%s] Mail send', $message->getSubject(), implode(', ', array_keys($message->getTo())) )); } } |
Final tip: use the management plugin
Before even starting with RabbitMQ make sure you have the management plugin installed. It gives you a good overview about whats happening. Also you can purge queues, add users, add vhosts etc.