百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

Python任务队列Celery入门

bigegpt 2024-09-02 16:27 3 浏览

Celery是一个简单可靠的分布式任务队列,它主要关注于实时任务处理,同时也能够支持定时任务。这个项目目前在github上有14.8k颗星,是一个比较热门的项目。Celery是采用Python语言编写的,但其他语言也可以实现它的协议,或者通过webhook与之交互。Celery本身也比较精巧,据官方文档介绍,Celery的核心总共只有7000多行代码,但有14000多行测试代码,所以质量应该也还是不错的。那今天我们来一起看一下Celery的基本概念和一个小例子。

Celery简化的架构示意图如下


从这个图中我们可以看出,基本上Celery的架构就是一个典型的生产者-消费者模式。Celery中有一些基本的概念与术语,下面我们分别来了解一下

基本概念

任务(Task)

任务是被Celery的Worker执行的基本单位,一个任务一般是Python中的一个函数

@app.task
def add(x, y):
    return x + y

生产者(Producer)

生产者,也就是请求任务执行的客户端。他通过发送一个消息给broker,来要求某个任务的执行。采用delay方法能发出一个这样的message,delay()方法实际上会把工作交给apply_async()

add.delay(2, 2) 
# 等效于
add.apply_async((2, 2))

消费者(Consumer)

在Celery中消费者就是是Worker,Worker本身并不执行任务。当Worker从broker接收到消息之后,会让Worker的子进程(execution pool中)执行Task。Worker可以启动多个,也可以分布在不同的机器上,所以能很好的支持水平扩展。

创建Worker的命令为

 celery -A celery_demo worker -l info -Q demo --concurrency=3 -n worker1@%h --autoscale=3,1

消息(Message)

任务消息是生产者发给broker的要求Worker执行任务的请求。通常消息由消息头和消息体组成,消息头描述了消息的序列化类型,消息体包含任务的名字、ID和参数,一个Json格式的消息体是长这样的

{ 
  'task': 'myapp.tasks.add', 
  'id': '54086c5e-6193-4575-8308-dbab76798756', 
  'args': [4, 4], 'kwargs': {}
}

Broker

Broker是把消息从生产者传递到消费者的消息中间件,在Celery中,一般可以使用RabbitMQ或者Redis作为broker。也有一些其他的试验性质的broker,具体支持的列表可以参见官网文档

Queue

在Celery中我们可以定义多个Queue,任务可以路由到指定的Queue中,不同的Worker可以分别处理不同的Queue中的消息

Result Backend

Result Backend是用来存放Task执行的状态和结果的。在Celery中可以使用数据库、Redis/RabbitMQ等等作为Result Backend

一个例子

了解了这些概念之后,我们来看一个简单的例子。在这个例子中,我们选用Redis作为我们的消息队列。你可以选择你喜欢的方式安装reids,或者也可以使用docker

docker run -d -p 6379:6379 redis

然后我们需要安装Python的Celery和Redis依赖

pip install celery
pip install redis

接下来我们来编写两个简单的task,分别求两个数的和与差,内容放在celery_test.py中。

from celery import Celery
app = Celery('celery_test', broker='redis://localhost')
app.conf.task_routes = {'celery_test.sub': {'queue': 'qsub'},
                        'celery_test.add': {'queue': 'qadd'}
                       } 
@app.task
def add(x, y) 

	return x+ y 
@app.task
def sub(x, y) 
	return x -y

在代码中,我们初始化了一个Celery的对象。还定义了task的路由规则,指定sub任务路由到qsub队列中, add任务路由到qadd队列中。


然后我们就可以启动Celery Worker了,这里我们使用命令行启动两个Worker,分别关注不同的queue

celery -A celery_test worker --loglevel=info -Q qsub
celery -A celery_test worker --loglevel=info -Q qadd

