Rabbit Message Queue is a separate server that remotely executes tasks given to it.
Celery is a Python client program that sends tasks to the RabbitMQ.
Django-celery is a Django wrapper for Celery that makes it
work with Django more nicely. Tasks are executed in Django
view code or wherever. The tasks can be defined in the Django app,
are sent to a Celery client daemon executed by ./manage.py and
Celery serializes the task and sends it to RabbitMQ for processing.
RabbitMQ notifies Celery when it is done with each task.
1) Install deps
UBUNTU 10.04 NOTE
sudo aptitude install python-virtualenv
#(recommended but not required)
sudo aptitude install rabbitmq-server
2) Create rabbitmq user and vhost. These settings must be set again if you change your server hostname. Or you can set the NODENAME to rabbit@localhost in the rabbitmq configuration.
sudo rabbitmqctl add_user my_rabbit_user mypasswd
sudo rabbitmqctl add_vhost my_vhost
sudo rabbitmqctl set_permissions -p my_vhost my_rabbit_user ".*" ".*" ".*"
3) Setup python environment
virtualenv --no-site-packages testrabbitmq
#(create a virtualenv, not required)
cd testrabbitmq
. ./bin/activate
# (activate virtualenv, not required)
pip install django
pip install celery
pip install django-celery
4) Setup Django project
django-admin.py startproject testc
# create a test project
cd testc
python ./manage.py startapp cel
# create a test app
5) Create a test model
Edit cel/models.py:
from django.db import models
class MyModel(models.Model):
field1 = models.CharField(max_length=12)
field2 = models.CharField(max_length=12)
6) Create some test tasks:
Edit cel/tasks.py
from celery.decorators import task
@task
def add(x, y)
return x + y
from cel import models
@task
def addmodel(x, y):
record = models.MyModel.objects.create(field1=x, field2=y)
record.save()
return record
from cel import models
@task(ignore_result=True) # Celery will ignore results sent back to it
def addmodel2(x, y):
record = models.MyModel.objects.create(field1=x, field2=y)
record.save()
return record
7) configure django settings
Edit settings.py
# set django-celery autoloader
import djcelery
djcelery.setup_loader()
...
# Set database settings
...
# set information to connect to rabbitmq (broker)
BROKER_HOST = "127.0.0.1"
BROKER_PORT = 5672
BROKER_VHOST = "/my_vhost"
BROKER_USER = "my_rabbit_user"
BROKER_PASSWORD = "1234"
# add to installed apps
INSTALLED_APPS = (
...
'djcelery',
# django-celery
'cel',
)
8) Syncdb
python ./manage syncdb
9) Restart the rabbitmq server (optional)
UBUNTU 10.04 NOTE - Ubuntu starts the rabbitmq
server by default and installs an init script
(/etc/init.d/rabbitmq-server start|stop). For testing,
let's stop the server and restart manually run it in the
foreground to see more output.
sudo /etc/init.d/rabbitmq-server stop
sudo rabbitmq-server start
# will display lots of output and say broker is running.
# The terminal will wait.
10) Start the celery client process
In another terminal (if using virtualenv, be sure to activate it),
in the testrabbitmq Django project, execute the following:
python ./manage.py celeryd -l info
# this will hang the terminal and set the Celery (the message client) waiting.
11) Send a message
In another terminal (if using virtualenv, be sure to activate it),
in the testrabbitmq Django project, execute the following:
python ./manage.py shell
>>> from cel import tasks
>>> result = tasks.add.delay(1, 2)
>>> result.ready()
# waits until task is done
True
>>> result.state
u'SUCCESS'
>>> result.successful()
True
>>> result = tasks.add.delay(1 + 2
) # will cause an error
>>> result.successful()
False
>>> result = tasks.addmodel.delay('a','b')
>>> result.successful()
True
>>> for i in range(0,1000):
# stresstest
.... result = tasks.addmodel.delay('a','b')
# alternate syntax (more flexible - can pass args)
>>> result = tasks.addmodel.apply_asyc(args=['c','d'])
>>> result.successful()
# execute at a given time
>>> from datetime import datetime, timedelta
>>> result = tasks.addmodel.apply_asyc(args=['c','d'],
eta=datetime.now() + timedelta(minutes=1))
>>> result.successful()
# execute after a given number of seconds
>>> result = tasks.addmodel.apply_asyc(args=['c','d'],
countdown=3)
# alternate syntax (can queue functions you don't own)
>>> from celery.execute import send_task
>>> tasks.addmodel.name
'cel.tasks.addmodel'
>>> result = send_task("cel.tasks.addmodel", args=['e', 'f'])
>>> result.get()
>>> from djcelery.models import TaskMeta
>>> TaskMeta.objects.count()
# this table contains all results meta info
# for tasks not defined with ignore_result=True
APPENDIX - rabbitmq commands
sudo rabbitmqctl stop
# stop the rabbitmq
sudo rabbitmqctl list_users
# list the available users
sudo rabbitmqctl list_vhosts
# list the available vhosts
sudo rabbitmqctl list_queues
Listing queues ...
celery 544 # by default, lists the queues in the server
# and how many messages in them
sudo rabbitmqctl list_queues [options]
common list_queues options
name = name of queue
durable = queue survives server restarts
pid = erlang process id
messages_ready = ready to be delivered to clients
messages = queue depth
memory = bytes of mem consumed by erlang process
# NOTES, for any changes to the code, be sure to restart the celery client (./manage.py celeryd)
UPDATE: I did a talk about this at Chicago Djangonaughts. See here:
https://groups.google.com/forum/?fromgroups#!topic/django-chicago/iJr9nStrM-U
http://www.turnkeylinux.org/blog/django-celery-rabbitmq
http://ask.github.com/celery/userguide/executing.html