Celery
1. What is Celery
Celery is a simple, flexible, and reliable distributed system for processing large numbers of messages.
It is an asynchronous task queue focused on real-time processing.
It also supports task scheduling.
Celery architecture
Celery's architecture consists of three parts: the message middleware (message broker), the task execution unit (worker), and the task result store.
Message middleware
Celery itself does not provide a message service, but it integrates easily with third-party message middleware, including RabbitMQ, Redis, etc.
Task Execution Unit
The worker is the task execution unit provided by Celery; workers run concurrently on the nodes of a distributed system.
Task result storage
The task result store holds the results of tasks executed by workers. Celery supports several storage backends, including AMQP, Redis, etc.
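As a minimal sketch of how the three parts fit together (the Redis URLs and the add task below are illustrative assumptions, not part of the original text):

from celery import Celery

# Assumed local Redis instances: one database as the message broker,
# another as the task result store
app = Celery('demo',
             broker='redis://127.0.0.1:6379/0',
             backend='redis://127.0.0.1:6379/1')

@app.task
def add(x, y):
    # Executed by a worker process; the return value is written to the backend
    return x + y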
Version support
Celery version 4.0 runs on
Python (2.7, 3.4, 3.5)
PyPy (5.4, 5.5)
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
If you're running an older version of Python, you need to be running an older version of Celery:
Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.
Celery is a project with minimal funding, so we don't support Microsoft Windows. Please don't open any issues related to that platform.
2. Usage scenarios
Asynchronous tasks: submit time-consuming operations to Celery for asynchronous execution, such as sending SMS/email, pushing messages, audio and video processing, etc. (a sketch follows below)
Scheduled tasks: run something periodically, such as daily statistics
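As a rough illustration of the asynchronous-task scenario, a slow notification can be wrapped as a task so the web request returns immediately; the send_email task and broker URL below are hypothetical:

from celery import Celery

app = Celery('notify', broker='redis://127.0.0.1:6379/0')

@app.task
def send_email(to, subject):
    # A slow SMTP call would go here
    return 'sent %s to %s' % (subject, to)

# In the view/request handler: queue the work and return at once
# send_email.delay('user@example.com', 'Welcome')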
3. Celery installation and configuration
pip install celery
Message middleware: RabbitMQ/Redis
app = Celery('task name', backend='xxx', broker='xxx')
4. Executing asynchronous tasks with Celery
Basic usage
Create project celerytest
Create py file: celery_app_task.py
import celery

# Without a password: broker = 'redis://127.0.0.1:6379/2'
# With a password, use the form 'redis://:password@host:port/db'
backend = 'redis://:password@127.0.0.1:6379/1'   # 'password' is a placeholder for your Redis password
broker = 'redis://:password@127.0.0.1:6379/2'

cel = celery.Celery('test', backend=backend, broker=broker)

@cel.task
def add(x, y):
    return x + y
Create py file: add_task.py to add a task
from celery_app_task import add

result = add.delay(4, 5)
print(result.id)
Create py file: run.py to execute the task, or use the command: celery worker -A celery_app_task -l info
Note: on Windows use: celery worker -A celery_app_task -l info -P eventlet (the eventlet package must be installed: pip install eventlet)
from celery_app_task import cel

if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info'])
Create py file: result.py to view the task execution result
from celery.result import AsyncResult
from celery_app_task import cel

# 'async' is a reserved word in newer Python versions, so use another variable name
async_result = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # async_result.forget()  # delete the result
elif async_result.failed():
    print('Execution failed')
elif async_result.status == 'PENDING':
    print('The task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('The task is being retried after an error')
elif async_result.status == 'STARTED':
    print('The task has started executing')
Execute add_task.py to add a task and get its task ID
Execute run.py, or execute the command: celery worker -A celery_app_task -l info
Execute result.py to check the task status and get the result
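If blocking is acceptable, the status checks can be skipped and the result waited on directly; a small sketch reusing the add task (the 10-second timeout is arbitrary):

from celery_app_task import add

result = add.delay(4, 5)
# Block until the worker finishes, or raise a TimeoutError after 10 seconds
print(result.get(timeout=10))
print(result.status)  # 'SUCCESS' once the task has completed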
Multi-task structure
pro_cel
├── celery_task         # celery-related folder
│   ├── celery.py       # celery connection and configuration; this file must be named celery.py
│   ├── tasks1.py       # task functions
│   └── tasks2.py       # task functions
├── check_result.py     # check the result
└── send_task.py        # trigger tasks
celery.py
from celery import Celery

cel = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             # Include the following two task files; Celery looks up the tasks
             # in the corresponding py files, so multiple tasks can be grouped
             include=['celery_task.tasks1',
                      'celery_task.tasks2'])

# Time zone
cel.conf.timezone = 'Asia/Shanghai'
# Whether to use UTC
cel.conf.enable_utc = False
tasks1.py
import time
from celery_task.celery import cel

@cel.task
def test_celery(res):
    time.sleep(5)
    return "test_celery task result:%s" % res
tasks2.py
import time
from celery_task.celery import cel

@cel.task
def test_celery2(res):
    time.sleep(5)
    return "test_celery2 task result:%s" % res

check_result.py
from celery.result import AsyncResult
from celery_task.celery import cel

async_result = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # async_result.forget()  # delete the result; a finished result is not deleted automatically
    # async_result.revoke(terminate=True)   # terminate the task no matter what state it is in
    # async_result.revoke(terminate=False)  # revoke the task only if it has not started yet
elif async_result.failed():
    print('Execution failed')
elif async_result.status == 'PENDING':
    print('The task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('The task is being retried after an error')
elif async_result.status == 'STARTED':
    print('The task has started executing')

send_task.py
from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2

# Tell celery to execute the test_celery task immediately and pass in a parameter
result = test_celery.delay('first execution')
print(result.id)
result = test_celery2.delay('second execution')
print(result.id)

Add the tasks (execute send_task.py), start a worker: celery worker -A celery_task -l info -P eventlet, then check the task execution results (execute check_result.py).
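Tasks can also be queued by name with send_task, so send_task.py does not need to import the task functions; a sketch assuming the same celery_task package:

from celery_task.celery import cel

# Queue the task by its registered name instead of importing the function
result = cel.send_task('celery_task.tasks1.test_celery', args=('first execution',))
print(result.id)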
5. Executing scheduled tasks with Celery
Set a time for Celery to execute a task
add_task.py
from celery_app_task import add
from datetime import datetime, timedelta

# Method 1
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# Method 2
ctime = datetime.now()
# eta expects UTC time by default
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# Use apply_async and set the eta
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
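If the requirement is simply "run this N seconds from now", apply_async also accepts a countdown argument, which avoids the manual UTC conversion; a minimal sketch with the same add task:

from celery_app_task import add

# countdown is relative to now, so no UTC handling is needed
result = add.apply_async(args=[4, 3], countdown=10)
print(result.id)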
Periodic tasks similar to crontab

The celery.py in the multi-task structure is modified as follows:
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             include=['celery_task.tasks1',
                      'celery_task.tasks2'])

cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # The schedule name can be anything
    'add-every-10-seconds': {
        # Execute the test_celery function under tasks1
        'task': 'celery_task.tasks1.test_celery',
        # Run every 2 seconds
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # Arguments passed to the task
        'args': ('test',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.tasks1.test_celery',
    #     # Run every April 11th at 8:42
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}

Start beat: celery beat -A celery_task -l info
Start the worker: celery worker -A celery_task -l info -P eventlet
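For calendar-style schedules, the beat_schedule entry in celery.py can use crontab instead of timedelta; a hypothetical entry that runs test_celery every day at 08:30 (the schedule name and argument are made up):

# In celery_task/celery.py, where crontab is already imported
cel.conf.beat_schedule = {
    'run-test-celery-daily': {
        'task': 'celery_task.tasks1.test_celery',
        'schedule': crontab(hour=8, minute=30),  # every day at 08:30
        'args': ('daily run',),
    },
}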
6. Celery in Django
Install the packages
celery==3.1.25
django-celery==3.1.20

Create celeryconfig.py in the project directory
import djcelery
djcelery.setup_loader()

CELERY_IMPORTS = ('app01.tasks',)

# Can prevent deadlocks in some situations
CELERYD_FORCE_EXECV = True
# Number of concurrent worker processes
CELERYD_CONCURRENCY = 4
# Acknowledge tasks only after they finish (allows retry)
CELERY_ACKS_LATE = True
# Recycle each worker process after 100 tasks, which helps prevent memory leaks
CELERYD_MAX_TASKS_PER_CHILD = 100
# Task timeout
CELERYD_TASK_TIME_LIMIT = 12 * 30

Create tasks.py in the app01 directory
from celery import task

@task
def add(a, b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a + b)

View function: views.py
from django.shortcuts import render, HttpResponse
from app01.tasks import add
from datetime import datetime, timedelta


def test(request):
    # result = add.delay(2, 3)
    ctime = datetime.now()
    # eta expects UTC time by default
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')

settings.py
INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]
...

# 'djagocele' is the Django project package that contains celeryconfig.py
from djagocele import celeryconfig
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
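To exercise the view, it still needs a URL route, and a worker has to be running; a sketch assuming a Django 1.x project (as implied by django-celery 3.1) where app01/views.py contains the test view above:

# urls.py
from django.conf.urls import url
from app01 import views

urlpatterns = [
    url(r'^test/$', views.test),
]

# Then start a worker via the djcelery management command:
#   python manage.py celery worker --loglevel=info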