celery 异步任务队列任务突然不执行是为什么

celery的官方文档其实相对还是写的很不错的.但是在一些深层次的使用上面却显得杂乱甚至就没有某些方面的介绍, 通过我的一个测试环境的settings.py来说明一些使用celery的技巧和解决办法
amqp交换类型
其实一共有4种交换类型,还有默认类型和自定义类型. 但是对我们配置队列只会用到其中之三,我来一个个说明,英语好的话可以直接去看英文文档
首先思考一下流程:
celerybeat生成任务消息,然后发送消息到一个exchange(交换机)
交换机决定那个(些)队列会接收这个消息,这个其实就是根据下面的exchange的类型和绑定到这个交换机所用的bindingkey
我们这里要说的其实就是怎么样决定第二步谁接收的问题
Direct Exchange
如其名,直接交换,也就是指定一个消息被那个队列接收, 这个消息被celerybeat定义个一个routing key,如果你发送给交换机并且那个队列绑定的bindingkey 那么就会直接转给这个队列
Topic Exchange
你设想一下这样的环境(我举例个小型的应该用场景): 你有三个队列和三个消息, A消息可能希望被X,Y处理,B消息你希望被,X,Z处理,C消息你希望被Y,Z处理.并且这个不是队列的不同而是消息希望被相关的队列都去执行,看一张图可能更好理解:
对,Topic可以根据同类的属性进程通配, 你只需要routing key有&.&分割:比如上图中的usa.news, usa.weather, europe.news, europe.weather
Fanout Exchange
先想一下广播的概念, 在设想你有某个任务,相当耗费时间,但是却要求很高的实时性,那么你可以需要多台服务器的多个workers一起工作,每个服务器负担其中的一部分,但是celerybeat只会生成一个任务,被某个worker取走就没了, 所以你需要让每个服务器的队列都要收到这个消息.这里很需要注意的是:你的fanout类型的消息在生成的时候为多份,每个队列一份,而不是一个消息发送给单一队列的次数
我的settings.py
这里只是相关于celery的部分:
import djcelery
djcelery.setup_loader()
INSTALLED_APPS = (
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.sites',
#'django.contrib.staticfiles',
'django.contrib.messages',
# Uncomment the next line to enable the admin:
'django.contrib.admin',
'django.contrib.staticfiles',
# Uncomment the next line to enable admin documentation:
# 'django.contrib.admindocs',
'dongwm.smhome',
'dongwm.apply',
'djcelery', # 这里增加了djcelery 也就是为了在django admin里面可一直接配置和查看celery
'django_extensions',
'djsupervisor',
'django.contrib.humanize',
'django_jenkins'
BROKER_URL = 'amqp://username:password@localhost:5672/yourvhost'
CELERY_IMPORTS = (
'dongwm.smhome.tasks',
'dongwm.smdata.tasks',
CELERY_RESULT_BACKEND = "amqp" # 官网优化的地方也推荐使用c的librabbitmq
CELERY_TASK_RESULT_EXPIRES = 1200 # celery任务执行结果的超时时间,我的任务都不需要返回结果,只需要正确执行就行
CELERYD_CONCURRENCY = 50 # celery worker的并发数 也是命令行-c指定的数目,事实上实践发现并不是worker也多越好,保证任务不堆积,加上一定新增任务的预留就可以
CELERYD_PREFETCH_MULTIPLIER = 4 # celery worker 每次去rabbitmq取任务的数量,我这里预取了4个慢慢执行,因为任务有长有短没有预取太多
CELERYD_MAX_TASKS_PER_CHILD = 40 # 每个worker执行了多少任务就会死掉,我建议数量可以大一些,比如200
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 这是使用了django-celery默认的数据库调度模型,任务执行周期都被存在你指定的orm数据库中
CELERY_DEFAULT_QUEUE = "default_dongwm" # 默认的队列,如果一个消息不符合其他的队列就会放在默认队列里面
CELERY_QUEUES = {
"default_dongwm": { # 这是上面指定的默认队列
"exchange": "default_dongwm",
"exchange_type": "direct",
"routing_key": "default_dongwm"
"topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列
"routing_key": "topictest.#",
"exchange": "topic_exchange",
"exchange_type": "topic",
"test2": { # test和test2是2个fanout队列,注意他们的exchange相同
"exchange": "broadcast_tasks",
"exchange_type": "fanout",
"binding_key": "broadcast_tasks",
"exchange": "broadcast_tasks",
"exchange_type": "fanout",
"binding_key": "broadcast_tasks2",
class MyRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task.startswith('topictest'):
'queue': 'topicqueue',
# 我的dongwm.tasks文件里面有2个任务都是test开头
elif task.startswith('dongwm.tasks.test'):
"exchange": "broadcast_tasks",
# 剩下的其实就会被放到默认队列
return None
# CELERY_ROUTES本来也可以用一个大的含有多个字典的字典,但是不如直接对它做一个名称统配
CELERY_ROUTES = (MyRouter(), )
Views(...) Comments()Celery的最佳实践 - 简书
Celery的最佳实践
是一个用于执行异步任务的框架。这个框架使用python编写,使用这个框架,很容易就能将Web应用中的一些耗时的作业转交给工作池,让工作池中的worker以异步的方式执行这些作业。
Celery架构
一般通过启动一个或多个worker进程来部署Celery。这些worker进程连接上消息代理(以下称之为broker)来获取任务请求。broker随机将任务请求分发给worker。通过调用Celery的API,用户生成一个任务请求,并且将这个请求发布给broker。在worker完成任务后,将完成的任务信息发送给broker。通过启动新的worker进程并让这些进程连上broker,可以很方便的扩展worker池。每个worker可以和其他的worker同步执行任务。
broker的选择
官方支持的列表如下
Monitoring
Remote Control
Amazon SQS
Experimental
如果需要存储任务的结果,需要给Celery配置一个result store。Redis和memcache是很好的选择。
创建Celery应用
首先,安装Celery
pip install celery
然后,为celery应用创建一个模块。对于小的应用,通常的做法是把所有代码放在一个叫tasks.py的文件中
import celery
app = celery.Celery('example')
task 是Celery中最基本的单元。Celery有很多装饰器来定义task,只需要编写一个函数并且加上一个装饰器,就能注册一个能异步执行的任务,比如下面这个函数
def add(x, y):
return x+y
通过下面的代码,就能让这个任务异步执行
add.delay(1,2)
管理异步任务
Celery中的任务都是异步执行的,也就是说,在主进程中调用Celery函数后,这个函数会将任务信息发送给broker后立刻返回。有两种方式可以获取任务执行的结果。一种方式是将执行的结果持久化,比如写入数据库。在大多数情况下,将异步任务的结果写入数据库是很低效的。另外一种方式是使用Celery中结果存储(result store)的功能。这个功能是可选的,需要配置相关的参数。
选择序列化格式
Celery任务的输入和输出都要经过序列化和反序列化。序列化会带来一系列的问题,最好在设计任务的时候就将这点考虑到。Celery默认会使用Pickle来对消息进行序列化。Pickle的好处是简单易用,但是在使用的过程中会有一些坑。当代码发生变动时,已经序列化的对象,反序列化后依然是变更前的代码。好的实践是使用JSON作为序列化格式,使用JSON,不仅可以强迫开发者认真地设计参数,还可以避免使用pickle带来的安全隐患。使用下面的配置
CELERY_TASK_SERIALIZER=json
在Celery的worker池中,worker并发地执行任务。因此,将任务设置成多线程是有意义的。每个任务应该尽可能做最小的有用工作量,以便尽可能高效地分配工作。在发布任务的过程中,我们会将任务通过网络发给broker,broker通过网络发给worker,在worker上对任务进行反序列化,这个过程中的开销比线程之间传递任务信息要大的多。我们在设计任务的时候要考虑到这一点。如果把向数据库插入一条数据作为一个Celery任务,对资源的利用率将是不高的。但是,在同一个任务中进行1次API调用而不是几次,将会有很大的区别。短任务会让部署和重启变得容易些。并且会比那些长任务更加不容易出错。
为任务设置超时
当执行一个任务成百上千次时,由于网络问题可能导致一个任务卡住,导致队列被阻塞而不能处理更多的任务。可以通过设置hard timeouts和soft timeouts来解决这个问题。相关文档见。
创建幂等的任务
任务可能会由于各种各样的原因报错或者被打断。在分布式系统中,与其想办法处理所有可能导致出错的情景,不如在设计任务时实现幂等性。在任务开始时,永远不要预设系统的状态。尽可能不要改变外部的状态。任务在重跑时带来的副作用越小,一个分布式的系统就越能够自我修复。举个例子,当一个预想不到的错误发生时,幂等的任务只要告诉Celery去重跑就行。如果错误是短暂的,任务的幂等性能使得系统在没有人为干涉的情况下能很快自我修复。
使用acks_late
当worker收到broker发来的任务时,worker会向broker回复一个确认信息(acknowledgement)。通常情况下,broker在收到这个ack后,会将这个任务从队列中移除。但是,加入worker在执行任务的时候突然挂掉,并且已经向broker发送的确认信息,这个任务将不会再次执行。Celery在检测到worker挂掉的情况下,会尝试向其他的worker重新发送这个任务信息。但是在一些极端情况下,比如网络挂掉,硬件错误或者其他的场景下,Celery不能正确地处理这一情况。可以通过配置acks_late=True,使worker只有在任务完成(成功/失败)的情况下,才向broker发送确认信息。在任务信息不能丢失的场景中,这个功能是及其有用的。但是,只有在任务被设计成幂等,以及短任务的情况(broker在将任务发送给新的worker前,会保留任务信息一段时间)下,才有用的一个配置项。
自定义task类
虽然任务看起来像一个函数,但是在用Celery装饰器装饰那个函数时,实际上是返回了一个类,这个类实现了__call__方法。(这也是为什么task可以被绑定到self,并且可以使用delay和apply_async方法的原因)。这个装饰器使用起来很方便,但是有些情况下,一些任务可能会有同样的特性,或者执行同样的流程,如果继续沿用函数式的写法,可能不会很好地表达这种特性。
通过创建celery.Task的抽象子类,可以通过继承,来构建一套其他任务所需的工具和行为。子类的常见行为包括,设置rate和重试行为,初始化,甚至是一些配置项。比如下面这个例子
class SubTask(Task):
abstract = True
default_retry_delay = 1
max_retries = 3
ignore_result = True
task_time_limit = 15
acks_late = True
提到了一些可以将多个任务组成一个工作流的方法。现在就开始熟悉这些方法吧。不损害上面原则的前提下,它们提供了一些方法来完成复杂的任务。尤其是chord,这个方法让任务并发执行,并且在任务完成的时候将结果传递给其他的任务。当然,这样的特性要求配置一个result store。
go/python 开发
如需转载前往/material/author?id=25378 获取合法授权}

我要回帖

更多关于 celery 定时任务 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信