Boost your Django Project with Celery

Users can add authors, book descriptions, cover photos, etc. To save storage and bandwidth, you decide to scale each photo down to 100x100 pixels with the following code:

import io
import logging
import os

from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
from PIL import Image

from books.models import Book

logger = logging.getLogger(__name__)


def resize_book_photo_command(book: Book):
    logger.debug(f'Changing {book.title}')
    _, extension = os.path.splitext(book.cover_photo.path)
    photo = default_storage.open(book.cover_photo.path)
    # Image.ANTIALIAS was removed in Pillow 10; use Image.LANCZOS there
    resized_photo = Image.open(photo.file).resize((100, 100), Image.ANTIALIAS)
    b = io.BytesIO()
    # Pillow expects format names like 'JPEG', not 'jpg', so normalize the extension
    resized_photo.save(b, format=extension[1:].upper().replace('JPG', 'JPEG'))
    resp = default_storage.save(book.cover_photo.path, ContentFile(b.getvalue()))
    book.cover_photo = os.path.join('books', os.path.basename(resp))
    book.save()
    logger.debug(f'Successfully saved {book.cover_photo}')

This code is called each time the user clicks “save” on the form. This is a poor solution because:

  1. Book covers in these resolutions look stupid;
  2. Users need to wait for the processing to finish even though they do not need results immediately.

While tackling the first issue is beyond the scope of this article, we can do something with the second one. What if we dispatch the task to some external service and give the user the answer immediately, without waiting for this task to complete?

Asynchronous processing

That’s where Celery comes in useful. The concept behind it is very simple:

  1. User makes a request;
  2. Django dispatches a task;
  3. Django sends back a response;
  4. Celery checks if any task is available and, if so, takes it from the broker;
  5. Celery processes the task.

There are a few things worth mentioning:

  1. Celery can process a task long after the user has received a response from Django;
  2. Results may or may not be saved;

The former is an asset, whereas the latter can be troublesome. We’ll get back to it later. For now, let’s configure our project to use Celery.

Basic configuration

The coolest thing about Celery is that it can be used within your project without creating a separate application. It’s a bit like splitting your app into microservices while keeping a monolithic codebase, which is a big asset if we want to speed things up without splitting the logic or configuring an advanced monitoring system (though we’ll do some monitoring soon). This also makes the initial configuration very easy.

First, install the dependencies, the celery and redis packages (I am going to use Redis as the broker):

$ pip install celery redis

In your settings.py file add the following line:

CELERY_BROKER_URL = f'redis://{env("REDIS_HOST")}:{env("REDIS_PORT")}/{env("REDIS_CELERY_DB")}'

In my case, I am not using docker-compose and want everything set up locally, so the environment variables are:

REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_CELERY_DB=0
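
With those values, the broker URL resolves to redis://localhost:6379/0. (I’m assuming env() comes from a helper such as django-environ here; reading os.environ directly works just as well.)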

In the directory where your settings live, add a celery.py file with the following contents:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'example.settings')

app = Celery('example')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
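
Celery’s Django guide also recommends importing this app in your project’s __init__.py (here: example/__init__.py), so that it loads when Django starts and @shared_task can bind to it:

from .celery import app as celery_app

__all__ = ('celery_app',)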

The last thing is to create a task and call it. Tasks are typically kept in tasks.py files, so add one to your Django app:

from celery import shared_task

from books.models import Book
from books.services import resize_book_photo_command  # import path is an assumption


@shared_task
def resize_book_photo(book_id: int):
    resize_book_photo_command(Book.objects.get(id=book_id))

And call it instead of the original function, using the apply_async() method:

class BookForm(forms.ModelForm):

    def save(self, commit=True):
        book = super().save(commit)
        resize_book_photo.apply_async((book.id,))
        return book
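
If you don’t need apply_async()’s extra options (a countdown, an explicit queue and so on), the delay() shortcut is equivalent:

resize_book_photo.delay(book.id)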

Now you need to run your broker. On macOS with Homebrew:

$ brew services run redis

Or, for Linux:

$ sudo systemctl start redis

And run a Celery worker:

$ celery -A example worker -l debug

This runs Celery with the logging level set to debug.

Monitoring and saving results

Since the only way to check that everything is OK is currently to inspect the results manually, you might be tempted to look for a solution that makes this easier. That’s where django-celery-results and Flower become useful. The former persists results and stores tracebacks, call arguments, return values and task statuses; the latter is for monitoring.

First, install dependencies:

$ pip install django-celery-results flower

django-celery-results needs to be added to your installed apps, and a result backend needs to be selected. I’ll use my project’s database:

# Application definition

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',

    'django_celery_results',
    'debug_toolbar',

    'books',
]
CELERY_RESULT_BACKEND = 'django-db'
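
One caveat: depending on your versions, task names and call arguments only end up in the backend when extended results are enabled, so if those columns stay empty, also set:

CELERY_RESULT_EXTENDED = True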

Run the migrations for django-celery-results:

$ python manage.py migrate django_celery_results

And start Flower:

$ celery -A example flower

You should be able to see the panel on your localhost on port 5555.


Results will also be listed in your admin panel.

Boosting further

Since Celery is so easy to use, you might want to move more tasks to it. Imagine, for example, that you want to use it for sending email notifications. This is reasonable, but the current setup may still have drawbacks: your queues can get cluttered by long-running tasks or by peaks (when many tasks are dispatched at once). Of course, you can raise the concurrency using the -c option, but this can raise your costs if you pay per computing time. We are going to solve this in two steps:

  1. Split tasks into queues: one for handling image resizing, second for everything else;
  2. Introduce autoscaling.

Our improved architecture keeps one dedicated worker for media, while notifications and everything else go to an autoscaled pool. Once again, update your settings file:

from kombu import Exchange, Queue

CELERY_TASK_DEFAULT_QUEUE = 'default'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'
CELERY_TASK_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('media', Exchange('media'), routing_key='media'),
)
CELERY_TASK_ROUTES = {
    'books.tasks.resize_book_photo': 'media',
}
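
Routing can also be chosen per call rather than per task; apply_async() accepts a queue argument, which is handy for one-off overrides:

resize_book_photo.apply_async((book.id,), queue='media')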

One thing to note: the docs tripped me up here and caused some pain. With namespace='CELERY' configured above, the new-style lowercase setting names all take a CELERY_ prefix, so the queues setting is CELERY_TASK_QUEUES rather than the CELERY_QUEUES you may find in older examples.

Now, every time resizing takes place, it’ll go to the queue called “media”, and everything else will go to the queue called “default”. For scaling, we want 1 process running, increasing up to 10 when necessary, and we don’t want to touch the settings file anymore. The command to run this setup is the following:

$ celery -A example worker -Q default --autoscale=10,1 -n default

This tells Celery to run a worker collecting messages from the queue called “default”, to start with 1 process and scale automatically up to 10, and gives it a meaningful name. Do a similar thing for the other queue:

$ celery -A example worker -Q media -n media
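
If you run several workers on one machine, it can help to give each a fully qualified node name; %h expands to the hostname:

$ celery -A example worker -Q media -n media@%h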

In your Flower panel, you should now see both workers.

Let’s simulate a heavy workload. I have created a task that is dispatched every time someone hits the “/test/” endpoint:

import time

from celery import shared_task


@shared_task
def list_test_books_per_author():
    time.sleep(5)
    logger.info("Done")

And hit it 100 times:

$ for i in $(seq 1 100); do curl http://localhost:8000/test/ & done

As we can see in Flower, the scaling up actually took place. And while tasks from the default queue were still being processed, the media queue stayed free and completed the one task it was given (there is 1 task in the “processed” column for the media worker, while the default one still has pending tasks).

Task retrying

Let’s get back to the notifications. It can happen that, for example, the email service is unavailable and sending fails with exceptions like connection timeouts. In this case, we’d like to retry the sending task a few times. Let’s simulate the problem:

import random

from celery import shared_task
from django.core.mail import send_mail


@shared_task
def send_notifications():
    if random.randint(0, 3) != 0:
        logger.error('Something went wrong. Failing')
        raise Exception('Can\'t send email.')
    logger.debug('Sending email...')
    send_mail(
        subject='Notification',
        message='This is test email. Ignore it.',
        from_email='no-reply@example.com',
        recipient_list=['test1@otherexample.com'],
    )

3 out of 4 attempts will result in an error, which means we are dealing with a very unreliable service. We should make the task bound so that we can manually tell it when to retry. In some cases we won’t want a retry at all; when our credentials are wrong, for example, retrying is pointless. What we need to do is:

  1. Make the task bound; this gives us access to the Task instance;
  2. Set the retry count and a countdown indicating how long to wait for the service to become available again;
  3. Actually trigger the retry with the retry() method.

@shared_task(bind=True, max_retries=3)
def send_notifications(self):
    try:
        if random.randint(0, 3) != 0:
            logger.error('Something went wrong. Failing')
            raise Exception('Can\'t send email.')
    except Exception as exc:
        # re-enqueue the task; gives up after max_retries attempts
        self.retry(exc=exc, countdown=5)
    logger.debug('Sending email...')
    send_mail(
        subject='Notification',
        message='This is test email. Ignore it.',
        from_email='no-reply@example.com',
        recipient_list=['test1@otherexample.com'],
    )
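
Since Celery 4, most of this boilerplate can live on the decorator itself: autoretry_for retries automatically whenever one of the listed exception types is raised, and retry_backoff spaces the attempts out exponentially. A sketch, assuming SMTP errors are what you want to retry on:

from smtplib import SMTPException

from celery import shared_task
from django.core.mail import send_mail


@shared_task(autoretry_for=(SMTPException,), max_retries=3, retry_backoff=True)
def send_notifications():
    logger.debug('Sending email...')
    send_mail(
        subject='Notification',
        message='This is test email. Ignore it.',
        from_email='no-reply@example.com',
        recipient_list=['test1@otherexample.com'],
    )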

In our dashboard, we are going to see tasks with a new status: “RETRY”.

Such a task may still fail or succeed later. If it succeeds, the Flower panel shows how many times it was retried and which exceptions occurred.


Bonus: Debugging

At some point, you might want easy access to the tasks in order to debug them. There are numerous workarounds, like running tasks from the console, but a major breakthrough in my life came when a friend of mine discovered the CELERY_TASK_ALWAYS_EAGER setting. And I’m not joking, it saved me countless hours of pain. Set it to True to run tasks synchronously, blocking execution until the task finishes. A task will behave as usual, saving results and so on, but you can access it from within your debugger.
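
A minimal development-settings sketch (CELERY_TASK_EAGER_PROPAGATES additionally makes eager tasks re-raise their exceptions instead of recording them, which is usually what you want while debugging):

# Development settings only; never enable this in production
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True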


Boosting your Django project with Celery

This is only a small fraction of what Celery can do. For example, I haven’t covered the celery beat functionality, which a colleague of mine has written about before. But this should be enough to give you the general idea. Celery is an easily configurable system that offers scalability, the ability to divide responsibility for certain types of tasks between queues, monitoring, and more reliable communication with external services thanks to the possibility of retrying failed tasks.
