Celery

Celery

1. What is Clelery

Celery is a simple, flexible and reliable method that handles a large number of Message Distributed System

Asynchronous task queue focusing on real-time processing

It also supports task scheduling

Celery architecture

20150314100608_187

Celery’s architecture consists of three parts, the message It consists of middleware (message broker), task execution unit (worker) and task execution result store (task result store).

Message middleware

Celery itself does not provide message services, but it can be easily integrated with message middleware provided by third parties. Including RabbitMQ, Redis, etc.

Task Execution Unit

Worker is the task execution unit provided by Celery, and workers run concurrently in distributed systems Node.

Task result storage

Task result store is used to store the results of tasks performed by Workers. Celery supports storing task results in different ways, including AMQP, redis etc.

Version Support Situation

Celery version 4.0 runs on
Python ?2.7, 3.4, 3.5?
PyPy ?5.4 , 5.5?
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

If you're running an older version of Python, you need to be running an older version of Celery:

Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.

Celery is a project with minimal funding, so we don't support Microsoft Windows. Please don't open any issues related to that platform.

< h2 id="Usage scenario">2.Usage scenario

Asynchronous tasks: submit time-consuming operation tasks to Celery for asynchronous execution, such as sending SMS/email, message push, audio and video processing, etc.

p>

Timed tasks: perform certain things regularly, such as daily statistics

3. Celery installation and configuration

pip install celery< /p>

Message middleware: Rab bitMQ/Redis

app=Celery(‘task name’,backend=’xxx’,broker=’xxx’)

4.Celery execution Asynchronous task

Basic usage

Create project celerytest

Create py file: celery_app_task.py

import celeryimport time# broker='redis://127.0.0.1:6379/2' without password backend='redis://:[emailprotected]:6379/1'broker='redis://: [email protected]:6379/2'cel=celery.Celery('test',backend=backend,broker=broker)@cel.taskdef add(x,y): return x+y

Create py file: add_task.py, add task

from celery_app_task import addresult = add.delay(4,5)print(result.id)

Create py file : Run.py, execute the task, or use the command to execute: celery worker -A celery_app_task -l info

Note: under windows: celery worker -A celery_app_task -l info -P eventlet

from celery_app_task import celif __name__ =='__main__': cel.worker_main() # cel.worker_main(argv=['--loglevel=info')

Create py file: result .py, view task execution results

from celery.result import AsyncResultfrom celery_app_task import celasync = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)if async.successful(): result = async.get() print(result) # result.forget() # delete the result elif async.failed(): print('Execution failed') elif async.status =='PENDING': print('The task is being executed while waiting') elif async.status =='RETRY': print('After the task is abnormal Retrying') elif async.status =='STARTED': print('Task has been executed')

Execute add_task.py, add task, and get task ID

Execute run.py, or execute the command: celery worker -A celery_app_task -l info

Execute result.py, check the task status and get the result

Multitasking structure

pro_cel ├── celery_task# celery related folders│ ├── celery.py # celery connection and configuration related files, which must be named │ └── tasks1. py # All task functions│ └── tasks2.py # All task functions ├── check_result.py # Check the result └── send_task.py # Trigger task

celery.py

from celery import Celerycel = Celery('celery_demo', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', # Contains the following two task files, go to the corresponding py file to find the task, and classify multiple tasks include=['celery_task.tasks1','celery_task.tasks2' ])# Time zone cel.conf.timezone ='Asia/Shanghai'# Whether to use UTCcel.conf.enable_utc = False

tasks1.py< /p>

import timefrom celery_task.celery import [email protected] test_celery(res): time.sleep(5) return "test_celery task result:%s"%res

tasks2.py

import timefrom celery_task.celery import [email protected] test_celery2(res): time.sleep(5) return "test_celery2 task result:%s"%res< /pre> 

check_result.py

from celery.result import AsyncResultfrom celery_task.celery import celasync = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app =cel)if async.successful(): result = async.get() print(result) # result.forget() # Delete the result, the execution is complete, the result will not be deleted automatically # async.revoke(terminate=True) # No matter what time it is, it must be terminated # async.revoke(terminate=False) # If the task has not been executed yet, then it can be terminated. elif async.failed(): print('Execution failed') elif async.status =='PENDING': print('The task is being executed while waiting') elif async.status =='RETRY': print('After the task is abnormal Retrying') elif async.status =='STARTED': print('Task has been executed')

