Tuesday, November 16, 2010

Setup Django RabbitMQ and Celery

RabbitMQ is a message broker: a separate server that queues messages 
and delivers them to consumers.  Celery is a Python distributed task 
queue that uses RabbitMQ as its broker.  Django-celery is a Django 
wrapper for Celery that makes it work with Django more nicely.  
Tasks are defined in the Django app and can be enqueued from view 
code or anywhere else.  Celery serializes each task call into a 
message and publishes it to RabbitMQ; a Celery worker daemon, 
started via ./manage.py, consumes the messages, executes the tasks, 
and (unless told otherwise) stores each task's result so the caller 
can retrieve it.
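Roughly speaking, what gets published to RabbitMQ for each task call is a small serialized message naming the task and carrying its arguments. This is only an illustrative sketch of that idea, not Celery's actual code (the exact fields vary by Celery version):

```python
import json
import uuid

def make_task_message(task_name, args):
    # Sketch of the kind of message a producer publishes to the broker:
    # the task's dotted name, a unique id, and the call arguments.
    return json.dumps({
        "task": task_name,
        "id": str(uuid.uuid4()),
        "args": list(args),
        "kwargs": {},
    })

msg = make_task_message("cel.tasks.add", (1, 2))
decoded = json.loads(msg)
print(decoded["task"], decoded["args"])
```

The worker on the other end deserializes this message, looks up the function registered under that name, and calls it with the given arguments.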

1) Install deps
sudo aptitude install python-virtualenv  #(recommended but not required)
sudo aptitude install rabbitmq-server

2) Create rabbitmq user and vhost.  These settings must be set again if you change your server hostname.  Alternatively, you can set NODENAME to rabbit@localhost in the rabbitmq configuration. 
sudo rabbitmqctl add_user my_rabbit_user mypasswd
sudo rabbitmqctl add_vhost my_vhost
sudo rabbitmqctl set_permissions -p my_vhost my_rabbit_user ".*" ".*" ".*"

3) Setup python environment
virtualenv --no-site-packages testrabbitmq  #(create a virtualenv, not required)
cd testrabbitmq
. ./bin/activate # (activate virtualenv, not required)
pip install django
pip install celery
pip install django-celery

4) Setup Django project
django-admin.py startproject testc  # create a test project
cd testc
python ./manage.py startapp cel  # create a test app

5) Create a test model
Edit cel/models.py:
from django.db import models
class MyModel(models.Model):
    field1 = models.CharField(max_length=12)
    field2 = models.CharField(max_length=12)

6) Create some test tasks:
Edit cel/tasks.py
from celery.decorators import task
from cel import models

@task
def add(x, y):
    return x + y

@task
def addmodel(x, y):
    record = models.MyModel.objects.create(field1=x, field2=y)
    return record

@task(ignore_result=True)  # the worker will not store this task's result
def addmodel2(x, y):
    record = models.MyModel.objects.create(field1=x, field2=y)
    return record
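Conceptually, the @task decorator leaves the function callable as ordinary Python while adding asynchronous entry points like .delay(). A simplified stdlib stand-in for that idea (not Celery's real decorator, and using an in-process queue instead of RabbitMQ):

```python
import queue

task_queue = queue.Queue()

def task(fn):
    # Simplified stand-in for Celery's @task: the function stays callable
    # directly, and .delay() enqueues a call for a worker instead.
    def delay(*args, **kwargs):
        task_queue.put((fn, args, kwargs))
    fn.delay = delay
    return fn

@task
def add(x, y):
    return x + y

print(add(1, 2))             # runs synchronously, right here
add.delay(1, 2)              # enqueued for a worker instead
fn, args, kwargs = task_queue.get()
print(fn(*args, **kwargs))   # a "worker" picks it up and executes it
```

In real Celery, .delay() publishes the serialized call to RabbitMQ, and the celeryd worker process performs the equivalent of the get-and-call step.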

7) configure django settings
Edit settings.py

# set django-celery autoloader 
import djcelery
djcelery.setup_loader()

# Set database settings (any Django database backend works)

# set information to connect to rabbitmq (broker) 
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_VHOST = "my_vhost"
BROKER_USER = "my_rabbit_user"
BROKER_PASSWORD = "mypasswd"

# add to INSTALLED_APPS
    'djcelery',  # django-celery
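These BROKER_* settings are just the pieces of a single AMQP connection URL of the form amqp://user:password@host:port/vhost. A small sketch assembling it (a hypothetical helper, only to show how the pieces fit together):

```python
def broker_url(user, password, host, port, vhost):
    # Assemble the AMQP URL equivalent of the individual BROKER_* settings.
    return "amqp://%s:%s@%s:%d/%s" % (user, password, host, port, vhost)

url = broker_url("my_rabbit_user", "mypasswd", "localhost", 5672, "my_vhost")
print(url)
```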

8) Syncdb
python ./manage.py syncdb

9) Restart the rabbitmq server (optional)
UBUNTU 10.04 NOTE - Ubuntu starts the rabbitmq 
server by default and installs an init script 
(/etc/init.d/rabbitmq-server start|stop).  For testing, 
let's stop the service and run the server manually in the 
foreground to see more output. 
sudo /etc/init.d/rabbitmq-server stop
sudo rabbitmq-server
# will display lots of output and say the broker is running.  
# The terminal will stay attached to the foreground process.

10) Start the celery client process
In another terminal (if using virtualenv, be sure to activate it)
in the testrabbitmq Django project, execute the following:

python ./manage.py celeryd -l info
# this runs the Celery worker in the foreground, waiting for tasks. 

11) Send a message
In another terminal (if using virtualenv, be sure to activate it), 
in the testrabbitmq Django project, execute the following:
python ./manage.py shell
>>> from cel import tasks
>>> result = tasks.add.delay(1, 2)
>>> result.ready()  # returns True/False immediately (does not block)
>>> result.state
>>> result.successful()
>>> result = tasks.add.delay(1 + 2) # only one argument, so the task will fail
>>> result.successful()
>>> result = tasks.addmodel.delay('a','b')
>>> result.successful()
>>> for i in range(0, 1000):  # stress test
...     result = tasks.addmodel.delay('a', 'b')

# alternate syntax (more flexible - can pass extra options)
>>> result = tasks.addmodel.apply_async(args=['c','d'])
>>> result.successful()

# execute at a given time
>>> from datetime import datetime, timedelta
>>> result = tasks.addmodel.apply_async(args=['c','d'],
...                          eta=datetime.now() + timedelta(minutes=1))
>>> result.successful()

# execute after a given number of seconds
>>> result = tasks.addmodel.apply_async(args=['c','d'], countdown=10)
>>> result.successful()
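countdown is just a convenience for an eta computed relative to now; the arithmetic it implies can be sketched with the stdlib (this is the relationship between the two options, not Celery internals):

```python
from datetime import datetime, timedelta

countdown = 10  # seconds, as in countdown=10 above
now = datetime.now()
eta = now + timedelta(seconds=countdown)  # what countdown=10 resolves to

delay = (eta - now).total_seconds()
print(delay)
```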

# alternate syntax  (can queue functions you don't own)
>>> from celery.execute import send_task
>>> tasks.addmodel.name
>>> result = send_task("cel.tasks.addmodel", args=['e', 'f'])
>>> result.get()

>>> from djcelery.models import TaskMeta
>>> TaskMeta.objects.count()  # this table contains result meta info
                              # for tasks not defined with ignore_result=True

APPENDIX - rabbitmq commands

sudo rabbitmqctl stop  # stop the rabbitmq server
sudo rabbitmqctl list_users  # list the available users
sudo rabbitmqctl list_vhosts  # list the available vhosts
sudo rabbitmqctl list_queues  # by default, lists the queues in the server
                              # and how many messages are in them
Listing queues ...
celery 544
sudo rabbitmqctl list_queues [options]
    common list_queues options:
        name = name of queue
        durable = whether the queue survives server restarts
        pid = erlang process id
        messages_ready = messages ready to be delivered to clients
        messages = queue depth (total messages)
        memory = bytes of memory consumed by the erlang process
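The list_queues output is whitespace-separated columns, so it is easy to post-process. A sketch of parsing it in Python (assumes the default name/messages columns shown above):

```python
def parse_list_queues(output):
    # Parse `rabbitmqctl list_queues` output into {queue_name: message_count},
    # skipping the "Listing queues ..." header and "...done." footer lines.
    queues = {}
    for line in output.splitlines():
        parts = line.split()
        if len(parts) == 2 and parts[1].isdigit():
            queues[parts[0]] = int(parts[1])
    return queues

sample = "Listing queues ...\ncelery 544\n...done.\n"
print(parse_list_queues(sample))
```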

# NOTE: after any change to the task code, be sure to restart the celery worker (./manage.py celeryd)

UPDATE: I did a talk about this at Chicago Djangonaughts.  See here: 



Unknown said...

Just want to add that this line:

sudo rabbitmqctl set_permissions -p /my_vhost my_rabbit_user "" ".*" ".*"

Should be

sudo rabbitmqctl set_permissions -p /my_vhost my_rabbit_user ".*" ".*" ".*"

on RabbitMQ 2.x

That is "" must be replaced with ".*".

Joe Jasinski said...

Thanks for your comment. I copied the syntax from the Opus project documentation, though I don't really know what it means... I guess it's something about restricting which resources can be accessed.


What does adding the ".*" do?

Ronna said...

to stop rabbitmq use "sudo rabbitmqctl stop" not rabbitmq-server.

SixDegrees said...

Running celery through manage.py is not suitable for a production environment. What is needed to make all of this work through Apache?

What needs to be done to make use of multiple machines, so processing of multiple long-running tasks can be distributed?

Barun Saha said...

Nice document! Really helpful. But what does that Python virtual environment do?

Kurt Neufeld said...

You also need to add:

to settings.py

Rahul said...

Can you tell me what's the location of settings.py or is celeryconfig.py?