blog-examples repo for the code used in the post.)
Recently I had to write unit tests for a periodic Celery task that sent a WebSocket message (using Channels) every 2 seconds. Channels requires writing unit tests in an asynchronous environment (i.e., with functions using
await), which Python has supported since Python 3.5, released in 2015.
Here's the tricky part: while Celery supports running tasks asynchronously, it doesn't currently support writing tasks as
async functions (though it plans to in version 5.0). In other words, this:
# tasks.py from asyncio import sleep from celery.task import periodic_task @periodic_task( run_every=5, name='return_hello', ) async def return_hello(): await sleep(1) return 'hello'
will lead to this:
RuntimeWarning: coroutine 'return_hello' was never awaited
Our first order of business is to find a way to use an
async function as a Celery task.
async Celery Task
Because Celery requires task functions to be synchronous, and the function we're looking to test is asynchronous, we need a way to run an asynchronous function within a synchronous environment. Luckily, the Django/Channels team has us covered with the
AsyncToSync lets a synchronous subthread stop and wait while the async function is called on the main thread's event loop, and then control is returned to the thread when the async function is finished.
In other words,
async_to_sync wraps an asynchronous function so it can be used like a regular, synchronous function. Let's see what a simple implementation looks like:
# tasks.py from asgiref.sync import async_to_sync from celery.task import periodic_task async def return_hello(): await sleep(1) return 'hello' @periodic_task( run_every=2, name='return_hello', ) def task_return_hello(): async_to_sync(return_hello)()
Instead of trying to to create a Celery task by decorating an
async function, which we saw above doesn't work, I've made two changes here:
- Write an asynchronous function (
- Write a simple task function that wraps
Now, our business logic (
return_hello) is written as an
async function, and our task (
task_return_hello) is a standard non-
async function with no business logic of its own.
A benefit of writing skinny Celery tasks like this is that the business logic is separate from the task itself, making that logic easier to reuse--and easier to test.
Before we get to writing tests, you might wonder why I didn't wrap
async_to_sync and keep
return_hello as a synchronous function. The answer is that a function wrapped in
async_to_sync is difficult to mock, which we'll need to do in one of our tests.
Writing Unit Tests
Celery's testing guide recommends mocking any non-task logic when writing task unit tests. In short: test that the task runs, not what it runs.
If we keep business logic out of the task, as I did above, we have two separate pieces of functionality that we need to test:
- That the business logic works
- That the task calls the function that performs business logic
Let's get to it.
Test the business logic
# test_tasks.py import pytest from tasks import return_hello @pytest.mark.asyncio async def test_return_hello(): result = await return_hello() assert result == 'hello'
To test asynchronous functions, we're using
pytest along with the
pytest-asyncio plugin. When a test function is decorated with
@pytest.mark.asyncio, that test will be run in an asynchronous environment.
Test the task
# test_tasks.py from unittest.mock import patch from tasks import task_return_hello @patch('tasks.return_hello') def test_task_return_hello_failing(return_hello_mock): """Ensure the task runs in Celery and calls the correct function.""" task_return_hello.apply() assert return_hello_mock.called
Instead of running
return_hello again, I took Celery's advice and mocked that function in the task unit test using
unittest.mock.patch. There isn't much to the test, but it's doing important work.
task_return_hello.apply() executes the task. If it doesn't throw an error we know that the function is configured as a Celery task and that it should run as specified.
assert return_hello_mock.called confirms that when the task runs it will call our
return_hello function (but doesn't call return_hello in the test).
async functions in Celery tasks is simple. Plus, if the task logic is written in an
async function, it's easy to test, too.