分布式任务管理系统 Celery 之一

2021/5/17 10:55:25

本文主要是介绍分布式任务管理系统 Celery 之一,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

一 前言 

     开发自动化管理平台的过程中,有执行时间较长的任务比如安装基础软件,备份恢复;有定时执行的任务比如定期收集元数据,检查慢日志数量等等,我们可以自己开发一套任务系统,当然也可以依赖Celery 实现上述功能。


二 Celery 是什么?

2.1 概念

       Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度,支持异步执行任务。更令人欣喜的是常见的Python的web框架都能和Celery 耦合,给广大开发者带来极大的便利。Celery 是开源的,使用 BSD 许可证 授权。


2.2 原理 

   Celery 实现异步调用的原理核心其实是将任务执行单元 worker 和 任务派发单元 分开,从而达到异步的效果;

Celery将需要执行的任务发送到消息队列中,然后再由任务执行单元根据具体的配置(绑定到具体哪个队列,默认为defaults)从消息队列中获取任务执行,这样就实现了异步的效果。 


2.3 架构

   Celery 使用简洁的模块架构提供了完整的功能,上手容易,部署简单。主要包含消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store).我们使用一张图展示Celery运行机制。

图片


task producer - 任务生产者

       顾名思义就是发起调度任务的,然后交给任务队列去处理。简单的Python代码、耦合在Django/Flask Web 服务里请求任务比如调用备份或者调用初始化安装机器的任务,在程序里面调用Celery任务装饰的函数,产生任务并分发到任务队列处理的,我们都可以称之为任务生产者。

celery beat - 任务调度器 

       Celery beat 是 Celery 系统自带的任务生产者,它以独立进程的形式存在,该进程会读取配置文件的内容,周期性地将执行任务的请求发送给任务队列。需要注意的是在一个Celery系统中,只能存在一个 Celery beat 调度器。

broker - 任务代理 

       其实broker就是一个队列存储,是负责接收task producer发送的任务消息,存储到队列之后再进行调度,分发给任务消费方 (celery worker)。常见的broker有RabbitMQ、Redis 等。 

celery worker - 任务消费方

      Celery worker 就是任务的执行者,它负责接收任务处理中间方发来的任务处理请求,完成这些任务,并且返回任务处理的结果。Celery worker 对应的就是操作系统中的一个进程。Celery 支持分布式部署和横向扩展,我们可以在多个节点增加 Celery worker 的数量来增加系统的高可用性。在分布式系统中,我们也可以在不同节点上分配执行不同任务的 Celery worker 来达到模块化的目的。

Result Stores/backend

      存储Celery worker 执行任务之后的结果和状态信息,以供应用系统查询任务的状态信息。Celery 内置支持Django ORM,Redis,RabbitMQ 等方式来保存任务处理后的状态信息。


三 快速入门

3.1 安装

本例子使用redis作为存储任务消息的介质,需要提前安装redis 并启动相关服务。

用 pip 安装:

  pip install -U Celery

用 easy_install 安装:

easy_install -U Celery


3.2 部署

     我们先使用一个单一程序文件包含所有celery相关的配置作为例子。要使用celery,就需要先初始化一个celery实例,配置好broker和backend为redis。编写程序文件。目录结构

python/

      tasks.py

      client.py

tasks.py 

import time

from celery import Celery

app = Celery('tasks',

             broker='redis://localhost:6379/1',

             backend='redis://localhost:6379/1')

@app.task

def add(x, y):

    time.sleep(2)

    return x + y


@app.task

def mul(x, y):

    time.sleep(5)

    return x * y

在和tasks 同一层级执行 

celery -A tasks worker -l info   

图片

这里需要说明的是

命令行执行celery worker -A app -l info时, app 必须可导入,app 可以为py模块或包,本例为tasks 。不管是包还是模块都必须正确指定Celery入口文件(如果为包则默认的入口文件名为 celery.py )的绝对导入名称(proj.celery),但是从工程上我们推荐在包的__init__.py 文件进行celey的初始化。 

另起一个命令行 本例子使用ipython 

In [5]: from tasks import add,mul

In [6]: add.delay(3,3)

Out[6]:

In [7]: add.delay(3,7)

Out[7]:

In [8]: r=add.delay(6,10)

In [10]: r.status

Out[10]: u'SUCCESS'

In [11]: r.result

Out[11]: 16

In [14]: r.ready()

Out[14]: True

In [15]: r.get()

Out[15]: 16

从例子可以看到,直接调用add 函数并未直接返回 6 ,10 这样的算术结果而是返回 AsyncResult 对象。程序中可以使用这个对象检查任务状态,等待任务执行完成,获取任务结果,如果任务失败,它会返回异常信息或者调用栈。

将AsyncResult赋值给r, 可以通过调用 r.get() 或者 r.result 取结任务的结果,r.status,r.ready()获取任务的执行状态。

我们在优化一下上面的调用方式,编写一段小程序 client.py ,做同步调用 当然我们也可以做异步调用。

#!/usr/bin/env python

#-*- coding:utf-8 -*-

import time

from tasks import add,mul

add_ret = add.delay(4, 4)

mul_ret = mul.delay(5, 5)

while not add_ret.ready():

    time.sleep(1)

    print "waiting for task to be done "

print 'add task done: {0}'.format(add_ret.get())

while not mul_ret.ready():

    time.sleep(2)

    print "waiting for task to be done "

print 'mul task done: {0}'.format(mul_ret.get())

执行 python client.py 

图片

celery 输出的log如下

图片

因为tasks任务中add 和mul函数都设置了等待 sleep,可以看出调用 add_ret.ready() 的时候并未直接返回结果,而是等待了具体的时间之后才返回。真实项目中我们需要改写 client.py ,利用Celery的异步执行特性。


四 小结 

   本文浅显的介绍了celery的架构和如何使用。Celery并不是一个队列,而是一套任务管理平台,通过队列实现任务的异步功能。有计划开发自己独立运维平台的还没有使用过celery朋友可以尝试用起来。 




这篇关于分布式任务管理系统 Celery 之一的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程