Introduction to Celery

Mehedee Ahmed Siddique
8 min read · Sep 18, 2023


Celery is an asynchronous task queue for Python. We mostly use it to run tasks outside the cycle of our regular application, e.g. the HTTP request-response cycle. We can also use Celery to schedule tasks for a specific time, or to run periodic tasks, as with cron jobs.

A classic example is sending an email on user signup. You want to send users an on-boarding email when they sign up, but that means you'd need to hold the request, post the email, wait for a success response from email sending and only then return a response to the user. As sending email is a network call, this might take some time, which means that for this whole time your application and the request sit idle. So does the user, thinking: why the hell am I signing up for this shitty application!

Here comes Celery to the rescue. With Celery you can just publish an email sending task to a Celery worker (a Celery application containing your email sending task's code, more on this later), finish the signup process and return a response to the user, all while the Celery worker does the job of sending the email. This way your request-response lifecycle is cut short and everyone is happy.

Figure: email sending task triggered and executed asynchronously in a Celery worker.

Celery has two main components and one optional component:

Broker — as the name suggests, an intermediary between the client application(s) and the worker(s).

Worker — where the task actually executes.

Result Backend, aka Backend (optional) — where the results of executed tasks are stored for retrieval.

Broker

The broker is responsible for receiving task messages from the client application(s), queuing them and delivering them to available worker(s). It works as an intermediary. The broker is also responsible for maintaining multiple queues according to your needs and priorities. It also does the job of redelivering failed tasks to workers for retrying (if configured), and more. The most popular broker used with Celery is RabbitMQ; Redis can also be used as a broker.
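As a taste of those multiple queues, here is a minimal hedged sketch of routing a task to its own queue (it assumes the celery_app instance we create later in this post, and a hypothetical queue named emails):

# Route our email task to a dedicated "emails" queue; tasks without a
# matching route fall back to the default "celery" queue.
celery_app.conf.task_routes = {
    "app.tasks.send_newsletter_welcome_email_task": {"queue": "emails"},
}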

Worker

The worker is the executor of your tasks. There can be multiple workers running at once. Workers continuously monitor the broker for new tasks. Whenever a worker has a free slot, it checks with the broker for new tasks and picks them up if available. The worker then executes the tasks and checks with the broker again.
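You can control how many of those slots a worker has, and which queues it consumes from, when starting it. A hedged illustration (the flag values and the emails queue are arbitrary):

celery -A app.worker.celery_app worker --concurrency=4 --queues=emails --loglevel=INFO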

Result Backend

Celery gives you the option to store the result of your task. This is achieved with a result backend. The backend stores the task's execution state along with the return value from your task. A backend is also necessary for some of the canvas workflows (more on this later). One of the most popular choices for a result backend is Redis, because its super fast key-value storage makes fetching task results very efficient. RabbitMQ can also be used as a backend.
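Stored results don't have to live forever; Celery's result_expires setting controls how long the backend keeps them. A one-line sketch, assuming the celery_app instance created below:

# Keep task results in the backend for one hour instead of the default one day.
celery_app.conf.result_expires = 3600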

Let's jump into some actual code now. We will use RabbitMQ as the broker and Redis as the backend. Let's create an app directory inside our content root directory and create a Python file worker.py.

First we create connection strings for the RabbitMQ broker and the Redis backend. Then we pass them to create a Celery app instance.

from celery import Celery


# Broker (RabbitMQ) connection string
CELERY_BROKER: str = "pyamqp://user:Pass1234@rabbitmq:5672//"

# Result backend (Redis) connection string
CELERY_BACKEND: str = "redis://redis:6379"


# Celery app instance
celery_app = Celery(
    __name__, broker=CELERY_BROKER, backend=CELERY_BACKEND
)

Our Celery application is ready. Now let's create a task for it to execute.

First, we will define a function send_newsletter_welcome_email_task that accepts an email address string and sends a welcome email to that address. Let's import and use smtplib and loguru for email sending and logging. Then we will import the celery_app instance from worker.py and decorate our email sending function with @celery_app.task().

from smtplib import SMTP

from loguru import logger

from .worker import celery_app


@celery_app.task()
def send_newsletter_welcome_email_task(email: str):
    logger.info("Send welcome email task received")
    with SMTP(
        host="smtp.freesmtpservers.com",
        port=25,
        timeout=60,
    ) as smtp:
        from_addr = "newsletter@mehedees.dev"
        smtp.sendmail(
            from_addr=from_addr,
            to_addrs=email,
            msg=f"To: {email}\nFrom: {from_addr}\r\nSubject: Welcome\n\nWelcome to the newsletter!",
        )
        logger.info("Email successfully sent")
    logger.info("Send welcome email task finished")
    return email

Our task is ready. Now we will define a mock web app to trigger our task.

from fastapi import Body, FastAPI
from loguru import logger

from .tasks import send_newsletter_welcome_email_task


app = FastAPI(
    debug=True,
    title="Test Python Celery",
    description="Test basics of Python Celery",
    openapi_url="/openapi.json",
)


@app.post(path="/newsletter/signup")
async def newsletter_signup(email: str = Body(embed=True)):
    logger.info(f"Received newsletter signup request from {email}")
    # Doing some processing bla bla bla
    logger.info("Initiating welcome email sending task")
    send_newsletter_welcome_email_task.delay(email)
    # Return response now, Celery will take care of sending the welcome mail
    return {
        "success": "True",
        "code": 200,
    }

We have created a quick FastAPI web application with a newsletter signup route, and imported our task into it. The view function does some dummy processing and then calls the email sending task. To call the task we call .delay() on it: send_newsletter_welcome_email_task.delay(email). When triggered, the task will be sent to the broker (RabbitMQ). The broker will then deliver the task to an available worker. The worker will execute the task, resulting in the welcome email being sent.

We could also trigger the task with .apply_async([email]). In fact, .delay() is a shortcut for .apply_async().
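Unlike .delay(), .apply_async() also accepts execution options. A minimal sketch (the countdown value and the emails queue are arbitrary illustrations, not part of our setup):

# Same task, but start no earlier than 10 seconds from now and
# publish it to a specific queue.
send_newsletter_welcome_email_task.apply_async(
    args=["test.celery@mehedees.dev"],
    countdown=10,
    queue="emails",
)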

Our code is now almost ready. We have defined the Celery app instance, defined a task, and created a web app to trigger the task. One modification is left for the Celery app instance: we need to introduce our task to the application beforehand. Otherwise our worker won't recognize the task that is delivered to it for execution. Let's do it:

from celery import Celery


# Broker (RabbitMQ) connection string
CELERY_BROKER: str = "pyamqp://user:Pass1234@rabbitmq:5672//"

# Result backend (Redis) connection string
CELERY_BACKEND: str = "redis://redis:6379"


# Celery app instance
celery_app = Celery(
    __name__, broker=CELERY_BROKER, backend=CELERY_BACKEND
)

# Autodiscovery of defined tasks
celery_app.autodiscover_tasks(packages=['app'])

The last line discovers modules named tasks.py inside the package paths passed to the autodiscover_tasks method. Alternatively, we could pass the paths of task modules with the include parameter when creating the Celery app object.
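A sketch of that alternative, listing our task module explicitly instead of autodiscovering it:

celery_app = Celery(
    __name__,
    broker=CELERY_BROKER,
    backend=CELERY_BACKEND,
    include=["app.tasks"],  # import this module when the worker starts
)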

Our code is ready now. Let's run our Celery worker with the following command; we will see something like this:

celery -A app.worker.celery_app worker --loglevel=INFO

/usr/local/lib/python3.11/site-packages/celery/platforms.py:829: SecurityWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(

 -------------- celery@e96b71c99d22 v5.3.1 (emerald-rush)
--- ***** -----
-- ******* ---- Linux-5.15.49-linuxkit-x86_64-with 2023-09-13 18:28:39
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         app.worker:0x7fcc3b5f9cd0
- ** ---------- .> transport:   amqp://user:**@rabbitmq:5672//
- ** ---------- .> results:     redis://redis:6379/
- *** --- * --- .> concurrency: 6 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . app.tasks.send_newsletter_welcome_email_task

[2023-09-13 18:29:10,192: INFO/MainProcess] mingle: searching for neighbors
[2023-09-13 18:29:11,222: INFO/MainProcess] mingle: all alone
[2023-09-13 18:29:11,268: INFO/MainProcess] celery@e96b71c99d22 ready.
[2023-09-13 18:29:14,030: INFO/MainProcess] Events of group {task} enabled by remote.


Our worker is now running. Let's run our FastAPI app now on port 12345:

uvicorn app.app:app --reload --workers 1 --host 0.0.0.0 --port 12345

We will see something like:

INFO:     Will watch for changes in these directories: ['/app']
INFO:     Uvicorn running on http://0.0.0.0:12345 (Press CTRL+C to quit)
INFO:     Started reloader process [1] using WatchFiles
INFO:     Started server process [8]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

All is ready now. Let's finally test whether Celery can execute our task asynchronously. We will post an email address to our newsletter signup route:

curl -d '{"email":"test.celery@mehedees.dev"}' -H "Content-Type: application/json" -X POST http://0.0.0.0:12345/newsletter/signup

Our mock app logs:

2023-09-13 18:31:28.498 | INFO     | app.app:newsletter_signup:17 - Received newsletter signup request from test.celery@mehedees.dev
2023-09-13 18:31:28.499 | INFO     | app.app:newsletter_signup:19 - Initiating welcome email sending task
INFO:     172.18.0.1:61574 - "POST /newsletter/signup HTTP/1.1" 200 OK

The mock app has triggered the email sending task (sent it to the broker) and returned a response without waiting for the blocking email sending task to finish executing. Meanwhile, the Celery worker logs:

[2023-09-13 18:31:29,103: INFO/MainProcess] Task app.tasks.send_newsletter_welcome_email_task[cb2f6812-16d6-48ec-9e04-fa20b57bfead] received
2023-09-13 18:31:29.129 | INFO     | app.tasks:send_newsletter_welcome_email_task:9 - Send welcome email task received
2023-09-13 18:31:31.922 | INFO     | app.tasks:send_newsletter_welcome_email_task:21 - Email successfully sent
2023-09-13 18:31:32.237 | INFO     | app.tasks:send_newsletter_welcome_email_task:22 - Send welcome email task finished
[2023-09-13 18:31:32,313: INFO/ForkPoolWorker-4] Task app.tasks.send_newsletter_welcome_email_task[cb2f6812-16d6-48ec-9e04-fa20b57bfead] succeeded in 3.189990901999977s: 'test.celery@mehedees.dev'

The worker has successfully received the task from the broker and executed it asynchronously.

Task Result

As we've configured a result backend, let's play a bit with results now. Fire up a Python console in the project (venv activated, if using one) and import the task. Now we'll call the task. Calling a task returns an AsyncResult object. From this object we can check task states, perform several operations and eventually retrieve the task's return value. Let's see it in action:

>>> from app.tasks import send_newsletter_welcome_email_task
>>> result = send_newsletter_welcome_email_task.delay("test.celery@mehedees.dev")
>>> type(result)
<class 'celery.result.AsyncResult'>
>>> result.app
<Celery app.worker at 0x7d0c432ad220>
>>> result.id
'faff38ff-494a-4231-912b-8471e46c032d'
>>> result.ignored
False
>>> result.ready()
True
>>> result.successful()
True
>>> result.failed()
False
>>> result.get()
'test.celery@mehedees.dev'
>>> result.state
'SUCCESS'

See the API documentation for more details on the AsyncResult instance. A fair warning: we should always release the resources used by the result backend by calling get() or forget() on EVERY AsyncResult instance after calling a task. Otherwise, unused results would pile up and hog the backend's storage.
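For example, here is a hedged pattern that always releases the stored result, whether or not retrieval succeeds:

result = send_newsletter_welcome_email_task.delay("test.celery@mehedees.dev")
try:
    # Block for up to 10 seconds waiting for the return value.
    value = result.get(timeout=10)
finally:
    # Release the result from the backend either way.
    result.forget()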

Celery Beat and Periodic Tasks

As the last topic of this post, we'll take a look at Celery Beat to run scheduled and periodic tasks. We are going to schedule our weekly newsletter email sending task to run every Monday at 9:30 AM. For practice, we'll also schedule another news collecting task to run every 24 hours. First, we need to add a periodic task schedule to the celery_app config. We'll edit worker.py and add the following lines:

from celery.schedules import crontab


celery_app.conf.beat_schedule = {
    'email_sending_task_0930_hour': {  # this key name doesn't mean anything
        'task': 'app.tasks.send_weekly_newsletter_email_task',
        'schedule': crontab(hour=9, minute=30, day_of_week=1),
        'args': (),
    },
    'news_collecting_task_every_24_hours': {
        'task': 'app.tasks.collect_daily_news_for_newsletter_task',
        'schedule': 86400,  # every 24*60*60 seconds
        'args': (),
    },
}
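Note that the schedule references two tasks we haven't written yet; a minimal hypothetical sketch of them in app/tasks.py (reusing the celery_app and logger already imported there) might look like this:

@celery_app.task()
def send_weekly_newsletter_email_task():
    # Compose and send this week's newsletter (left as an exercise).
    logger.info("Weekly newsletter email task executed")


@celery_app.task()
def collect_daily_news_for_newsletter_task():
    # Gather news items for the next issue (left as an exercise).
    logger.info("Daily news collection task executed")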

Important note: Celery Beat uses the UTC timezone by default for task scheduling. You should set your timezone in the Celery app config if you need Beat to use it:

celery_app.conf.timezone = 'Asia/Dhaka'

Now let's fire up the Celery Beat instance by running the following command:

celery -A app.worker.celery_app beat

That's the gist of scheduling tasks with Celery Beat. For details, go through the Celery Periodic Tasks documentation.
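For local development, you can also embed the beat scheduler inside a worker process with the -B flag, though the Celery docs advise running a single dedicated beat process in production:

celery -A app.worker.celery_app worker -B --loglevel=INFO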

You can find the fully functional and dockerized codebase here.

I originally started this article to share my experience with Celery canvas workflows and concurrency options, but then realized an introductory article might be helpful. So, I'll be writing at least two more articles, one focusing on production-ready Celery app development and another on advanced features and issues.

Constructive criticisms and feedback are welcome!

Keep learning, never settle (sorry, OnePlus) and, till I see you again!

Edit: The 2nd post on Configuring Celery is now complete.
