AsyncTask - расширение django-celery

AsyncTask - модуль, расширяющий функционал работы с задачами celery и надстройкой django-celery.

Предпосылки

Один из случаев применения задач celery - организация выполнения длительных процессов и отслеживание их состояния. Это так называемые “фоновые задачи”, которые выполняются параллельно с основным функционалом web-приложения.

Для подобного случая необходимо расширить функционал задач celery:

  • Обеспечить классификацию “фоновых задач” и расширить атрибуты задачи в соответствие с потребностями приложения.
  • Отслеживать состояние “фоновых задач”.

Архитектура

AsyncTask использует для хранение информации о состоянии/результате задачи базу данных и модель TaskMeta.

AsyncTaskDatabaseBackend (наследник DatabaseBackend из django-celery) - это менеджер хранения задач.

Для дополнительных атрибутов задачи можно создать собственную модель, отнаследовавшись от модели TaskMeta, и указав собственную модель как модель хранения задачи в менеджере.

Функционал передачи дополнительных атрибутов задачи в менеджер хранения обеспечивает класс AsyncTask (наследник Task из celery).

Для упрощения объявления таких задач создан декоратор async_task.

Дополнительные атрибуты устанавливаются через парметр ‘fields’. Он может указываться в декораторе async_task, вызове задачи и методе обновления состояния задачи.

Задача сохраняется в базу данных, при любых изменениях состояния, в том числе и при постановке в очередь (в отличие от функционала django-celery).

Также, при изменении состояния, менеджер вызывает дополнительный метод notify, который может обеспечить отправку уведомления о состоянии задачи клиенту (например, через websocket).

Пример использования

Настройка и описание

Опишем модель с дополнительными атрибутами задачи:

class AsyncTaskMeta(TaskMeta):
    owner = models.ForeignKey(User, verbose_name=u'Владелец',
                              null=True, blank=True, on_delete=CASCADE)
    provider = models.ForeignKey(Provider, verbose_name=u'Учреждение',
                                 on_delete=CASCADE, null=True, blank=True)
    name = models.CharField(u'Наименование', max_length=200,
                            null=True, blank=True)
    current = models.IntegerField(u"Текущий пункт", default=0)
    total = models.IntegerField(u"Всего пунктов", default=100)
    started = models.DateTimeField(u'Время начала', auto_now_add=True)
    finished = models.DateTimeField(u'Время завершения', auto_now=True)
    task_type = models.CharField(verbose_name=u'Тип задачи', max_length=20,
                                 choices=AsyncTaskType.get_choices(),
                                 default=AsyncTaskType.DEFAULT)

    objects = AsyncTaskManager()

    class Meta:
        db_table = 'async_task'

Примечание

Важно наследовать модель от TaskMeta и указать AsyncTaskManager в качестве менеджера запросов

Опишем менеджера хранения и укажем ему модель для хранения:

class AsyncTaskBackend(AsyncTaskDatabaseBackend):
    TaskModel = AsyncTaskMeta

Укажем celery использовать новый менеджер хранения в settings.py:

CELERY_RESULT_BACKEND = 'project.async.AsyncTaskBackend'
import djcelery
djcelery.setup_loader()

Опишем фоновую задачу через декоратор. Внутри задачи будем обновлять состояние задачи и ее параметры:

@async_task(fields={'name': u'Долгое сложение', 'task_type': AsyncTaskType.LONG})
def test_task(x, y, **kwargs):
    sleep(5)
    test_task.update_state(meta={'fields': {'current': 33}})
    sleep(5)
    // можно указать собственное состояние
    test_task.update_state(state='PROGRESS', meta={'fields': {'current': 67}})
    sleep(5)
    res = x + y
    if res == 0:
        raise Exception()
    test_task.update_state(meta={'fields': {'current': 100}})
    return res

Примечание

Важно указать в параметрах задачи **kwargs для передачи дополнительных параметров

Для прикладных случаев, можно создать упрощающие функции:

def update_task(task, current=0, total=None, state=None, result=None):
    update_dict = {
        'current': current,
    }

    if state is not None:
        update_dict['status'] = state

    if result is not None:
        update_dict['result'] = result

    task.update_state(meta={'fields': update_dict})

update_task(test_task, 33)
update_task(test_task, 67, state='PROGRESS')

Работа с задачами

Вызов задачи с параметрами:

test_task.delay(1,2, fields={'owner': request.user, 'provider': provider_id})

Для прикладных случаев, можно создать функцию упрощающую вызов:

def run_task(task, params, user, provider):
    return task.delay(*params, fields={'owner': user, 'provider': provider})

run_task(test_task, (1,2), user, provider_id)

Получение списка задач по учреждению:

# за исключением отмененных
tasks = AsyncTaskMeta.objects.filter(provider=provider_id).exclude(status='REVOKED')

Получение задачи по task_id и ее отмена:

task = AsyncTaskMeta.objects.get(task_id=task_id)
# остановка выполенения задачи в celery
celery.control.revoke(task_id=task_id, terminate=True)
# удаление из хранилища
task.delete()

Уведомление о состоянии задачи

Для уведомления об изменении состоянии задачи достаточно указать в менеджере хранения объект notifier, у которого будет вызван метод notify и передана задача:

class WebsocketSender(object):
    u""" Синглтон клиента-отправителя """
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, 'sender'):
            from pushme.mq import get_sender
            cls.sender = get_sender(
                settings.WEBSOCKET_BACKEND,
                (settings.WEBSOCKET_QUEUE_HOST, settings.WEBSOCKET_QUEUE_PORT)
            )
        return cls.sender

    @classmethod
    def notify(cls, task):
        if settings.USE_WEBSOCKET:
            data = task.serialize() or {}
            json_data = simplejson.dumps(data)
            cls().send(
                data=json_data,
                uid=task.owner_id,
                topic='task_state_'+str(task.provider_id)
            )


class AsyncTaskBackend(AsyncTaskDatabaseBackend):
    TaskModel = AsyncTaskMeta
    notifier = WebsocketSender