定时执行某项任务是非常常见的一个需求,简单的 crontabl
就可以完成。我们最近的一个 Django 项目中用了 Apscheduler 来调度定时任务,遇到了一些问题。
首先,Apscheduler 定位是一个依赖于你的应用的任务调度库,非常轻巧:
That APScheduler is not a daemon or service itself, nor does it come with any command line tools. It is primarily meant to be run inside existing applications.
Apscheduler 是跟随你的应用来启动的,没有持久化,没有命令行操作任务的工作,运行在应用的内部(这其中很挫的一个问题是,每次运行 Django manage.py
都会启动这玩意,甚至跑 migrate
都会启动,不过这很可能是我们的用法有问题)。如果一些小巧的项目用起来倒是不错,但是如果需要对任务进行管理和控制,就有很多限制了。
Apscheduler 分布式的问题
我们遇到的问题是:Gunicorn 部署 Django 的时候会起来 4 个进程实例,我们 Django 应用内的 Apscheduler 也会跟着起来 4 个,那么遇到定时制定任务的时候,每个任务都会执行 4 次。这就很蛋疼了。
我们先是用了一个临时的解决方案,就是让任务的执行有幂等性。任务是拉去一部分数据到 MySQL 数据库中,那么就在数据库建立一些 Uniqe 索引,后面重复执行的任务放不进去。这样的做法显然会浪费一些资源,但是从结果上看是可以保证正确的,数据肯定不会被重复存储。缺点是每个任务都要考虑到同步的问题,或是类似的唯一索引,或是加锁。
然后又有了一种解决方案,manage.py
在运行的时候使用一个文件锁,这样无论有多少个 Django 在运行,都只会有一个 Apscheduler 存在。这样这个问题就可以被忽略了,大家可以再污染实际的定时任务逻辑了。可这种锁这对本地的多个 Django 有效,如果分布式部署 Django ,那么问题又来了,每一个机器的多个 Django 中都会存在一个 Apscheduler。对于解决单机多个 Apscheduler 的方案中,还有一个比较灵巧的,但是稍微复杂一些。原理是让 Gunicorn 先载入 app 然后再 fork 进程,载入 app 的时候开启一个 scheduler 线程,fork 的时候就不会再开线程了。这样每一个 Django 中都有 worker 可以用(jobstore 必须不能使用内存,要用外部可共享的),但是只有一个 scheduler 存在)。具体的步骤如下:
- Gunicorn 启动的命令带上
--preload
参数,先载入 app 再 fork 进程,这样下面的代码只会在 master 执行一次:-
1234rerun_monitor = Scheduler()rerun_monitor.start()rerun_monitor.add_interval_job(job_to_be_run,\seconds=JOB_INTERVAL)
-
- jobstore 选择非 memory 的选项。否则只有一个 worker 在跑(和 scheduler 在同一个 Django 中的那个),使用外部的 jobstore 就可以让所有的 worker 去跑 scheduler 了
- Scheduler 选择 BackgroundScheduler 。因为 BackgroundScheduler 的实现是,调用
start()
会开一个线程来调度。这样我们在 master 进程 load app 的时候会调用一次,Gunicorn fork 出来不会再调用,所以就一直有一个线程存在。
Celery 的解决方案
Celery 作为一个分布式的任务队列,解决方案就比较简单了:直接就只有一个 scheduler 存在,它调度任务,多个 worker 来执行。
You have to ensure only a single scheduler is running for a schedule at a time, otherwise you’d end up with duplicate tasks. Using a centralized approach means the schedule doesn’t have to be synchronized, and the service can operate without using locks.
既然只有一个 scheduler ,那么周期任务既不需要同步,也不需要加锁。缺点就是一个单点。(部署 scheduler 的地方需要提供一个可读写的文件让 scheduler 保存最后一次执行的任务,这个 scheduler 其实是有状态的。)
对于可用性要求不是特别高、周期性任务实时性要求不是特别高的项目来说,其实也够用了。而且我认为 celery 应该会实现类似调度失败重试的机制,不至于和 crontab 那么简陋。
其实我设想的任务调度应该是多个 scheduler 对于某任务、某时间生成一个唯一 token,然后拿 token 去队列里放任务。应该可以解决分布式的问题,即高可用又是幂等的。
其余的一些方案:
请教下,本地执行 gunicorn -w 4 server:app,每个 server 执行一次 scheduler.start(),并没有重复执行
你这么说我也不知道具体问题出在哪里。确定是一个定时任务吗?还是一个普通的请求被 gunicorn 分配到了仅一个 worker?
既然只有一个 scheduler ,那么周期任务既不需要同步,也不需要加锁。