Asynchronous Tasks with Celery + Redis in Django

Suppose a task takes about 20 seconds to finish before your server can respond to the client. Imagine being the user of that application and waiting the whole time. Often, much of the processing triggered by a request does not need to happen before the response and can be deferred until later. AJAX is one way to make work asynchronous from the browser's side, but here we want to run our Python code asynchronously on the server. For that, we turn to a task queue and a message broker.

The typical process flow of a request to a Django app:

  1. Receive the request
  2. Route it to a specific View
  3. Process the request in the View
  4. Respond to the request

Say we want to fetch some data from an external source and process it whenever there is an update. Sounds familiar, right? You can think of it as a webhook on your CI server.

In this post, I won't discuss why I chose Redis over other message brokers such as RabbitMQ, ActiveMQ, or Kafka. The scope of this post is applying a task queue to execute asynchronous tasks in Django. If you are new to task queues, have no idea how to implement async tasks, or are looking for a way to integrate Celery with Django, keep reading!

Installation of Celery and Redis

First, install Celery and the Python Redis client from PyPI:

pip install celery redis

Next, install the Redis server itself; you can refer to DigitalOcean's installation guide.

If you are using Docker, simply start ('up') a Redis container from the official redis image on Docker Hub.

Let's get started!

Initially, this is the structure of our project for demonstration:

.
├── api
│   ├── apps.py
│   ├── __init__.py
│   └── views.py
├── django_with_celery
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── manage.py

Let us add a URL entry and a View into our Django project.

# django_with_celery/urls.py
from django.urls import path

from api.views import BuildTrigger

urlpatterns = [
    path('api/trigger_build/', BuildTrigger.as_view()),
]

# api/views.py
from rest_framework.views import APIView
from rest_framework.response import Response

from api.tasks import build_something  # assumed home of the slow function (hypothetical module)


class BuildTrigger(APIView):
    def post(self, request):
        build_something()  # This takes about a minute to finish
        return Response(None, status=201)

In the BuildTrigger View, build_something() takes about a minute to finish, so the whole round trip from the client sending the HTTP request to receiving the response takes more than a minute. The HTTP client is likely to drop the connection if the response time exceeds its connection timeout. Since the endpoint only triggers a build, the client does not need to wait for it: we can respond right away and run build_something() afterwards.

To perform tasks asynchronously, we use a task queue to hold all pending tasks. In our case, we use Celery, an asynchronous task queue based on distributed message passing, with Redis as the message broker.
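To make the idea concrete, here is a toy, in-process sketch of the same pattern using only Python's standard library. It is not how Celery is implemented: queue.Queue stands in for Redis, a background thread stands in for a Celery worker, and trigger_build() is a hypothetical stand-in for BuildTrigger.post. The build is shortened to 0.1 seconds.

```python
# Toy illustration of the task-queue pattern (NOT Celery itself).
import queue
import threading
import time

task_queue = queue.Queue()  # stands in for the Redis broker
completed = []              # records finished builds


def build_something(build_id):
    """Stand-in for the slow build; sleeps 0.1 s instead of a minute."""
    time.sleep(0.1)
    completed.append(build_id)


def worker():
    """Take tasks off the queue forever, like a Celery worker reading the broker."""
    while True:
        func, args = task_queue.get()
        try:
            func(*args)
        finally:
            task_queue.task_done()


def trigger_build(build_id):
    """Stand-in for BuildTrigger.post: enqueue the build and respond at once."""
    task_queue.put((build_something, (build_id,)))
    return 201


threading.Thread(target=worker, daemon=True).start()

status = trigger_build("build-1")
finished_at_response = list(completed)  # snapshot taken right after responding
task_queue.join()                       # demo only: wait for the worker to finish
print(status, finished_at_response, completed)
```

The caller gets its 201 while completed is still empty; the worker finishes the build afterwards. Celery gives you the same shape, except that the queue lives in Redis and the workers run as separate processes, possibly on other machines.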

To integrate Celery with Django, create a Python module celery.py inside the django_with_celery package, next to settings.py. Its app is then imported from that package's existing __init__.py (not from a new __init__.py next to manage.py, which Django would never import).

In your celery.py, add the following code:

# django_with_celery/celery.py
import os

from celery import Celery

# Tell Celery which Django settings module to use. DON'T FORGET TO CHANGE THIS ACCORDINGLY.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_with_celery.settings')

app = Celery('django_with_celery')

# Read every Django setting whose name starts with CELERY_.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Import the tasks.py module of every app in INSTALLED_APPS.
app.autodiscover_tasks()

In celery.py, we set the DJANGO_SETTINGS_MODULE environment variable so that the worker, which does not start through manage.py, can load our Django settings. The last line, autodiscover_tasks(), looks for a tasks.py module in each app listed in INSTALLED_APPS and registers the tasks defined there.

Next, import the Celery app in django_with_celery/__init__.py, so that it is loaded whenever Django starts and @shared_task functions use it:

# django_with_celery/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)

After adding this code, add the following setting to your main settings.py, in our case django_with_celery/settings.py:

CELERY_BROKER_URL = 'redis://localhost'

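This tells Celery where the broker lives: a Redis instance on localhost, using the default port 6379 and database 0. Because of namespace='CELERY' in celery.py, Celery reads every setting prefixed with CELERY_ from the settings module named by DJANGO_SETTINGS_MODULE.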
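The prefix handling can be pictured with the simplified sketch below. It is an illustration under assumptions, not Celery's actual code: celery_settings_from() is a hypothetical helper, and the dictionary stands in for settings.py. It shows CELERY_BROKER_URL ending up as Celery's broker_url setting, while unprefixed settings such as DEBUG are ignored.

```python
# Simplified illustration (NOT Celery's code) of how namespace='CELERY' maps
# Django setting names onto Celery's lowercase setting names.


def celery_settings_from(django_settings: dict[str, object], namespace: str = "CELERY") -> dict[str, object]:
    """Keep only NAMESPACE_-prefixed settings, dropping the prefix and lowercasing."""
    prefix = namespace + "_"
    return {
        key[len(prefix):].lower(): value
        for key, value in django_settings.items()
        if key.startswith(prefix)
    }


# Hypothetical contents of settings.py, written as a plain dictionary.
settings = {
    "DEBUG": True,
    "CELERY_BROKER_URL": "redis://localhost:6379/0",
    "CELERY_TASK_ALWAYS_EAGER": False,
}
print(celery_settings_from(settings))
```

This prints {'broker_url': 'redis://localhost:6379/0', 'task_always_eager': False}. The prefix keeps Celery's configuration clearly separated from Django's own settings in the same file.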

Now all the integration is done except for the async tasks themselves. To define an async task, import shared_task and decorate each function with @shared_task.

# api/tasks.py
from celery import shared_task

@shared_task
def build_something():
    # The long-running build work (about a minute) goes here.
    ...

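Some tutorials suggest defining all async tasks in a separate module. With autodiscover_tasks() this is more than a matter of taste: Celery only imports the tasks.py module of each installed app, so a @shared_task defined in some other module (views.py, for example) may never be registered in the worker, which then logs an "unregistered task" error and discards the message. Keeping tasks in each app's tasks.py avoids this.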
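The convention autodiscovery relies on can be sketched with importlib. This is a simplified, hypothetical helper, not Celery's implementation: for each installed app it tries to import <app>.tasks and skips apps that have no such module. In the demo, standard-library packages stand in for Django apps.

```python
# Simplified sketch (NOT Celery's implementation) of the autodiscovery convention.
import importlib


def discover_task_modules(installed_apps: list[str], related_name: str = "tasks") -> list[str]:
    """Import '<app>.<related_name>' for each app and return the modules found."""
    found = []
    for app_name in installed_apps:
        module_name = f"{app_name}.{related_name}"
        try:
            importlib.import_module(module_name)
        except ModuleNotFoundError as exc:
            # Skip apps that simply lack the module; re-raise any other missing
            # import, e.g. a tasks module whose own imports fail.
            if exc.name != module_name:
                raise
            continue
        found.append(module_name)
    return found


# json has a "decoder" submodule and email does not, so only json.decoder is found.
print(discover_task_modules(["json", "email"], related_name="decoder"))
```

Running it prints ['json.decoder']. In a Django project the app names come from INSTALLED_APPS and the module name is tasks, which is why the api app must be listed there for api/tasks.py to be picked up.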

Adding the @shared_task decorator makes the function available to run asynchronously, but calling it directly (build_something()) still runs it synchronously in the current process. To queue it for a worker instead, call it through its delay() or apply_async() method:

build_something.apply_async()
# OR
build_something.delay()

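The two differ in configurability: delay(*args, **kwargs) is a shortcut for apply_async(args, kwargs), while apply_async() also accepts execution options such as countdown, eta, and queue. Refer to the Celery documentation for the details. To make our endpoint respond immediately, replace build_something() with build_something.delay() in BuildTrigger.post.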
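The calling convention can be illustrated with a toy stand-in class. FakeTask is hypothetical and only records what would be sent to the broker; it is not Celery's Task class, although it follows the documented rule that delay(*args, **kwargs) forwards to apply_async(args, kwargs). The "main" and force=True arguments are made up for the example.

```python
# Toy stand-in (NOT Celery's Task class) for the delay()/apply_async() relationship.
from typing import Any


class FakeTask:
    def __init__(self) -> None:
        self.sent: list[dict[str, Any]] = []  # messages that would go to the broker

    def apply_async(self, args=None, kwargs=None, **options: Any) -> None:
        """Record task arguments separately from execution options."""
        self.sent.append({
            "args": tuple(args or ()),
            "kwargs": dict(kwargs or {}),
            "options": options,
        })

    def delay(self, *args: Any, **kwargs: Any) -> None:
        """Shortcut: every keyword becomes a task argument, never an option."""
        self.apply_async(args, kwargs)


build_something = FakeTask()
build_something.delay("main", force=True)
build_something.apply_async(args=("main",), kwargs={"force": True}, countdown=10)
for message in build_something.sent:
    print(message)
```

Both calls send the same task arguments; only apply_async() can attach an execution option such as countdown, because delay() treats every keyword as an argument for the task itself.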

All good, but not done

At this point, all the code required to run tasks asynchronously is in place, but a few more steps are needed before your tasks actually run.

Make sure Redis is up and running. If it is installed on your local machine, run redis-server to start it. You can run redis-cli ping to check the connection; a running server replies PONG.
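If you would rather check from Python, the sketch below does roughly what redis-cli ping does, using only the standard library. It assumes Redis listens on the default port 6379 without a password (a server that requires authentication typically answers with an error instead of +PONG, so the function returns False); redis_is_up() is a hypothetical helper name.

```python
# Minimal stdlib sketch of a Redis liveness check (roughly what `redis-cli ping` does).
import socket


def redis_is_up(host: str = "localhost", port: int = 6379, timeout: float = 1.0) -> bool:
    """Send an inline PING over TCP and report whether the reply is +PONG."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"PING\r\n")
            reply = sock.recv(64)
    except OSError:  # connection refused, unreachable host, or timeout
        return False
    return reply.startswith(b"+PONG")
```

On a machine running redis-server locally with default settings, redis_is_up() should return True; if it returns False, start Redis before spawning the worker.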

Next, we need to spawn a worker. Open your terminal and run the following command from your project root (the directory containing manage.py):

celery -A django_with_celery.celery worker -l DEBUG -E

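This spawns a worker for our app. -l DEBUG sets the log level, and -E makes the worker send task events, which monitoring tools such as Flower rely on.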

Run your app

Now you should be able to run your tasks. Start your Django server as usual; whenever a task is queued into Celery, it is logged in the console running the worker command above.

Handy Tool: Flower

Flower is a monitoring tool for Celery; you can learn more about it on its GitHub page. It provides real-time monitoring of your Celery cluster, remote control, broker monitoring, an HTTP API, and more.

First published on 2017-10-31

Republished on Hackernoon
