前言
celery有时任务会重复执行,这时就需要一步一步排查。任务重复首先排除以下几项:
- 启动多个celery beat
- 任务时间过长,超过了visibility_timeout
celerybeat重复任务
复现:某个任务每日10点执行,现时是11点,这个任务今日应已执行,last_run_at=今日10点,现在将该任务执行时间改为每日10:30,然后重启,这时beat会立即发送该任务给worker执行。
其实这也不算是重复执行,把任务执行时间重新设置在上次执行时间和现在之间,celery beat会在重启后立即执行
源码分析
beat启动流程
beat启动: celery beat -A app –schedule=celerybeat-schedule -l info 启动流程:
- Beat类中调用EmbeddedService,采用多进程还是多线程的方式
- 开启进程或线程,调用Service类
- Service类start死循环,每interval同步schedule
具体的参考。
beat重复执行
beat启动需要一个Schedule,它的作用是定时任务的持久化,默认是使用shelve库,写在celerybeat-schedule文件中。
# celerybeat-schedule
('__version__', '3.1.25')
('entries', {'task': <Entry: task task() <crontab: 0 1 * * * (m/h/d/dM/MY)>)
('tz', 'Asia/Shanghai')
('utc_enabled', True)
# 单个entry详情,里面记录了上次执行的时间和总共执行次数
$ docs['entries']['task'].__dict__
{'app': None,
'args': (),
'kwargs': {},
'last_run_at': datetime.datetime(2019, 7, 11, 0, 0, 0, 78524),
'name': 'task',
'options': {},
'schedule': <crontab: 0 1 * * * (m/h/d/dM/MY)>,
'task': 'task',
'total_run_count': 150}
它每次重启时,会加载上次的文件然后新的配置合并,相关代码
def merge_inplace(self, b):
schedule = self.schedule
A, B = set(schedule), set(b)
for key in A ^ B:
schedule.pop(key, None)
for key in B:
entry = self.Entry(**dict(b[key], name=key, app=self.app))
if schedule.get(key):
schedule[key].update(entry)
else:
schedule[key] = entry
当我们的任务时间重新设置在last_run_at和now之间,看代码
def maybe_due(self, entry, publisher=None):
is_due, next_time_to_run = entry.is_due()
if is_due: # 当is_due 为true时,就执行任务
info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
try:
result = self.apply_async(entry, publisher=publisher)
except Exception as exc:
error('Message Error: %s\n%s',
exc, traceback.format_stack(), exc_info=True)
else:
debug('%s sent. id->%s', entry.task, result.id)
return next_time_to_run
def is_due(self, last_run_at):
"""根据last_run_at 计算剩余的时间"""
rem_delta = self.remaining_estimate(last_run_at)
rem = timedelta_seconds(rem_delta)
due = rem == 0
if due:
rem_delta = self.remaining_estimate(self.now())
rem = timedelta_seconds(rem_delta)
return schedstate(due, rem)
def remaining(start, ends_in, now=None, relative=False):
"""根据上次执行时间,下次执行剩余时间,now计算剩余时间"""
now = now or datetime.utcnow()
end_date = start + ends_in
if relative:
end_date = delta_resolution(end_date, ends_in)
ret = end_date - now
if C_REMDEBUG: # pragma: no cover
print('rem: NOW:%r START:%r ENDS_IN:%r END_DATE:%s REM:%s' % (
now, start, ends_in, end_date, ret))
return ret
原理就是celery计算下次执行时间,是根据last_run_at开始计算的,不是从现在。 解释那个复现的例子,任务改为10:30,根据上次last_run_at计算周期,下次是10:30执行,10:30减去11:00小于0,立即执行。
解决
- 手动删除celerybeat-schedule文件
beat_init signal
from celery.signals import beat_init @beat_init.connect def remove_celerybeat_schedule(sender=None, **kwargs): """重启beat,重新生成celerybeat-schedule""" if sender: sender.scheduler._remove_db() sender.scheduler.setup_schedule()
3.修改源码逻辑
参考
https://docs.python.org/zh-cn/3/library/shelve.html
http://www.pythondoc.com/celery-3.1.11/index.html
https://liqiang.io/post/celery-source-analysis-worker-start-flow