Celery任务队列基本笔记

场景

Web服务中的某些请求可能耗时颇长,并且我们难以控制响应时间时间,比如我们调用了第三方的服务(如电子邮件服务、文档解析服务等),我们无法加快第三方服务的返回同时也不能简单地跳过/忽略这些调用。

这种需要执行耗时不确定的操作的场景,我们可以将任务添加到消息/任务队列中,并立即响应用户,由另外的“消费者”进程/线程去处理耗时任务。

Celery

特性

Python开发社区中最受欢迎的任务队列框架为Celery,其具有很多优点:

  • 高效率:官方介绍,Celery单进程可在一分钟内处理百万级的任务

  • 分布式执行:Celery支持将任务分发到多个计算机上,以实现分布式执行,从而提高处理能力和性能

  • 生产者-消费者模型:Celery采用典型的生产者和消费者模型。生产者发送任务到消息队列,消费者从消息队列中取任务进行执行。

  • 异步任务处理:通过将任务放入队列中,Celery可以实现任务的异步执行,避免阻塞主应用程序的进程

  • 支持计划执行任务(替代crontab)

架构

  1. 消息中间件(Broker):用于存储和传递任务消息。常用的中间件有RabbitMQ、Redis等。

  2. 任务执行单元(Worker):负责从消息队列中获取任务并执行它们。通常使用多线程技术如Eventlet、gevent等来提高并发性能。

  3. 任务结果存储(Task Result Store):用于存储任务执行的结果,确保任务结果可以被查询和回调。可选Redis/RebbitMQ/MySQL/……

使用前置条件

  • 选择消息中间件:Redis/RebbitMQ/…。celery进程订阅消息中间件后即可消费任务队列中的任务。

  • 选择结果中间件,用于保存任务执行结果

Django中使用Celery的简单方法

  1. 安装celery库

  2. 在Django的配置文件中配置Celery相关参数(如使用的消息中间件、结果存储中间件、用户密码、工作线程数、任务脚本搜索路径等)

  3. 编写Celery应用初始化脚本(读取配置、实例化Celery app对象)

  4. 启动Celery:celery -A CELERY_APP worker -L INFO

    注:若是windows环境,启动时需要加参数-P eventlet

简单示例如下:

celery配置:

# django settings.py
#broker(消息中间件来接收和发送任务消息)
CELERY_BROKER_URL = 'redis://localhost:6379/1'
#backend(存储worker执行的结果)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'

#设置时间参照,不设置默认使用的UTC时间
CELERY_TIMEZONE = 'Asia/Shanghai'
#指定任务的序列化
CELERY_TASK_SERIALIZER='json'
#指定执行结果的序列化
CELERY_RESULT_SERIALIZER='json'

CELERY_WORKER_CONCURRENCY = 2

celery初始化脚本

import os
import celery
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo.settings')
django.setup()
app = celery.Celery('demo', include=[
    'service.tasks'

])

app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

文档

CoolCats
CoolCats
理学学士

我的研究兴趣是时空数据分析、知识图谱、自然语言处理与服务端开发