Celery介绍
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,支持任务调度 Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成,可以配合flower查看任务执行状态等信息。
- 消息中间件 Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
- 任务执行单元 Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
-
任务结果存储 Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
-
安装redis python接口 sudo pip install redis
-
安装celery sudo pip install celery 我用redis做结果存储和任务队列储存,创建并发送一个异步任务
from celery import Celery
#redis没有设置账户密码
app = Celery('tasks', broker='redis://localhost:6379/3',backend='redis://localhost:6379/6')
@app.task
def add(x, y):
return x + y
if __name__ == '__main__':
result = add(10, 5)
# broker:任务队列的中间人
# backend:任务执行结果的存储
app.task装饰add函数成一个Task实例,add函数将task实例序列化后,将任务发送到redis;
该过程创建一个名字为celery的exchange交换机,类型为direct(直连交换机);创建一个名为celery的queue,队列和交换机使用路由键celery绑定,打开flower,可以看到有一条消息已经在celery队列中,当有多个装饰器的时候,app.task一定要在最外层
运行后在redis中会存在两个键 celery和_kombu.binding.celery, _kombu.binding.celery表示有一名为 celery 的任务队列(Celery 默认),而键celery为默认队列中的任务列表,使用list类型,可以看看添加进去的任务数据。
开启worker执行任务在项目目录下执行命令:
celery -A app.celery_tasks.celery worker -Q queue --loglevel=info
- -A参数指定创建的celery对象的位置,该app.celery_tasks.celery指的是app包下面的celery_tasks.py模块的celery实例,注意一定是初始化后的实例,后面加worker表示该实例就是任务执行者;
- -Q参数指的是该worker接收指定的队列的任务,这是为了当多个队列有不同的任务时可以独立;如果不设会接收所有的队列的任务;
- -l参数指定worker输出的日志级别
任务执行完毕后结果存储在redis中,查看redis中的数据,发现存在一个string类型的键值对:celery-task-meta-16xe4252-c1ln-5m18-f2j6-82ds66618189r:data该键值对的失效时间默认为24小时。分析序列化的消息add将Task实例序列化后发送到redis 我们可以看到body里面有我们需要执行的函数的一切信息,celery的worker接收到消息后就会反序列化body数据,执行相应的方法。
celery配置
celery的性能和许多因素有关,比如序列化的方式,连接redis的方式,多进程、单线程等等,我们可以指定配置;
基本配置项
- CELERY_DEFAULT_QUEUE:默认队列
- BROKER_URL : 代理人即redis的网址
- CELERY_RESULT_BACKEND:结果存储地址
- CELERY_TASK_SERIALIZER:任务序列化方式
- CELERY_RESULT_SERIALIZER:任务执行结果序列化方式
- CELERY_TASK_RESULT_EXPIRES:任务过期时间
- CELERY_ACCEPT_CONTENT:指定任务接受的内容序列化类型(序列化),一个列表; 加载配置
# main.py
from celery import Celery
import celeryconfig
app = Celery(__name__, include=["task"])
# 引入配置文件
app.config_from_object(celeryconfig)
if __name__ == '__main__':
result = add(10, 5)
# task.py
from main import app
@app.task
def add(x, y):
return x + y
# celeryconfig.py
BROKER_URL = 'redis://localhost:6379/3'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/6'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,单位秒
CELERY_ACCEPT_CONTENT = ["json"] # 指定任务接受的内容序列化的类型.
也可以直接加载配置
from celery import Celery
import celeryconfig
app = Celery(__name__, include=["task"])
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)
此外还有两个方法可以加载配置,但开发不会直接调用:
app.config_from_envvar() # 从环境变量加载
app.config_from_cmdline() # 从命令行加载
一份比较常用的配置文件
# 注意,celery4版本后,CELERY_BROKER_URL改为BROKER_URL
BROKER_URL = 'redis://host:6379/redis数据库编号'
# 指定结果的接受地址
CELERY_RESULT_BACKEND = 'redis://host:6379/redis数据库编号'或'redis://username:passwd@host:port/db'
# 指定任务序列化方式
CELERY_TASK_SERIALIZER = 'json'
# 指定结果序列化方式
CELERY_RESULT_SERIALIZER = 'json'
# 任务过期时间,celery任务执行结果的超时时间,单位秒
CELERY_TASK_RESULT_EXPIRES = 60 * 15
# 指定任务接受的序列化类型.
CELERY_ACCEPT_CONTENT = ["json"]
# 任务发送完成是否需要确认,这一项对性能有一点影响
CELERY_ACKS_LATE = True
# 压缩方案选择,可以是zlib, bzip2,默认是发送没有压缩的数据
CELERY_MESSAGE_COMPRESSION = 'zlib'
# 规定完成任务的时间
CELERYD_TASK_TIME_LIMIT = 5 # 在5s内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程
# celery worker的并发数,默认是服务器的内核数目,也是命令行-c参数指定的数目
CELERYD_CONCURRENCY = 4
# celery worker 每次去redis预取任务的数量
CELERYD_PREFETCH_MULTIPLIER = 4
# 每个worker执行了多少任务就会死掉,默认是无限的
CELERYD_MAX_TASKS_PER_CHILD = 40
# 设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中
CELERY_DEFAULT_QUEUE = "default"
# 设置详细的队列
CELERY_QUEUES = {
"default": { # 这是上面指定的默认队列
"exchange": "default",
"exchange_type": "direct",
"routing_key": "default"
},
"topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列
"routing_key": "topic.#",
"exchange": "topic_exchange",
"exchange_type": "topic",
},
"task_eeg": { # 设置扇形交换机
"exchange": "tasks",
"exchange_type": "fanout",
"binding_key": "tasks",
},
}
本文链接:http://nix.pub/article/celery/