执行命令后如果一切正常的话,会出现以下日志

  -------------- celery@host_name v4.4.2 (cliffs)
  --- ***** -----  
  -- ******* ---- Darwin-19.3.0-x86_64-i386-64bit 2020-04-12 > 09:55:21 
  - *** --- * --- 
  - ** ---------- [config] 
  - ** ---------- .> app:         celery_test:0x1049aeed0 
  - ** ---------- .> transport:   redis://localhost:6379//
  - ** ---------- .> results:     disabled:// 
  - *** --- * --- .> concurrency: 8 (prefork)
  -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
  --- ***** -----   
  -------------- [queues]
    .> qsub             exchange=qsub(direct) key=qsub


在日志中显示了我们的app的名字celery_test,使用的Redis连接信息,并没有启用Result Backend,处理任务采用prefork的方式,最大允许8个并发,并且关注一个叫qsub的队列。

启动Worker之后,应用就可以向broker发请求消息了,我们一般通过delay()方法来发送,如果需要更多的选项,如发送到指定的queue,设定延时执行等,可以使用apply_async()方法

add.delay(2, 2)
sub.delay(4, 2)

用Python执行以上两句之后,在两个Worker的console中分别可以看到收到对应task并执行的日志。

[2020-04-12 10:41:20,410: INFO/MainProcess] Received task: celery_test.sub
[50747bc7-9fa4-45f5-a5be-ad08b93dea09]  [2020-04-12 10:41:20,413: INFO/ForkPoolWorker-6] Task celery_test.sub[50747bc7-9fa4-45f5-a5be-ad08b93dea09] succeeded in 0.0005345229999997869s: 3

如果生产者发送请求消息的时刻,对应的Worker还没有启动,那消息将会被保存在broker中,等Worker上线之后,任务还是可以被正常执行。

对于不需要返回结果的任务,那到这里差不多就够了。如果像我们举例的add/sub这样需要返回结果的任务,或者我们希望知道任务的执行情况,那就可以使用Result Backend。 在定义Celery app的时候,可以指定一个 Backend。Celery目前能够支持多种Result Backend,如使用数据库的SQLAlchemy/Django ORM, Memcached, Redis, RPC (RabbitMQ/AMQP)等,我们也可以按照Celery的协议自定义自己的Backend。

这里我们还是使用Redis作为我们的Result Backend,首先将上面创建Celery对象的语句改为

app = Celery('tasks', backend='redis://localhost', broker='redis://localhost')

然后重启Worker,可以看到之前打印的启动日志中的 results: disabled:// 变成了 results: redis://localhost/

这样就可以在生产者这边获得任务执行的结果了

result = add.delay(2, 2)
result.ready() # 判断任务是否完成
result.get(timeout=1) # 获得结果

如果任务执行抛出了异常,那生产者调用result.get()时方法也会抛出这个异常。如果我们不希望得到这个异常,可以通过设置propagate参数:

result.get(propagate=False)
result.traceback # 获得异常的stacktrace

特别要注意的是,任务执行结果的保存和传输都是要消耗Result Backend的资源的,根据使用Result Backend的不同,会有不一样的资源占用情况,比如说使用Redis作为Backend,执行结果会作为Redis的键值对保存,默认超时时间为一天。为了确保资源能够正常释放,在开启Backend的情况下,建议总是去调用一下result.get或者result.forget方法

关于Celery的基本知识今天就介绍到这里,Celery还有很多灵活的配置项和一些高级的话题,如通过Celery Beat支持定时的任务、消息的限流、安全认证、Django框架的集成等等。对这些内容感兴趣的同学们可以查看官方文档,或者期待一下也许还会有的下文

相关推荐

如何使用Java API操作HDFS系统?(hdfs java api的常见环境准备?)

1.搭建项目环境打开Eclipse选择FileàNewàMavenProject创建Maven工程,选择“Createasimpleproject”选项,点击【Next】按钮,会进入“New...

DataX写插件开发-集成阿里云RocketMQ

在上一期我们对datax进行了技术调研DataX数据异构、数据同步神器,这一次我们集成一个RocketMQ写插件,能够非常方便对将mysql数据同步到MQ中,下面来总结下具体步骤。1.下载datax源...

以SpringMVC+Shiro+Mybatis为核心开发的精简后台系统源码分享