send_task.py

from celery_task. tasks1 import test_celeryfrom celery_task.tasks2 import test_celery2# immediately tell celery to execute the test_celery task, and pass in a parameter result = test_celery.delay('first execution') print(result.id) result = test_celery2.delay(' The execution of the two')print(result.id)

Add task (execute send_task.py), start work: celery worker -A celery_task -l info -P eventlet, check the task execution result (execute check_result .py)

5. Celery executes timing tasks

set time for celery to execute a task Task

add_task.py

from celery_app_task import addfrom datetime import datetime# Method One# v1 = datetime(2019, 2, 13, 18, 19, 56) # print(v1)# v2 = datetime.utcfromtimestamp(v1.timestamp())# print(v2)# result = add.apply_async(args=[1, 3], eta=v2)# print(result.id)# Method two ctime = datetime.now()# Use utc time utc_cti by default me = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedeltatime_delay = timedelta(seconds=10) task_time = utc_ctime + time_delay# Use apply_async and set the time result = add.apply_async(args=[4, 3], eta =task_time)print(result.id)

Timed task similar to contab

The celery.py in the multitasking structure is modified as follows

p>

from datetime import timedeltafrom celery import Celeryfrom celery.schedules import crontabcel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis: //127.0.0.1:6379/2', include=['celery_task.tasks1','celery_task.tasks2',])cel.conf.timezone ='Asia/Shanghai'cel.conf.enable_utc = Falsecel.conf.beat_schedule = {# Name arbitrarily named'add-every-10-seconds': {# Execute the test_celery function under tasks1'task':'celery_task.tasks1.test_celery', # Execute once every 2 seconds #'schedule': 1.0, #'schedule': crontab(minute="*/1"),'schedule': timedelta(seconds=2), # Passing parameters'args': ('test',) }, #'add-every-12- seconds': {# 'task':'celery_task.tasks1.test_celery', # execute every April 11th, 8:42 #'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # ' schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), #'args': (16, 16) # },}

Start a beat: celery beat- A celery_task -l info

Start work execution: celery worker -A celery_task -l info -P eventlet

6. Celery in Django

h2>

Installation package

celery==3.1.25django-celery==3.1.20

Create celeryconfig.py in the project directory

import djcelerydjcelery.setup_loader()CELERY_IMPORTS=('app01.tasks',)#Some situations can prevent deadlock CELERYD_FORCE_EXECV=True# Set the number of concurrent workers CELERYD_CONCURRENCY=4#Allow retry CELERY_ACKS_LATE=True # Each worker can execute up to 100 tasks to be destroyed, which can prevent memory leaks CELERYD_MAX_TASKS_PER_CHILD=100# Timeout CELERYD_TASK_TIME_LIMIT=12*30

Create tasks.py in the app01 directory

from celery import task@taskdef add(a,b): with open('a.text','a', encoding='utf-8') as f: f.write('a') print(a+b)

View function views.py

from django.shortcuts import render,HttpResponsefrom app01.tasks import addfrom datetime import datetimedef test(request) : # result=add.delay(2,3) ctime = datetime.now() # default utc time utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=5) task_time = utc_ctime + time_delay result = add.apply_async(args=[4, 3], eta=task_time) print(result.id) return HttpResponse('ok')

settings.py

INSTALLED_APPS = [...'djcelery','app01']...from djagocele import celeryconfigBROKER_BACKEND='redis'BOOKER_URL='redis://127.0.0.1:6379/1'CELERY_RESULT_BACKEND='redis ://127.0.0.1:6379/2'

Leave a Comment

Your email address will not be published.