Использование django-celery-beat для создания периодических задач в Django-проектах
Когда в проекте стоит задача реализовать функциональность, которая будет выполняться через определенное время или по расписанию, на помощь приходит такой замечательный инструмент, как Celery.
Celery — это распределенная асинхронная очередь задач, выполняющаяся в реальном времени, а также поддерживающая планирование задач. Ознакомиться с теоретическими аспектами можно по ссылке. В данной статье мы рассмотрим периодические задачи. Несмотря на то, что периодические задачи можно создавать через административную страницу в Django admin, мы рассмотрим их создание в коде, а также остановку созданных задач по условию.
Если вы хотите повторить пример из статьи, то у вас уже должен быть настроен Django-проект. Однако если вам нужна помощь по настройке проекта, почитайте эту статью.
В основном Celery в Django-приложениях используется, когда необходимо:
- асинхронно выполнить какую-либо задачу с помощью Celery worker;
- выполнить задачу в определенное время;
- постоянно выполнять задачу через определенный промежуток времени.
Самостоятельно Celery не умеет реализовывать периодические задачи, поэтому существует расширение django-celery-beat, которое выполняет роль планировщика задач для Celery. Как развернуть Сelery в приложении, описано в документации выше. В качестве посредника сообщений между нашим приложением и Сelery я использую Redis, развернутый в docker-контейнере.
Давайте установим django-celery-beat с помощью pip install django-celery-beat
После установки необходимо внести django-celery-beat в список установленных приложений в настройках приложения Django.
INSTALLED_APPS = (
...,
'django_celery_beat',
)Это необходимо, так как данное расширение предоставляет новые модели, в которых будут храниться расписание и задачи, и приложение должно знать об этом, чтобы выполнить миграции и создать необходимые таблицы.
Запустим миграции. И в качестве последней настройки для полноценного использования django-celery-beat добавим в settings приложения: CELERY_BEAT_SCHEDULER='django_celery_beat.schedulers:DatabaseScheduler', тем самым указав Celery использовать новый планировщик задач, только что установленный нами.
Чтобы убедиться в полной установке расширения, необходимо зайти на административный сайт приложения. Если расширение установлено верно, то вы увидите следующий список моделей, позволяющий управлять периодическими задачами:

Динамическое создание периодических задач
Основная мысль, которую я бы хотел донести в статье, заключается в том, что периодические задачи являются обычными моделями, с которыми можно работать через Django ORM. Сейчас покажу на примере:
Допустим, вы создаете сервис, который делает заказы на стороннем сервисе через API. В ответе от этого сервиса вы получаете статус вашего заказа. Если мы получили статус от сервиса, то все отлично, дальше мы выполняем всю необходимую логику с этим заказом. Если же нет (например, пришел ответ '0') — тогда необходимо переотправлять запрос, пока статус не будет получен.
Мы не будем усложнять себе жизнь работой с API каких-либо сервисов. Сымитируем ответ от воображаемого сервиса с помощью custom command, которую самостоятельно напишем.
В папке приложения создадим папку «management», в ней папку «command» и в ней файл с кодом, например, «make_order.py».

И напишем код будущей консольной команды.
import json
from django.utils import timezone
from django.core.management.base import BaseCommand, CommandError
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from setups.models import Order
class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('status', type=str)
parser.add_argument('order_id', nargs=1, type=int)
def handle(self, *args, **options):
status = options['status']
order = Order.objects.get(pk=options['order_id'][0])
if status == '0':
PeriodicTask.objects.create(
name='Repeat order {}'.format(options['order_id']),
task='repeat_order_make',
interval=IntervalSchedule.objects.get(every=10, period='seconds'),
args=json.dumps([options['order_id'][0]])
start_time=timezone.now(),
)
else:
order.update(status=status)
order.refresh_from_db()
# Необходимая логика после удачного получения статуса
print('Статус вашего заказа -> {}'.format(order.status))Данная команда ожидает 2 аргумента на вызове: первый — это статус, второй — id заказа. Если мы получаем статус '0' от нашего воображаемого сервиса, тогда нам необходимо повторно отправлять запросы на получение статуса. Тут-то нам и пригодятся периодические задачи. В данном примере мы создаем PeriodicTask из Django_celery_beat.models. В качестве аргументов мы передаем ей следующие параметры:
- Имя создаваемой задачи: name='Repeat order .{}'.format(options['order_id']), в дальнейшем с ее помощью мы будем останавливать задачи, если нам это необходимо.
- Задача, которая будет периодически выполняться через определенный промежуток времени. В нашем случае это: task='repeat_order_make'.
- Интервал, через который мы хотим, чтобы задача выполнялась: interval=IntervalSchedule.objects.get(every=10, period='seconds'). IntervalSchedule также является моделью из django_celery_beat.models, поэтому мы можем пользоваться функциональностью, предоставляемой Django ORM.
- Аргументы, которые будут передаваться в указанную функцию. В нашем случае задаче 'repeat_order_make' будет передан аргумент args=json.dumps([options['order_id'][0]]), то есть options['order_id'][0].
- И время начала работы периодической задачи: start_time=timezone.now().
Давайте теперь посмотрим на код задачи 'repeat_order_make'. Создадим файл «task.py» в корне нашего приложения (как на скриншоте) и напишем следующий код.
from celery import shared_task
from django_celery_beat.models import PeriodicTask
from .models import Order
@shared_task(name="repeat_order_make")
def repeat_order_make(order_id):
order = Order.objects.get(pk=order_id)
if order.status != '0':
print('Статус получен!')
task = PeriodicTask.objects.get(name='Repeat order {}'.format(order_id))
task.enabled = False
task.save()
else:
# Необходимая логика при повторной отправке заказа
Еееееееееееепеепппппп .print('Я должна повторно оформлять заказ каждые 10 секунд')В коде учитывается, что если заказ уже получил статус, то нет необходимости больше выполнять написанную нами периодическую задачу. Для того, чтобы остановить последующее выполнение задачи, мы получаем экземпляр задачи с помощью метода get(), передавая в качестве аргумента имя, которое мы дали во время создания задачи. Затем мы переводим поле enabled в состояние False, что остановит выполнение задачи в дальнейшем, и обязательно сохраняем экземпляр задачи.
Давайте теперь проверим результаты наших трудов. Запустим написанную нами команду с помощью manage.py make_order 0 1. То есть мы делаем заказ №1 и как будто получаем ответ со статусом '0'. Это должно создать новую периодическую задачу. Давайте глянем в celery_beat командой celery -A <имя вашего приложения> beat -l INFO.

И увидим, как началось периодическое выполнение задачи repeat_order_make, которую мы описали выше. А также заглянем в Celery командой celery -A <имя вашего приложения> worker -l INFO (я запускал каждую команду в отдельном окне терминала).

Видно, что Celery получает, и посылаем задачи для исполнения.
Теперь изменим статус нашего заказа, снова запустив команду manage.py make_order, но уже с параметрами «Принят, 1». То есть manage.py make_order Принят 1. После чего можно посмотреть в терминал с celery-beat и увидеть сообщение о том, что было изменено расписание выполнения задач, после чего постоянный вызов repeat_order_make прекратится.

Таким образом, мы научились создавать периодические задачи прямо в коде программы, без использования административного сайта проекта, и также научились их останавливать. Я постарался привести пример из личного опыта, однако максимально упростил его, чтобы выжать максимум информации по конкретному случаю использования периодических задач. Если вы хотите больше узнать о том, как пользоваться периодическими задачами или посмотреть другие кейсы использования этого инструмента, я рекомендую ознакомиться со следующими материалами: раз и два.