前言

celery有时任务会重复执行,这时就需要一步一步排查。任务重复首先排除以下几项:

  1. 启动多个celery beat
  2. 任务时间过长,超过了visibility_timeout

celerybeat重复任务

复现:某个任务每日10点执行,现时是11点,这个任务今日应已执行,last_run_at=今日10点,现在将该任务执行时间改为每日10:30,然后重启,这时beat会立即发送该任务给worker执行。

其实这也不算是重复执行,把任务执行时间重新设置在上次执行时间和现在之间,celery beat会在重启后立即执行

源码分析

beat启动流程

beat启动: celery beat -A app –schedule=celerybeat-schedule -l info 启动流程:

  1. Beat类中调用EmbeddedService,采用多进程还是多线程的方式
  2. 开启进程或线程,调用Service类
  3. Service类start死循环,每interval同步schedule

具体的参考

beat重复执行

beat启动需要一个Schedule,它的作用是定时任务的持久化,默认是使用shelve库,写在celerybeat-schedule文件中。

# celerybeat-schedule
('__version__', '3.1.25')
('entries', {'task': <Entry: task task() <crontab: 0 1 * * * (m/h/d/dM/MY)>)
('tz', 'Asia/Shanghai')
('utc_enabled', True)

# 单个entry详情,里面记录了上次执行的时间和总共执行次数
$ docs['entries']['task'].__dict__
{'app': None,
 'args': (),
 'kwargs': {},
 'last_run_at': datetime.datetime(2019, 7, 11, 0, 0, 0, 78524),
 'name': 'task',
 'options': {},
 'schedule': <crontab: 0 1 * * * (m/h/d/dM/MY)>,
 'task': 'task',
 'total_run_count': 150}

它每次重启时,会加载上次的文件然后新的配置合并,相关代码

def merge_inplace(self, b):
    schedule = self.schedule
    A, B = set(schedule), set(b)
    for key in A ^ B:
        schedule.pop(key, None)
    for key in B:
        entry = self.Entry(**dict(b[key], name=key, app=self.app))
        if schedule.get(key):
            schedule[key].update(entry)
        else:
            schedule[key] = entry

当我们的任务时间重新设置在last_run_at和now之间,看代码

def maybe_due(self, entry, publisher=None):
    is_due, next_time_to_run = entry.is_due()

    if is_due: # 当is_due 为true时,就执行任务
        info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
        try:
            result = self.apply_async(entry, publisher=publisher)
        except Exception as exc:
            error('Message Error: %s\n%s',
                  exc, traceback.format_stack(), exc_info=True)
        else:
            debug('%s sent. id->%s', entry.task, result.id)
    return next_time_to_run

def is_due(self, last_run_at):
    """根据last_run_at 计算剩余的时间"""
    rem_delta = self.remaining_estimate(last_run_at)
    rem = timedelta_seconds(rem_delta)
    due = rem == 0
    if due:
        rem_delta = self.remaining_estimate(self.now())
        rem = timedelta_seconds(rem_delta)
    return schedstate(due, rem)

def remaining(start, ends_in, now=None, relative=False):
    """根据上次执行时间,下次执行剩余时间,now计算剩余时间"""
    now = now or datetime.utcnow()
    end_date = start + ends_in
    if relative:
        end_date = delta_resolution(end_date, ends_in)
    ret = end_date - now
    if C_REMDEBUG:  # pragma: no cover
        print('rem: NOW:%r START:%r ENDS_IN:%r END_DATE:%s REM:%s' % (
            now, start, ends_in, end_date, ret))
    return ret

原理就是celery计算下次执行时间,是根据last_run_at开始计算的,不是从现在。 解释那个复现的例子,任务改为10:30,根据上次last_run_at计算周期,下次是10:30执行,10:30减去11:00小于0,立即执行。

解决

  1. 手动删除celerybeat-schedule文件
  2. beat_init signal

    from celery.signals import beat_init
    
    @beat_init.connect
    def remove_celerybeat_schedule(sender=None, **kwargs):
    """重启beat,重新生成celerybeat-schedule"""
    if sender:
        sender.scheduler._remove_db()
        sender.scheduler.setup_schedule()
    

3.修改源码逻辑

参考

https://docs.python.org/zh-cn/3/library/shelve.html http://www.pythondoc.com/celery-3.1.11/index.html
https://liqiang.io/post/celery-source-analysis-worker-start-flow