Experimenting with Celery and RabbitMQ

By | 29/10/2014

Celery is an open-source, asynchronous, distributed task queue that can process vast amounts of messages. That’s a mouthful, of course, so let me explain it with a concrete example. Suppose the activation emails for new sign-ups on your website are sent by tasks that are distributed to, and executed concurrently on, one or more servers. Each task is sent over a message queue such as RabbitMQ, which is often referred to as the “message broker”. The servers that execute the tasks, often referred to as “workers”, listen for incoming tasks from the broker and execute them. The benefit is that your main web application is offloaded and can continue normal operation, while the tasks are processed at a later time. I found the following tutorial helpful: http://blogs.vmware.com/vfabric/2013/04/how-instagram-feeds-work-celery-and-rabbitmq.html

Below is a picture to make clear how this works in principle:

Celery Architecture 2

On the client side, a function is called to add your task to the message queue. The worker machines listen to the queue, and when a task arrives, the Celery daemon executes the corresponding function.
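
To make this flow concrete without installing anything, here is a minimal sketch using only Python’s standard library. It is an analogy, not how Celery is implemented: a queue.Queue stands in for RabbitMQ, two threads stand in for the worker servers, and a dict stands in for the result store. The names broker, worker and results are hypothetical.

```python
import queue
import threading

# Stand-in for the message broker (RabbitMQ in the real setup).
broker = queue.Queue()
# Stand-in for a result store: task id -> return value.
results = {}
results_lock = threading.Lock()


def multiply(x, y):
    return "The product is " + str(x * y)


def worker():
    """Take tasks off the queue and run them until a None sentinel arrives."""
    while True:
        item = broker.get()
        if item is None:
            break
        task_id, func, args = item
        value = func(*args)
        with results_lock:
            results[task_id] = value


# Start two "worker servers" listening on the queue.
workers = [threading.Thread(target=worker) for _ in range(2)]
for w in workers:
    w.start()

# The "client" only enqueues work; it does not wait for the results here.
for task_id in range(100):
    broker.put((task_id, multiply, (200, 200)))

# One sentinel per worker; they sit behind all tasks, so every task runs first.
for _ in workers:
    broker.put(None)
for w in workers:
    w.join()

print(len(results), results[0])  # 100 The product is 40000
```

The client loop finishes as soon as the tasks are queued, which is the offloading benefit described above; the real setup replaces the in-process queue with a broker reachable over the network.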

Here is an overview of my setup: Celery is installed on the client side, and both RabbitMQ and Celery are installed on the server side.

Celery Architecture 1

Installing Celery is really simple. I followed these steps:

ubuntu@ubuntu-celery-client:~/Celery$ sudo apt-get update
ubuntu@ubuntu-celery-client:~/Celery$ sudo apt-get install python-pip
ubuntu@ubuntu-celery-client:~/Celery$ sudo pip install Celery

The commands above are run on the client. You also have to repeat them on every server that is going to execute tasks.

Installing RabbitMQ is also not very difficult. Do the following:

ubuntu@ubuntu-celery-server:~/Celery$ sudo apt-get update 
ubuntu@ubuntu-celery-server:~/Celery$ sudo apt-get install rabbitmq-server

Now we need to configure RabbitMQ. For simplicity, I will create a user “ubuntu” with the password “ubuntu”, add a virtual host “vhost_ubuntu”, and give the user full permissions on it:

ubuntu@ubuntu-celery-server:~/Celery$ sudo rabbitmqctl add_user ubuntu ubuntu
ubuntu@ubuntu-celery-server:~/Celery$ sudo rabbitmqctl add_vhost vhost_ubuntu
ubuntu@ubuntu-celery-server:~/Celery$ sudo rabbitmqctl set_permissions -p vhost_ubuntu ubuntu ".*" ".*" ".*"
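
The three ".*" arguments to set_permissions are regular expressions for the configure, write and read permissions, and ".*" grants access to every resource in the vhost. They have to reach rabbitmqctl as separate arguments. As a quick check, Python’s shlex module, which follows POSIX shell splitting rules, shows how the shell tokenizes the command with and without spaces between them:

```python
import shlex

broken = 'sudo rabbitmqctl set_permissions -p vhost_ubuntu ubuntu".*"".*"".*"'
fixed = 'sudo rabbitmqctl set_permissions -p vhost_ubuntu ubuntu ".*" ".*" ".*"'

# Without spaces, the quoted pieces are glued onto the user name as one word.
print(shlex.split(broken)[-1])  # ubuntu.*.*.*

# With spaces, rabbitmqctl receives the user plus three separate patterns
# (configure, write, read).
print(shlex.split(fixed)[-4:])  # ['ubuntu', '.*', '.*', '.*']
```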

Now that both Celery and RabbitMQ are installed and configured properly, let’s create an easy example of how this all works.

On the client side, we write the “client.py” script:

from celery.execute import send_task

results = []

# Queue 100 multiplication tasks by name; each call returns an AsyncResult.
for x in range(100):
    results.append(send_task("tasks.multiply", [200, 200]))

Note that the following snippet also works, and I consider it a bit cleaner:

from celery import Celery

results = []
celery = Celery()
# Load the broker settings from celeryconfig.py (shown below).
celery.config_from_object('celeryconfig')

for x in range(100):
    results.append(celery.send_task("tasks.multiply", [200, 200]))

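In the above snippet, we perform 100 multiplications. Instead of doing them in the same process, we send these tasks to a different server that takes care of the execution. To submit each task, we use Celery’s send_task method, which takes the registered task name ("tasks.multiply") and a list of positional arguments. Because the task is referenced only by name, the client does not need a copy of tasks.py.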
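
Since send_task refers to the task only by its name, what travels over the broker is plain data rather than code. The sketch below illustrates that idea with a JSON message. The helper make_task_message and its field names (id, task, args) are a simplified, hypothetical format, not Celery’s exact wire format or its default serializer.

```python
import json
import uuid


def make_task_message(name, args):
    """Build a simplified, hypothetical task message: a name, arguments and an id."""
    return json.dumps({
        "id": str(uuid.uuid4()),  # unique id, like the ids in the worker log
        "task": name,             # the worker looks the function up by this name
        "args": list(args),
    })


message = make_task_message("tasks.multiply", [200, 200])
decoded = json.loads(message)
print(decoded["task"], decoded["args"])  # tasks.multiply [200, 200]
```

Nothing in the message refers to the multiply function itself; only the worker needs the code behind the name.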

On the server side, we write the “tasks.py” script:

from celery.task import task

# The worker registers this task as "tasks.multiply" (module name + function name).
@task
def multiply(x, y):
    multiplication = x * y
    return "The product is " + str(multiplication)
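
When the worker starts, it lists tasks.multiply under [tasks], because by default Celery names a task after its module and function. The following stand-in shows how a name-based registry like that can resolve an incoming task name to a function. The registry dict, the task decorator and the execute helper here are hypothetical, not Celery’s implementation.

```python
# Hypothetical name -> function registry, standing in for the worker's task registry.
registry = {}


def task(func):
    """Register func under "<module>.<function>", mimicking Celery's default naming."""
    registry[func.__module__ + "." + func.__name__] = func
    return func


@task
def multiply(x, y):
    multiplication = x * y
    return "The product is " + str(multiplication)


def execute(name, args):
    """Resolve a task by name, as a worker does for an incoming message, then run it."""
    return registry[name](*args)


# Saved as tasks.py and imported, the key is "tasks.multiply";
# run directly as a script, the module is "__main__" instead.
name = multiply.__module__ + ".multiply"
print(execute(name, [200, 200]))  # The product is 40000
```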

Before we can execute the scripts, we need to tell Celery where the broker can be found. We do this by creating a celeryconfig.py file that contains the following content:

BROKER_HOST = "173.39.241.238"  # IP address of server B, which runs RabbitMQ and Celery
BROKER_PORT = 5672  # default AMQP port
BROKER_USER = "ubuntu"  # username for RabbitMQ
BROKER_PASSWORD = "ubuntu"  # password for RabbitMQ
BROKER_VHOST = "vhost_ubuntu"  # virtual host as configured on the RabbitMQ server
CELERY_RESULT_BACKEND = "amqp"  # send task results back through RabbitMQ
CELERY_IMPORTS = ("tasks",)  # modules the worker imports to find its tasks
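
The five BROKER_* settings together describe a single AMQP URL, which is what the worker prints as its transport when it starts (with the password masked). Celery 3.1 also accepts the same connection as one BROKER_URL setting. A small sketch that assembles the URL; the helper broker_url is hypothetical:

```python
from urllib.parse import quote

# Values copied from celeryconfig.py above.
BROKER_HOST = "173.39.241.238"
BROKER_PORT = 5672
BROKER_USER = "ubuntu"
BROKER_PASSWORD = "ubuntu"
BROKER_VHOST = "vhost_ubuntu"


def broker_url(user, password, host, port, vhost):
    """Assemble an amqp:// URL; user, password and vhost are percent-encoded."""
    return "amqp://{}:{}@{}:{}/{}".format(
        quote(user, safe=""), quote(password, safe=""), host, port, quote(vhost, safe="")
    )


url = broker_url(BROKER_USER, BROKER_PASSWORD, BROKER_HOST, BROKER_PORT, BROKER_VHOST)
print(url)  # amqp://ubuntu:ubuntu@173.39.241.238:5672/vhost_ubuntu
```

Percent-encoding matters once a password contains characters such as "@" or "/", which would otherwise be read as URL separators.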

This file is stored on both machines (client and server), in the same directory as the other scripts.

On the server side, we start the Celery worker:

ubuntu@ubuntu-celery-server:~/Celery$ celery worker -l info

 -------------- celery@ubuntu-celery-7696e291-37d9-4d0a-802e-fcc046d9e72d v3.1.16 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.13.0-36-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         default:0x7f6764948750 (.default.Loader)
- ** ---------- .> transport:   amqp://ubuntu:**@173.39.241.238:5672/vhost_ubuntu
- ** ---------- .> results:     amqp
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
                
[tasks]
  . tasks.multiply

[2014-10-29 13:22:08,380: INFO/MainProcess] Connected to amqp://ubuntu:**@173.39.241.238:5672/vhost_ubuntu
[2014-10-29 13:22:08,389: INFO/MainProcess] mingle: searching for neighbors
[2014-10-29 13:22:09,399: INFO/MainProcess] mingle: all alone
[2014-10-29 13:22:09,410: WARNING/MainProcess] celery@ubuntu-celery-7696e291-37d9-4d0a-802e-fcc046d9e72d ready.

The server is now configured and waiting for tasks to execute. On the client, we run client.py:

ubuntu@ubuntu-celery-client:~/Celery$ python client.py 

If all is well, the worker’s log should show output similar to the following:

[2014-10-29 13:23:53,169: INFO/MainProcess] Received task: tasks.multiply[ec6273e2-2adf-4a98-b3ab-7d2b95bb72df]
[2014-10-29 13:23:53,176: INFO/MainProcess] Received task: tasks.multiply[c94d8e5a-4afc-4920-916f-b33fca0dc94c]
[2014-10-29 13:23:53,186: INFO/MainProcess] Received task: tasks.multiply[8cdcb1de-31f5-455c-b785-19d8eb9281f2]
[2014-10-29 13:23:53,187: INFO/MainProcess] Received task: tasks.multiply[5ecb8a03-2af4-4d6f-ab2f-e8b0f4398f54]
[2014-10-29 13:23:53,188: INFO/MainProcess] Received task: tasks.multiply[1d8c3efb-ad20-42e9-976b-34b8be0a5e39]
[2014-10-29 13:23:53,205: INFO/MainProcess] Task tasks.multiply[ec6273e2-2adf-4a98-b3ab-7d2b95bb72df] succeeded in 0.0337770140031s: 'The product is 40000'
[2014-10-29 13:23:53,208: INFO/MainProcess] Received task: tasks.multiply[5c42dac8-2f4f-4639-9089-d91b2873dff1]
[2014-10-29 13:23:53,219: INFO/MainProcess] Task tasks.multiply[c94d8e5a-4afc-4920-916f-b33fca0dc94c] succeeded in 0.0136614609946s: 'The product is 40000'
[2014-10-29 13:23:53,221: INFO/MainProcess] Received task: tasks.multiply[a72e756b-1e99-4455-ad18-4110cbfd3e1e]
[2014-10-29 13:23:53,224: INFO/MainProcess] Task tasks.multiply[8cdcb1de-31f5-455c-b785-19d8eb9281f2] succeeded in 0.00538706198859s: 'The product is 40000'
[2014-10-29 13:23:53,226: INFO/MainProcess] Received task: tasks.multiply[004296e3-f931-4075-ba83-09a7804b5e49]
[2014-10-29 13:23:53,229: INFO/MainProcess] Task tasks.multiply[5ecb8a03-2af4-4d6f-ab2f-e8b0f4398f54] succeeded in 0.00483364899992s: 'The product is 40000'
[2014-10-29 13:23:53,231: INFO/MainProcess] Received task: tasks.multiply[18172600-70f8-4402-a923-db02d71718a5]
[2014-10-29 13:23:53,235: INFO/MainProcess] Task tasks.multiply[1d8c3efb-ad20-42e9-976b-34b8be0a5e39] succeeded in 0.00486789099523s: 'The product is 40000'
.....

You can see that the worker first received five tasks from the queue and then started executing them, each one succeeding and logging the multiplication result. From then on, receiving new tasks and completing earlier ones are interleaved.
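
This ordering can also be checked mechanically on a longer log. The minimal sketch below parses the first log lines shown above with regular expressions; the helpers summarize and received_before_first_success are hypothetical, and the patterns assume exactly this log format.

```python
import re

# The first lines of the worker log shown above.
LOG = """\
[2014-10-29 13:23:53,169: INFO/MainProcess] Received task: tasks.multiply[ec6273e2-2adf-4a98-b3ab-7d2b95bb72df]
[2014-10-29 13:23:53,176: INFO/MainProcess] Received task: tasks.multiply[c94d8e5a-4afc-4920-916f-b33fca0dc94c]
[2014-10-29 13:23:53,186: INFO/MainProcess] Received task: tasks.multiply[8cdcb1de-31f5-455c-b785-19d8eb9281f2]
[2014-10-29 13:23:53,187: INFO/MainProcess] Received task: tasks.multiply[5ecb8a03-2af4-4d6f-ab2f-e8b0f4398f54]
[2014-10-29 13:23:53,188: INFO/MainProcess] Received task: tasks.multiply[1d8c3efb-ad20-42e9-976b-34b8be0a5e39]
[2014-10-29 13:23:53,205: INFO/MainProcess] Task tasks.multiply[ec6273e2-2adf-4a98-b3ab-7d2b95bb72df] succeeded in 0.0337770140031s: 'The product is 40000'
"""

# Patterns assume the exact log format shown above.
RECEIVED = re.compile(r"Received task: ([\w.]+)\[([0-9a-f-]+)\]")
SUCCEEDED = re.compile(r"Task ([\w.]+)\[([0-9a-f-]+)\] succeeded in ([\d.]+)s: '(.*)'$")


def summarize(log_text):
    """Return the received task ids (in order) and {task id: result} for successes."""
    received, succeeded = [], {}
    for line in log_text.splitlines():
        match = RECEIVED.search(line)
        if match:
            received.append(match.group(2))
            continue
        match = SUCCEEDED.search(line)
        if match:
            succeeded[match.group(2)] = match.group(4)
    return received, succeeded


def received_before_first_success(log_text):
    """Count the tasks received before the first 'succeeded' line appears."""
    count = 0
    for line in log_text.splitlines():
        if SUCCEEDED.search(line):
            break
        if RECEIVED.search(line):
            count += 1
    return count


received, succeeded = summarize(LOG)
print(len(received), len(succeeded))       # 5 1
print(received_before_first_success(LOG))  # 5
```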