celery笔记五之消息队列的介绍
2023/6/22 1:22:07
本文主要是介绍celery笔记五之消息队列的介绍,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
本文首发于公众号:Hunter后端
原文链接:celery笔记五之消息队列的介绍
前面我们介绍过 task 的处理方式,将 task 发送到队列 queue,然后 worker 从 queue 中一个个的获取 task 进行处理。
task 的队列 queue 可以是多个,处理 task 的 worker 也可以是多个,worker 可以处理任意 queue 的 task,也可以处理指定 queue 的 task,这个我们在介绍 queue 的时候再做介绍。
这一篇我们来介绍一下存储 task 的队列 queue。
- 默认队列 task_default_queue
- 定义队列
- 将 task 指定到队列 queue 消费
以下的操作都是在 Django 系统的配置中使用。
1、默认队列 task_default_queue
当我们运行一个最简单的延时任务比如 add.delay(1, 2) 时,并没有设置一个消息队列,因为如果我们没有指定,系统会为我们创建一个默认队列。
这个默认的队列被命名为 celery,值在 app.conf.task_default_queue,我们可以查看一下:
from hunter.celery import app app.conf.task_default_queue # 输出为 'celery'
2、定义队列
我们可以设想一下这个场景,我们只有一个 worker 处理 task,每个 task 需要处理的时间很长,因为 worker 被占用,这样在我们的任务队列里就会积压很多的 task。
有一些需要即时处理的任务则会被推迟处理,这样的情况下,我们理想的设计是设置多个 worker,多个 worker 分别处理指定队列的 task。
关于 worker 的设置,比如添加多个 worker,给 worker 消费指定队列的 task,我们在 worker 的笔记中再介绍,这里我们介绍一下如何定义队列。
任务队列的定义如下:
# hunter/celery.py from kombu import Queue app.conf.task_queues = ( Queue('blog_tasks', ), )
当我们定义了任务队列之后,我们可以将 task 指定输出到对应的 queue,假设 blog/tasks.py 下有这样一个 task:
# blog/tasks.py from celery import shared_task @shared_task def add(x, y): return x + y
接下来我们调用这个 task 的时候,需要指定队列:
from blog.tasks import add add.apply_async((1, 2), queue='blog_tasks')
如果我们就这样配置 celery,这个时候如果我们直接再调用 delay() 函数,也就是不指定 queue 的话,会发现我们发出的 task 是不能被 worker 处理的。
也就是说,下面的操作是不起作用的:
from blog.tasks import add add.delay(1, 2) # 此时,我们的调用不会被队列接收到
如果需要在调用 task 的时候不指定队列,使用系统默认的队列,这个时候我们需要额外来指定一个 task_default_queue,celery 的配置如下:
# hunter/celery.py app.conf.task_queues = ( Queue('blog_tasks'), Queue('default_queue'), ) app.conf.task_default_queue = 'default_queue'
这样,我们在使用延时任务的时候,就不需要指定 queue 参数了,都会走我们的默认 task 队列:
from blog.tasks import add add.delay(1, 2) # 队列会被 default_queue 接收到
而如果我们想实现 add 的延时任务走的是 blog_tasks 这个队列,但是我们在调用的时候不想那么麻烦每次都指定 queue 参数,这个就需要用到 task_routes 配置项了。
3、将 task 指定到队列 queue 消费
如果我们想某些函数使用指定的 queue,我们可以使用 task_routes 配置项来操作。
现在我们有两个 application,blog 和 polls,这两个 application 下都有各自的 tasks,文件的内容如下:
# blog/tasks.py from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def minus(x, y): return x - y
# polls/tasks.py from celery import shared_task @shared_task def multi(x, y): return x * y
我们想要实现的最终的目的是在调用延时任务的时候,可以直接使用 delay() 的方式,不需要使用 apply_async(queue='xx')。
我们想要实现的功能是,polls/tasks.py 下的所有的延时任务以及 blog/tasks.py 下的 add() 函数进入 queue_1 队列
blog 下的 minus() 函数进入 queue_2 队列
其他所有的 task 都走默认的队列,default_queue。
我们可以如下配置:
app.conf.task_queues = ( Queue('queue_1'), Queue('queue_2'), Queue('default_queue'), ) app.conf.task_routes = { 'polls.tasks.*': { 'queue': 'queue_1', }, 'blog.tasks.add': { 'queue': 'queue_1', }, 'blog.tasks.minus': { 'queue': 'queue_2', }, } app.conf.task_default_queue = 'default_queue'
如果想获取更多后端相关文章,可扫码关注阅读:
这篇关于celery笔记五之消息队列的介绍的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-24怎么切换 Git 项目的远程仓库地址?-icode9专业技术文章分享
- 2024-12-24怎么更改 Git 远程仓库的名称?-icode9专业技术文章分享
- 2024-12-24更改 Git 本地分支关联的远程分支是什么命令?-icode9专业技术文章分享
- 2024-12-24uniapp 连接之后会被立马断开是什么原因?-icode9专业技术文章分享
- 2024-12-24cdn 路径可以指定规则映射吗?-icode9专业技术文章分享
- 2024-12-24CAP:Serverless?+AI?让应用开发更简单
- 2024-12-23新能源车企如何通过CRM工具优化客户关系管理,增强客户忠诚度与品牌影响力
- 2024-12-23原创tauri2.1+vite6.0+rust+arco客户端os平台系统|tauri2+rust桌面os管理
- 2024-12-23DevExpress 怎么实现右键菜单(Context Menu)显示中文?-icode9专业技术文章分享
- 2024-12-22怎么通过控制台去看我的页面渲染的内容在哪个文件中呢-icode9专业技术文章分享