Writing Unit Tests for Celery Tasks with async
Functions
Introduction
(See my blog-examples
repo for the code used in the post.)
Recently I had to write unit tests for a periodic Celery task that sent a WebSocket message (using Channels) every 2 seconds. Channels requires writing unit tests in an asynchronous environment (i.e., with functions using async
/await
), which Python has supported since Python 3.5, released in 2015.
Here's the tricky part: while Celery supports running tasks asynchronously, it doesn't currently support writing tasks as async
functions (though it plans to in version 5.0). In other words, this:
# tasks.py
from asyncio import sleep
from celery.task import periodic_task
@periodic_task(
run_every=5,
name='return_hello',
)
async def return_hello():
await sleep(1)
return 'hello'
will lead to this:
RuntimeWarning: coroutine 'return_hello' was never awaited
Our first order of business is to find a way to use an async
function as a Celery task.
async Celery Task
Because Celery requires task functions to be synchronous, and the function we're looking to test is asynchronous, we need a way to run an asynchronous function within a synchronous environment. Luckily, the Django/Channels team has us covered with the AsyncToSync
(a.k.a. async_to_sync
) utility:
AsyncToSync lets a synchronous subthread stop and wait while the async function is called on the main thread's event loop, and then control is returned to the thread when the async function is finished.
In other words, async_to_sync
wraps an asynchronous function so it can be used like a regular, synchronous function. Let's see what a simple implementation looks like:
# tasks.py
from asgiref.sync import async_to_sync
from asyncio import sleep
from celery.task import periodic_task
async def return_hello():
await sleep(1)
return 'hello'
@periodic_task(
run_every=2,
name='return_hello',
)
def task_return_hello():
async_to_sync(return_hello)()
Instead of trying to to create a Celery task by decorating an async
function, which we saw above doesn't work, I've made two changes here:
- Write an asynchronous function (
return_hello
) - Write a simple task function that wraps
return_hello
usingasync_to_sync
Now, our business logic (return_hello
) is written as an async
function, and our task (task_return_hello
) is a standard non-async
function with no business logic of its own.
A benefit of writing skinny Celery tasks like this is that the business logic is separate from the task itself, making that logic easier to reuse—and easier to test.
Before we get to writing tests, you might wonder why I didn't wrap sleep(1)
inside async_to_sync
and keep return_hello
as a synchronous function. The answer is that a function wrapped in async_to_sync
is difficult to mock, which we'll need to do in one of our tests.
Writing Unit Tests
Celery's testing guide recommends mocking any non-task logic when writing task unit tests. In short: test that the task runs, not what it runs.
If we keep business logic out of the task, as I did above, we have two separate pieces of functionality that we need to test:
- That the business logic works
- That the task calls the function that performs business logic
Let's get to it.
Test the business logic
# test_tasks.py
import pytest
from tasks import return_hello
@pytest.mark.asyncio
async def test_return_hello():
result = await return_hello()
assert result == 'hello'
To test asynchronous functions, we're using pytest
along with the pytest-asyncio
plugin. When a test function is decorated with @pytest.mark.asyncio
, that test will be run in an asynchronous environment.
Test the task
# test_tasks.py
from unittest.mock import patch
from tasks import task_return_hello
@patch('tasks.return_hello')
def test_task_return_hello_failing(return_hello_mock):
"""Ensure the task runs in Celery and calls the correct function."""
task_return_hello.apply()
assert return_hello_mock.called
Instead of running return_hello
again, I took Celery's advice and mocked that function in the task unit test using unittest.mock.patch
. There isn't much to the test, but it's doing important work.
task_return_hello.apply()
executes the task. If it doesn't throw an error we know that the function is configured as a Celery task and that it should run as specified.
assert return_hello_mock.called
confirms that when the task runs it will call our return_hello
function (but doesn't call return_hello in the test).
Conclusion
Thanks to async_to_sync
, using async
functions in Celery tasks is simple. Plus, if the task logic is written in an async
function, it's easy to test, too.
Happy testing!