Celery介绍

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,支持任务调度 Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成,可以配合flower查看任务执行状态等信息。

  • 消息中间件 Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
  • 任务执行单元 Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
  • 任务结果存储 Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

  • 安装redis python接口 sudo pip install redis

  • 安装celery sudo pip install celery 我用redis做结果存储和任务队列储存,创建并发送一个异步任务

from celery import Celery
#redis没有设置账户密码
app = Celery('tasks', broker='redis://localhost:6379/3',backend='redis://localhost:6379/6')

@app.task
def add(x, y):
    return x + y
if __name__ == '__main__':
result = add(10, 5)

# broker:任务队列的中间人
# backend:任务执行结果的存储

app.task装饰add函数成一个Task实例,add函数将task实例序列化后,将任务发送到redis; 该过程创建一个名字为celery的exchange交换机,类型为direct(直连交换机);创建一个名为celery的queue,队列和交换机使用路由键celery绑定,打开flower,可以看到有一条消息已经在celery队列中,当有多个装饰器的时候,app.task一定要在最外层
运行后在redis中会存在两个键 celery和_kombu.binding.celery, _kombu.binding.celery表示有一名为 celery 的任务队列(Celery 默认),而键celery为默认队列中的任务列表,使用list类型,可以看看添加进去的任务数据。
开启worker执行任务在项目目录下执行命令:

celery -A app.celery_tasks.celery worker -Q queue --loglevel=info
  • -A参数指定创建的celery对象的位置,该app.celery_tasks.celery指的是app包下面的celery_tasks.py模块的celery实例,注意一定是初始化后的实例,后面加worker表示该实例就是任务执行者;
  • -Q参数指的是该worker接收指定的队列的任务,这是为了当多个队列有不同的任务时可以独立;如果不设会接收所有的队列的任务;
  • -l参数指定worker输出的日志级别
    任务执行完毕后结果存储在redis中,查看redis中的数据,发现存在一个string类型的键值对:celery-task-meta-16xe4252-c1ln-5m18-f2j6-82ds66618189r:data该键值对的失效时间默认为24小时。分析序列化的消息add将Task实例序列化后发送到redis 我们可以看到body里面有我们需要执行的函数的一切信息,celery的worker接收到消息后就会反序列化body数据,执行相应的方法。

celery配置

celery的性能和许多因素有关,比如序列化的方式,连接redis的方式,多进程、单线程等等,我们可以指定配置;

基本配置项

  • CELERY_DEFAULT_QUEUE:默认队列
  • BROKER_URL : 代理人即redis的网址
  • CELERY_RESULT_BACKEND:结果存储地址
  • CELERY_TASK_SERIALIZER:任务序列化方式
  • CELERY_RESULT_SERIALIZER:任务执行结果序列化方式
  • CELERY_TASK_RESULT_EXPIRES:任务过期时间
  • CELERY_ACCEPT_CONTENT:指定任务接受的内容序列化类型(序列化),一个列表; 加载配置
# main.py
from celery import Celery
import celeryconfig
app = Celery(__name__, include=["task"])
# 引入配置文件
app.config_from_object(celeryconfig)

if __name__ == '__main__':
    result = add(10, 5)

# task.py
from main import app
@app.task
def add(x, y):
    return x + y  

# celeryconfig.py

BROKER_URL =  'redis://localhost:6379/3'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/6'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24   # 任务过期时间,单位秒
CELERY_ACCEPT_CONTENT = ["json"]            # 指定任务接受的内容序列化的类型.

也可以直接加载配置

from celery import Celery
import celeryconfig
app = Celery(__name__, include=["task"])
app.conf.update(
        task_serializer='json',
        accept_content=['json'],
        result_serializer='json',
        timezone='Europe/Oslo',
        enable_utc=True,
    )

此外还有两个方法可以加载配置,但开发不会直接调用:

app.config_from_envvar() # 从环境变量加载
app.config_from_cmdline() # 从命令行加载

一份比较常用的配置文件

# 注意celery4版本后CELERY_BROKER_URL改为BROKER_URL
BROKER_URL = 'redis://host:6379/redis数据库编号'
# 指定结果的接受地址
CELERY_RESULT_BACKEND = 'redis://host:6379/redis数据库编号''redis://username:passwd@host:port/db'
# 指定任务序列化方式
CELERY_TASK_SERIALIZER = 'json' 
# 指定结果序列化方式
CELERY_RESULT_SERIALIZER = 'json'
# 任务过期时间,celery任务执行结果的超时时间单位秒
CELERY_TASK_RESULT_EXPIRES = 60 * 15   
# 指定任务接受的序列化类型.
CELERY_ACCEPT_CONTENT = ["json"]   
# 任务发送完成是否需要确认这一项对性能有一点影响     
CELERY_ACKS_LATE = True  
# 压缩方案选择可以是zlib, bzip2默认是发送没有压缩的数据
CELERY_MESSAGE_COMPRESSION = 'zlib' 
# 规定完成任务的时间
CELERYD_TASK_TIME_LIMIT = 5  # 在5s内完成任务否则执行该任务的worker将被杀死任务移交给父进程
# celery worker的并发数默认是服务器的内核数目,也是命令行-c参数指定的数目
CELERYD_CONCURRENCY = 4 
# celery worker 每次去redis预取任务的数量
CELERYD_PREFETCH_MULTIPLIER = 4 
# 每个worker执行了多少任务就会死掉默认是无限的
CELERYD_MAX_TASKS_PER_CHILD = 40 
# 设置默认的队列名称如果一个消息不符合其他的队列就会放在默认队列里面如果什么都不设置的话数据都会发送到默认的队列中
CELERY_DEFAULT_QUEUE = "default" 
# 设置详细的队列
CELERY_QUEUES = {
    "default": { # 这是上面指定的默认队列
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "default"
    },
    "topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列
        "routing_key": "topic.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
    },
    "task_eeg": { # 设置扇形交换机
        "exchange": "tasks",
        "exchange_type": "fanout",
        "binding_key": "tasks",
    },
}

本文链接:http://nix.pub/article/celery/