Celery
2021/8/30 6:06:24
本文主要是介绍Celery,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
Celery简介
Celery是什么
- Celery是python中使用比较多的并行分布式框架
- Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
- Celery专注于实时处理的异步任务队列
- Celery同时也支持任务调
Celery使用场景
celery是一个强大的分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)
和定时任务(crontab)
- 异步任务: 将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
- 定时任务: 定时执行某件事情,比如每天数据统计
Celery工作流程
Celery核心组件
Celery的架构由三部分组成,消息中间件(Broker),任务执行单元(Worker)和任务执行结果存储(Result)组成。
消息中间件(Broker)
Broker负责创建任务队列,根据一些路由规则将任务分派到任务队列,然后将任务从任务队列交付给worker任务执行单元(Worker)
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中,运行后台作业的进程任务结果存储(Result)
Result用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis等
另外
: Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
其他
: Celery还支持不同的并发、序列化和压缩的手段
- 并发:prefork、eventlet、gevent、threads
- 序列化:pickle、json、yaml、msgpack 等
- 压缩:zlib,、bzip2
Celery安装相关软件
备注:我是在windows10环境下做的,Linux应该大同小异
Celery安装
pip install celery
其他安装
这里有个坑.win10系统启动worker报错ValueError: not enough values to unpack (expected 3, got 0),解决办法:
pip install eventlet
Django中使用的时候,报错 AttributeError: ‘str’ object has no attribute ‘items’。redis版本太高,降低版本 pip install redis==2.10.6
pip install redis==2.10.6
Celery代码实现
Celery基本使用方法
备注:在这里,我使用的是redis作为消息中间件
目录结构
代码实现
# app.py from task import add if __name__ == '__main__': print("Start Task ...") result = add.delay(2, 8) print("result:",result) # 存到redis之后,返回的id print("result_id:",result.id) # 存到redis之后,返回的id print("result:", result.get()) # 方法返回值 print("End Task ...")
# task.py import time from celery import Celery # 实例化一个Celery broker = 'redis://ip:6379/1' backend = 'redis://ip:6379/2' # 参数1 自动生成任务名的前缀 # 参数2 broker 是我们的redis的消息中间件 # 参数3 backend 用来存储我们的任务结果的 app = Celery('my_task', broker=broker, backend=backend) # 加入装饰器变成异步的函数 @app.task def add(x, y): print('Enter call function ...') time.sleep(4) return x + y if __name__ == '__main__': # 这里生产的任务不可用,导入的模块不能包含task任务。会报错 print("Start Task ...") result = add.delay(2, 8) print("result:", result) print("End Task ...")
终端启动服务
celery -A task worker -l info -P eventlet
- A :参数指定celery对象的位置
- l :参数指定worker的日志级别
服务启动后的展示
运行代码,app.py
备注:不要运行task.py,会报错
这个时候可以看终端,看看请求
检验这个id的值
新增检查文件 check.py
# check.py from celery.result import AsyncResult from task import app async_result=AsyncResult(id="455d6ad7-39cc-4e94-9fa9-456ae49cdd97", app=app) if async_result.successful(): result = async_result.get() print(result) # result.forget() # 将结果删除 elif async_result.failed(): print('执行失败') elif async_result.status == 'PENDING': print('任务等待中被执行') elif async_result.status == 'RETRY': print('任务异常后正在重试') elif async_result.status == 'STARTED': print('任务已经开始被执行')
这篇关于Celery的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-09CMS内容管理系统是什么?如何选择适合你的平台?
- 2025-01-08CCPM如何缩短项目周期并降低风险?
- 2025-01-08Omnivore 替代品 Readeck 安装与使用教程
- 2025-01-07Cursor 收费太贵?3分钟教你接入超低价 DeepSeek-V3,代码质量逼近 Claude 3.5
- 2025-01-06PingCAP 连续两年入选 Gartner 云数据库管理系统魔力象限“荣誉提及”
- 2025-01-05Easysearch 可搜索快照功能,看这篇就够了
- 2025-01-04BOT+EPC模式在基础设施项目中的应用与优势
- 2025-01-03用LangChain构建会检索和搜索的智能聊天机器人指南
- 2025-01-03图像文字理解,OCR、大模型还是多模态模型?PalliGema2在QLoRA技术上的微调与应用
- 2025-01-03混合搜索:用LanceDB实现语义和关键词结合的搜索技术(应用于实际项目)