Celery 简介

​ 除了redis,还可以使用另外一个神器—Celery。Celery是一个异步任务的调度工具。

​ Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。

​ 在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是“中间人”的意思,在这里 Broker 起到一个中间人的角色。在工头提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农等着取出一个个任务准备着手做。

​ 这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 有点像我们的 Broker,也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。

​ Celery(芹菜)是一个异步任务队列/基于分布式消息传递的作业队列。它侧重于实时操作,但对调度支持也很好。Celery用于生产系统每天处理数以百万计的任务。Celery是用Python编写的,但该协议可以在任何语言实现。它也可以与其他语言通过webhooks实现。Celery建议的消息队列是RabbitMQ,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, 和数据库(使用SQLAlchemy的或Django的 ORM) 。Celery是易于集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包即可。

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

项目目录

image-20210623184033611

Config.py

1
2
3
4
5
6
7
8
9
10
# -*- coding: utf-8 -*-
"""
@author : Zeo
@software : PyCharm2021
@docs : https://docs.celeryproject.org/en/stable/userguide/configuration.html
"""

broker_url = 'redis://127.0.0.1:6379/2' # 消息中间件配置

#result_backend = 'redis://127.0.0.1:6379/16' # 默认不开启,后端用于存储任务结果(逻辑删除)

Main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# -*- coding: utf-8 -*-
"""
@author : Zeo
@software : PyCharm2021
@docs : https://docs.celeryproject.org/en/stable/
https://github.com/celery/celery
"""

import os

from celery import Celery

# 为celery程序设置Django配置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'drf_admin.settings.dev')

# 创建celery应用 app = Celery('admin')
app = Celery('scan')

# 导入celery配置
app.config_from_object('celery_tasks.config')

# 加载celery任务模块
app.autodiscover_tasks(['celery_tasks.sms', 'drf_admin.apps.vulscan'])

# 启动Celery命令, (修改celery任务后必须重启celery)
# celery -A celery_tasks.main worker --loglevel=info

# win系统(celery4.0版本后不支持win, 需安装eventlet模块启动celery)
# celery -A celery_tasks.main worker --loglevel=info -P eventlet

建立task.py文件(里面是异步的主程序)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# author: Zeo
# python: 3.7
# datetime:2021/6/15 11:28 上午
# software: PyCharm

"""
文件说明:celery异步测试
"""
from random import random
from celery_tasks.main import app


@app.task(name='celerytest1')
def celerytest1(id):
print("celery_tasks"+str(id))

注意每次增加任务,要注册任务模块

1
2
3
Main.py
# 加载celery任务模块
app.autodiscover_tasks(['celery_tasks.sms', 'drf_admin.apps.vulscan'])

调用异步任务

1
2
3
4
5
6
7
8
9
@action(methods=['post'], detail=False)
def scan(self, request):
data=request.data
id = request.data.get('id')

#celerytest1 异步任务,调用delay。(id)是传参
celerytest1.delay(id)

return HttpResponse(content='one_scan', content_type=None, status=200)

命令行启动

1
celery -A celery_tasks.main  worker --loglevel=info