项目说明源码获取方式:关注转发之后私信回复【源码】即可免费获取到以SpringMVC+Shiro+Mybatis为核心开发的精简后台基础系统。包含用户管理,角色管理,部门管理,权限管理,菜单管理,日志...

手把手教小伙伴们使用 Nginx 部署 TienChin 项目!

今天我就来手把手教小伙伴们部署TienChin项目,一起把这个项目跑起来,看看到底是个什么样的项目。小伙伴们知道,对于这种前后端分离的项目,我们在实际部署的时候,可以按照前后端分离的方式来部署,也...

推荐一款超棒的SpringCloud 脚手架项目

之前接个私活,在网上找了好久没有找到合适的框架,不是版本低没人维护了,在不就是组件相互依赖较高。所以我自己搭建一个全新spingCloud框架,里面所有组件可插拔的,集成多个组件供大家选择,喜欢哪个用...

SpringCloud 微服务迁移到 Kubernetes 容器化完整流程

k8s容器部署流程具体步骤:第一步:熟悉SpringCloud微服务项目第二步:源代码编译构建第三步:构建项目镜像并推送到镜像仓库第四步:K8s服务编排第五步:部署服务所需的基础环境第六步:部署微服...

SpringBoot 实现动态配置及项目打包部署上线

一、动态配置文件我们需要了解Spring动态指定配置文件的方式,来提高我们的部署效率。1.1、概述在实际企业开发中,开发环境、测试环境、生产环境通常采用不同的数据库等中间件的连接方式。如果此时我们按照...

3.5 源码安装ONOS1.3.0(源码包怎么安装)

ONOS是由ON.Lab使用Java及Apache实现发布的首款开源的SDN网络操作系统,主要面向服务提供商和企业骨干网。近日笔者在学习ONOS的过程中写下了这篇文章,希望可以对刚接触ONOS的同学们...

jenkins+gitlab 实现自动化部署(jenkins配置git自动部署)

目录1、安装jdk,要记住安装路径2、安装maven,要记住安装路径3、安装git,要记住安装路径4、安装gitlab5、安装jenkins(centos7)创建安装目录下载通用war包启动和关闭Je...

CI&CD落地实践6-Jenkins接入maven构建后端springboot项目

前言在前面一篇《CI&CD落地实践5-Jenkins分布式环境搭建及多节点运行》中,我们介绍了如何在Windows及Linux系统上部署Jenkins从节点,本章节介绍如何在Jenkins创建mave...

从0到1体验Jenkins+Docker+Git+Registry实现CI自动化发布

阅读目录:一、前言二、发布流程三、环境准备四、部署思路梳理五、三台机器上操作六、Git机器上操作七、Docker机器上操作八、Jenkins机器上操作九、上传JAVA项目代码到Git仓库十、Jenki...

微服务架构实战:使用Jenkins实现自动化构建

使用Jenkins实现自动化构建一个大型平台的微服务架构设计通常会产生很多项目工程,因此会有很多服务和应用需要部署,并且需要不断地迭代和更新,这是一个庞大的工程,所以我们需要借助自动化工具,实现各个微...

Jenkins 自动化部署实例讲解(jenkins自动化部署git 项目)

前言你平常在做自己的项目时,是否有过部署项目太麻烦的想法?如果你是单体项目,可能没什么感触,但如果你是微服务项目,相信你应该是有过这种感触的。这种情况下,我一般会劝你了解一下Jenkins这个玩意...

多模块的微服务项目容器化与Git追踪发布记录

在使用了微服务后,一个项目往往由多个模块组成,而容器化发布的建议是单个容器尽量只运行单个进程。所以我们会把每个模块单独打包成镜像运行。如果每个模块都单独配置Dockerfile会让我们维护起来很麻烦。...

手把手教你使用 Jenkins+Docker 实现持续集成

作者:乐之终曲来源:https://blog.csdn.net/qq_37143673/对于Jenkins我只能用两个字形容,难用。就不过多吐槽了,本篇是基于docker环境的使用。1.安...