Когда в проекте стоит задача реализовать функциональность, которая будет выполняться через определенное время или по расписанию, на помощь приходит такой замечательный инструмент, как Celery.
Celery — это распределенная асинхронная очередь задач, выполняющаяся в реальном времени, а также поддерживающая планирование задач. Ознакомиться с теоретическими аспектами можно по ссылке. В данной статье мы рассмотрим периодические задачи. Несмотря на то, что периодические задачи можно создавать через административную страницу в Django admin, мы рассмотрим их создание в коде, а также остановку созданных задач по условию.
Если вы хотите повторить пример из статьи, то у вас уже должен быть настроен Django-проект. Однако если вам нужна помощь по настройке проекта, почитайте эту статью.
Самостоятельно Celery не умеет реализовывать периодические задачи, поэтому существует расширение django-celery-beat, которое выполняет роль планировщика задач для Celery. Как развернуть Сelery в приложении, описано в документации выше. В качестве посредника сообщений между нашим приложением и Сelery я использую Redis, развернутый в docker-контейнере.
Давайте установим django-celery-beat с помощью pip install django-celery-beat
После установки необходимо внести django-celery-beat в список установленных приложений в настройках приложения Django.
INSTALLED_APPS = (
...,
'django_celery_beat',
)
Это необходимо, так как данное расширение предоставляет новые модели, в которых будут храниться расписание и задачи, и приложение должно знать об этом, чтобы выполнить миграции и создать необходимые таблицы.
Запустим миграции. И в качестве последней настройки для полноценного использования django-celery-beat добавим в settings приложения: CELERY_BEAT_SCHEDULER='django_celery_beat.schedulers:DatabaseScheduler', тем самым указав Celery использовать новый планировщик задач, только что установленный нами.
Чтобы убедиться в полной установке расширения, необходимо зайти на административный сайт приложения. Если расширение установлено верно, то вы увидите следующий список моделей, позволяющий управлять периодическими задачами:
Основная мысль, которую я бы хотел донести в статье, заключается в том, что периодические задачи являются обычными моделями, с которыми можно работать через Django ORM. Сейчас покажу на примере:
Допустим, вы создаете сервис, который делает заказы на стороннем сервисе через API. В ответе от этого сервиса вы получаете статус вашего заказа. Если мы получили статус от сервиса, то все отлично, дальше мы выполняем всю необходимую логику с этим заказом. Если же нет (например, пришел ответ '0') — тогда необходимо переотправлять запрос, пока статус не будет получен.
Мы не будем усложнять себе жизнь работой с API каких-либо сервисов. Сымитируем ответ от воображаемого сервиса с помощью custom command, которую самостоятельно напишем.
В папке приложения создадим папку «management», в ней папку «command» и в ней файл с кодом, например, «make_order.py».
И напишем код будущей консольной команды.
import json
from django.utils import timezone
from django.core.management.base import BaseCommand, CommandError
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from setups.models import Order
class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('status', type=str)
parser.add_argument('order_id', nargs=1, type=int)
def handle(self, *args, **options):
status = options['status']
order = Order.objects.get(pk=options['order_id'][0])
if status == '0':
PeriodicTask.objects.create(
name='Repeat order {}'.format(options['order_id']),
task='repeat_order_make',
interval=IntervalSchedule.objects.get(every=10, period='seconds'),
args=json.dumps([options['order_id'][0]]),
start_time=timezone.now(),
)
else:
order.update(status=status)
order.refresh_from_db()
# Необходимая логика после удачного получения статуса
print('Статус вашего заказа -> {}'.format(order.status))
Данная команда ожидает 2 аргумента на вызове: первый — это статус, второй — id заказа. Если мы получаем статус '0' от нашего воображаемого сервиса, тогда нам необходимо повторно отправлять запросы на получение статуса. Тут-то нам и пригодятся периодические задачи. В данном примере мы создаем PeriodicTask из Django_celery_beat.models. В качестве аргументов мы передаем ей следующие параметры:
Давайте теперь посмотрим на код задачи 'repeat_order_make'. Создадим файл «task.py» в корне нашего приложения (как на скриншоте) и напишем следующий код.
from celery import shared_task
from django_celery_beat.models import PeriodicTask
from .models import Order
@shared_task(name="repeat_order_make")
def repeat_order_make(order_id):
order = Order.objects.get(pk=order_id)
if order.status != '0':
print('Статус получен!')
task = PeriodicTask.objects.get(name='Repeat order {}'.format(order_id))
task.enabled = False
task.save()
else:
# Необходимая логика при повторной отправке заказа
Еееееееееееепеепппппп .print('Я должна повторно оформлять заказ каждые 10 секунд')
В коде учитывается, что если заказ уже получил статус, то нет необходимости больше выполнять написанную нами периодическую задачу. Для того, чтобы остановить последующее выполнение задачи, мы получаем экземпляр задачи с помощью метода get(), передавая в качестве аргумента имя, которое мы дали во время создания задачи. Затем мы переводим поле enabled в состояние False, что остановит выполнение задачи в дальнейшем, и обязательно сохраняем экземпляр задачи.
Давайте теперь проверим результаты наших трудов. Запустим написанную нами команду с помощью manage.py make_order 0 1. То есть мы делаем заказ №1 и как будто получаем ответ со статусом '0'. Это должно создать новую периодическую задачу. Давайте глянем в celery_beat командой celery -A <имя вашего приложения> beat -l INFO.
И увидим, как началось периодическое выполнение задачи repeat_order_make, которую мы описали выше. А также заглянем в Celery командой celery -A <имя вашего приложения> worker -l INFO (я запускал каждую команду в отдельном окне терминала).
Видно, что Celery получает, и посылаем задачи для исполнения.
Теперь изменим статус нашего заказа, снова запустив команду manage.py make_order, но уже с параметрами «Принят, 1». То есть manage.py make_order Принят 1. После чего можно посмотреть в терминал с celery-beat и увидеть сообщение о том, что было изменено расписание выполнения задач, после чего постоянный вызов repeat_order_make прекратится.
Таким образом, мы научились создавать периодические задачи прямо в коде программы, без использования административного сайта проекта, и также научились их останавливать. Я постарался привести пример из личного опыта, однако максимально упростил его, чтобы выжать максимум информации по конкретному случаю использования периодических задач. Если вы хотите больше узнать о том, как пользоваться периодическими задачами или посмотреть другие кейсы использования этого инструмента, я рекомендую ознакомиться со следующими материалами: раз и два.