Celery任务队列基本笔记
场景
Web服务中的某些请求可能耗时颇长,并且我们难以控制响应时间时间,比如我们调用了第三方的服务(如电子邮件服务、文档解析服务等),我们无法加快第三方服务的返回同时也不能简单地跳过/忽略这些调用。
这种需要执行耗时不确定的操作的场景,我们可以将任务添加到消息/任务队列中,并立即响应用户,由另外的“消费者”进程/线程去处理耗时任务。
Celery
特性
Python开发社区中最受欢迎的任务队列框架为Celery,其具有很多优点:
-
高效率:官方介绍,Celery单进程可在一分钟内处理百万级的任务
-
分布式执行:Celery支持将任务分发到多个计算机上,以实现分布式执行,从而提高处理能力和性能
-
生产者-消费者模型:Celery采用典型的生产者和消费者模型。生产者发送任务到消息队列,消费者从消息队列中取任务进行执行。
-
异步任务处理:通过将任务放入队列中,Celery可以实现任务的异步执行,避免阻塞主应用程序的进程
-
支持计划执行任务(替代crontab)
架构
-
消息中间件(Broker):用于存储和传递任务消息。常用的中间件有RabbitMQ、Redis等。
-
任务执行单元(Worker):负责从消息队列中获取任务并执行它们。通常使用多线程技术如Eventlet、gevent等来提高并发性能。
-
任务结果存储(Task Result Store):用于存储任务执行的结果,确保任务结果可以被查询和回调。可选Redis/RebbitMQ/MySQL/……
使用前置条件
-
选择消息中间件:Redis/RebbitMQ/…。celery进程订阅消息中间件后即可消费任务队列中的任务。
-
选择结果中间件,用于保存任务执行结果
Django中使用Celery的简单方法
-
安装celery库
-
在Django的配置文件中配置Celery相关参数(如使用的消息中间件、结果存储中间件、用户密码、工作线程数、任务脚本搜索路径等)
-
编写Celery应用初始化脚本(读取配置、实例化Celery app对象)
-
启动Celery:
celery -A CELERY_APP worker -L INFO
注:若是windows环境,启动时需要加参数-P eventlet
简单示例如下:
celery配置:
# django settings.py
#broker(消息中间件来接收和发送任务消息)
CELERY_BROKER_URL = 'redis://localhost:6379/1'
#backend(存储worker执行的结果)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
#设置时间参照,不设置默认使用的UTC时间
CELERY_TIMEZONE = 'Asia/Shanghai'
#指定任务的序列化
CELERY_TASK_SERIALIZER='json'
#指定执行结果的序列化
CELERY_RESULT_SERIALIZER='json'
CELERY_WORKER_CONCURRENCY = 2
celery初始化脚本
import os
import celery
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo.settings')
django.setup()
app = celery.Celery('demo', include=[
'service.tasks'
])
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()