
Writing Unit Tests for Celery Tasks with async Functions

Introduction

(See my blog-examples repo for the code used in the post.)

Recently I had to write unit tests for a periodic Celery task that sent a WebSocket message (using Channels) every 2 seconds. Channels requires writing unit tests in an asynchronous environment (i.e., with functions using async/await), which Python has supported since Python 3.5, released in 2015.

Here's the tricky part: while Celery runs tasks asynchronously, it doesn't currently support writing the tasks themselves as async functions (at the time of writing, native support was planned for version 5.0). In other words, this:

# tasks.py
from asyncio import sleep
from celery.task import periodic_task


@periodic_task(
    run_every=5,
    name='return_hello',
)
async def return_hello():
    await sleep(1)
    return 'hello'

will lead to this:

RuntimeWarning: coroutine 'return_hello' was never awaited
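
The warning makes more sense once you see what calling an async function actually does: it builds a coroutine object and returns immediately, without running the body. The sketch below uses only the standard library (no Celery) and assumes, as the warning suggests, that the caller invokes the function like a regular one and never awaits the result. The sleep is shortened to 0 to keep the demo fast.

```python
import asyncio
import inspect


async def return_hello():
    await asyncio.sleep(0)
    return 'hello'


# Calling an async function like a regular one does not run its body;
# it only creates a coroutine object.
pending = return_hello()
is_coroutine = inspect.iscoroutine(pending)

# Close the coroutine explicitly so this demo does not emit the warning itself.
pending.close()

# The body runs only when an event loop drives the coroutine.
result = asyncio.run(return_hello())
```

If `pending` were simply dropped instead of closed or awaited, Python would report that the coroutine was never awaited, which is the message shown above.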

Our first order of business is to find a way to use an async function as a Celery task.

async Celery Task

Because Celery requires task functions to be synchronous, and the function we're looking to test is asynchronous, we need a way to run an asynchronous function within a synchronous environment. Luckily, the Django/Channels team has us covered with the AsyncToSync (a.k.a. async_to_sync) utility:

AsyncToSync lets a synchronous subthread stop and wait while the async function is called on the main thread's event loop, and then control is returned to the thread when the async function is finished.

In other words, async_to_sync wraps an asynchronous function so it can be used like a regular, synchronous function. Let's see what a simple implementation looks like:

# tasks.py
from asgiref.sync import async_to_sync
from asyncio import sleep
from celery.task import periodic_task


async def return_hello():
    await sleep(1)
    return 'hello'


@periodic_task(
    run_every=2,
    name='return_hello',
)
def task_return_hello():
    # Run the async business logic to completion from Celery's synchronous context.
    return async_to_sync(return_hello)()

Instead of trying to create a Celery task by decorating an async function, which we saw above doesn't work, I've made two changes here:

  1. Write an asynchronous function (return_hello)
  2. Write a simple task function that wraps return_hello using async_to_sync

Now, our business logic (return_hello) is written as an async function, and our task (task_return_hello) is a standard non-async function with no business logic of its own.
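
To try the same split without Celery or asgiref installed, here is a rough standard-library sketch. The `run_sync` helper is hypothetical and is not how async_to_sync is implemented; it is a much simpler stand-in built on `asyncio.run`, and it assumes no event loop is already running in the calling thread.

```python
import asyncio


async def return_hello():
    """Business logic, written as an async function."""
    await asyncio.sleep(0)  # shortened from sleep(1) to keep the demo fast
    return 'hello'


def run_sync(async_func):
    """Hypothetical, simplified stand-in for async_to_sync.

    Runs the coroutine on a fresh event loop via asyncio.run, so it only
    works when no loop is already running in the current thread.
    """
    def wrapper(*args, **kwargs):
        return asyncio.run(async_func(*args, **kwargs))
    return wrapper


def task_return_hello():
    """Thin synchronous wrapper, playing the role of the Celery task."""
    return run_sync(return_hello)()
```

The shape is the point here: the async function holds the logic, and the synchronous wrapper only bridges into it.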

A benefit of writing skinny Celery tasks like this is that the business logic is separate from the task itself, making that logic easier to reuse—and easier to test.

Before we get to writing tests, you might wonder why I didn't wrap sleep(1) inside async_to_sync and keep return_hello as a synchronous function. The answer is that a function wrapped in async_to_sync is difficult to mock, which we'll need to do in one of our tests.

Writing Unit Tests

Celery's testing guide recommends mocking any non-task logic when writing task unit tests. In short: test that the task runs, not what it runs.

If we keep business logic out of the task, as I did above, we have two separate pieces of functionality that we need to test:

  1. That the business logic works
  2. That the task calls the function that performs business logic

Let's get to it.

Test the business logic

# test_tasks.py
import pytest

from tasks import return_hello


@pytest.mark.asyncio
async def test_return_hello():
    result = await return_hello()
    assert result == 'hello'

To test asynchronous functions, we're using pytest along with the pytest-asyncio plugin. When a test function is decorated with @pytest.mark.asyncio, the plugin runs it as a coroutine inside an event loop, so the test body can use await.
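
If you'd rather not depend on the plugin, a roughly equivalent check can be written with the standard library's `unittest.IsolatedAsyncioTestCase` (available from Python 3.8). This is an alternative sketch, not what the post's repo uses; `return_hello` is redefined inline and the `run_suite` helper is hypothetical, there only to make the example self-contained.

```python
import asyncio
import unittest


async def return_hello():
    await asyncio.sleep(0)  # shortened from sleep(1) to keep the demo fast
    return 'hello'


class ReturnHelloTest(unittest.IsolatedAsyncioTestCase):
    # Async test methods are awaited on an event loop managed by the test case.
    async def test_return_hello(self):
        self.assertEqual(await return_hello(), 'hello')


def run_suite():
    """Hypothetical helper: load and run the test case, returning the result."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ReturnHelloTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

In a real project the class would live in a test module and be collected by pytest or `python -m unittest` without the helper.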

Test the task

# test_tasks.py
from unittest.mock import patch

from tasks import task_return_hello


@patch('tasks.return_hello')
def test_task_return_hello(return_hello_mock):
    """Ensure the task runs in Celery and calls the correct function."""
    task_return_hello.apply()
    assert return_hello_mock.called

Instead of running return_hello again, I took Celery's advice and mocked that function in the task unit test using unittest.mock.patch. There isn't much to the test, but it's doing important work.

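task_return_hello.apply() executes the task locally, in the current process, without needing a worker. If the call succeeds, we know that the function is configured as a Celery task and that it should run as specified. Note that apply() only re-raises exceptions from the task when throw=True is passed or the task_eager_propagates setting is enabled; otherwise a failure is recorded on the returned result.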

assert return_hello_mock.called confirms that the task called return_hello when it ran. Because the function is patched, the real return_hello never executes in this test.
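
The mocking step can be tried on its own with the standard library (Python 3.8+). In this sketch the `tasks` namespace and the `asyncio.run` wrapper are hypothetical stand-ins for the real tasks module and for async_to_sync, and `check_task_calls_business_logic` is an illustrative name. `AsyncMock` is used so that the patched function can still be awaited.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch


async def return_hello():
    await asyncio.sleep(0)
    return 'hello'


# Hypothetical stand-in for the tasks module.
tasks = SimpleNamespace(return_hello=return_hello)


def task_return_hello():
    # Look the function up at call time so that patching takes effect.
    return asyncio.run(tasks.return_hello())


def check_task_calls_business_logic():
    with patch.object(tasks, 'return_hello', new_callable=AsyncMock) as mock:
        mock.return_value = 'mocked'
        result = task_return_hello()
    # called: the mock was invoked; await_count: how often it was awaited.
    return mock.called, mock.await_count, result
```

Checking `await_count` in addition to `called` is slightly stricter: it shows the wrapper actually awaited the coroutine rather than just creating it.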

Conclusion

Thanks to async_to_sync, using async functions in Celery tasks is simple. Plus, if the task logic is written in an async function, it's easy to test, too.

Happy testing!