实现一个 python 版本的 Crond Service
在微服务的实践中,所有的应用都被打包成了容器。但总会遇到一些作业定时调度的需求,在不知道有 APScheduler 的情况下,通常会想到用 Linux 系统自带的 Crond Service 来完成相应的作业调度功能。如果要将这部分实现迁移到容器中,大多数情况下会选择把 Crond Service 一并集成到容器里。这就破坏了“一个容器只运行一个进程”的容器设计思想,并且在实际运行过程中 Crond Service 也不够稳定。默认情况下,Crond Service 启动后会自动在 /var/run/ 中创建一个 crond.pid 文件。如果为镜像新增了某些配置并使用 docker commit 重新打包,这个 crond.pid 文件也会被一并打包,并干扰下次的容器启动或者 docker exec 操作。
在知道了 APScheduler 之后,一个办法是通过重构将原有的调度全部放到代码中,但这种重构带来的问题是:未来每新增一个作业调度都要修改代码,无疑会大大增加服务的开发时间。另一个办法是实现一个 python 版本的 Crond Service 服务。
因为存在 python-crontab 这样的库,使用 python 管理 crontab 文件的复杂度被大大降低,再结合 APScheduler,就可以实现一个 python 版本的 Crond Service 服务。
import os
import datetime
import json
import time
import requests
import traceback
import hashlib
from watchdog.observers import Observer
from crontab import CronTab
from watchdog.events import FileSystemEventHandler
from threading import Thread, Event
import queue as Queue
from apscheduler.schedulers.background import BackgroundScheduler
from cmop.common.log import info, warn, debug, error
from cmop.common.rabbitmq_interface import RabbitmqService
from cmop.common.utils import RedisClient, run_local_command, byteify
# Host name and port passed to RedisClient for the crontab cache and job payload lookups.
REDIS_HOST = "redis-master"
REDIS_PORT = 6379
# Crontab file that the service watches, parses and rewrites on recovery.
SYSTEM_CRONTAB = "/etc/crontab"
# Redis key under which the parsed crontab entries are cached as a JSON list.
CROND_SERVICE_PREFIX = "crond-service-cronjob-cache"
# Queue of (minute, hour, dom, month, dow, jobid, command) tuples waiting to be scheduled.
MSGQUEUE = Queue.Queue()
# Set by the crontab watcher to ask for a restart of the crond worker thread.
CROND_RESTART_EVENT = Event()
class SchedulerManagement(object):
    """Manage one APScheduler ``BackgroundScheduler`` per key prefix.

    Scheduler instances and their "started" flags live in class-level
    dictionaries keyed by the prefix, so every ``SchedulerManagement`` object
    created with the same prefix shares the same underlying scheduler.
    Jobs are stored in the ``redis`` job store under keys derived from the
    prefix.
    """

    # key prefix -> scheduler instance shared by all objects with that prefix
    scheduler_dict = {}
    # key prefix -> True once the scheduler for that prefix has been started
    start_flag_dict = {}

    def __init__(self, key_prefix):
        """Create (or reuse) the scheduler registered under ``key_prefix``.

        Args:
            key_prefix: Name used both as registry key and as prefix of the
                Redis job store keys.
        """
        self._scheduler_key_prefix = key_prefix
        if not self.scheduler:
            info(f"Init scheduler: {self._scheduler_key_prefix}")
            self.start_flag = False
            self.scheduler = BackgroundScheduler()
            self.scheduler.configure(
                job_defaults={
                    "coalesce": True,
                    "max_instances": 3000,
                    "misfire_grace_time": 60,
                }
            )
            # Use the module-level Redis settings so the job store and the
            # RedisClient lookups below always target the same server.
            self.scheduler.add_jobstore(
                "redis",
                jobs_key="{0}.jobs".format(key_prefix),
                run_times_key="{0}.run_times".format(key_prefix),
                host=REDIS_HOST,
                port=REDIS_PORT,
            )
        else:
            info(f"Scheduler: {self._scheduler_key_prefix} has already started, ignore initialize...")

    @property
    def scheduler(self):
        """Return the scheduler registered for this prefix, or ``None``."""
        return SchedulerManagement.scheduler_dict.get(self._scheduler_key_prefix)

    @scheduler.setter
    def scheduler(self, value):
        """Register ``value`` for this prefix; a falsy value unregisters it."""
        if not value:
            # delete related instance
            info(f"Scheduler {self._scheduler_key_prefix} will be destroy...")
            # pop() tolerates a missing entry, and returning here keeps the
            # key from being re-inserted with a None value.
            SchedulerManagement.scheduler_dict.pop(self._scheduler_key_prefix, None)
            return
        SchedulerManagement.scheduler_dict[self._scheduler_key_prefix] = value

    @property
    def start_flag(self):
        """Return whether the scheduler for this prefix has been started."""
        return SchedulerManagement.start_flag_dict.setdefault(
            self._scheduler_key_prefix, False
        )

    @start_flag.setter
    def start_flag(self, value):
        SchedulerManagement.start_flag_dict[self._scheduler_key_prefix] = value

    def run(self):
        """Start the scheduler unless it is already flagged as started."""
        if not self.start_flag:
            self.scheduler.start()
            self.start_flag = True
            info("Start scheduler [%s]..." % (self.start_flag))

    def shutdown(self):
        """Stop the scheduler if it was started and unregister it."""
        if self.start_flag:
            self.scheduler.shutdown()
            self.start_flag = False
            self.scheduler = None
            info("Stop scheduler...")

    def _send_schedule_notify(self, exchange, routing_key, record):
        """Default job callback: publish ``record`` through RabbitmqService.

        Publishing errors are logged and swallowed so that a failing publish
        does not propagate out of the job.
        """
        info("Begin to send scheduled notify...")
        try:
            rs = RabbitmqService()
            rs.publish(exchange, routing_key, message=record)
        except Exception as ex:
            error(traceback.format_exc())
            error("Failed to send data %r due to %r" % (record, ex))

    def add_common_notify_callback(
        self, m="*", h="*", dom="*", mon="*", dow="*", jobid=None, cb=None, cb_args=None
    ):
        """Add a cron-triggered job unless a job with ``jobid`` already exists.

        Args:
            m, h, dom, mon, dow: Cron fields (minute, hour, day of month,
                month, day of week).
            jobid: Identifier of the job inside the scheduler.
            cb: Callable executed when the trigger fires.
            cb_args: Positional arguments passed to ``cb``.
        """
        job = self.scheduler.get_job(jobid)
        if job:
            info(f"The job {jobid} has already exists, ignore...")
        else:
            info("Add notify callback...")
            self.scheduler.add_job(
                cb,
                args=cb_args,
                trigger="cron",
                minute=m,
                hour=h,
                day=dom,
                month=mon,
                day_of_week=dow,
                start_date=datetime.datetime.now(),
                id=jobid,
            )
        self.scheduler.resume()

    def add_common_notify_callback_once(
        self, run_date, jobid=None, cb=None, cb_args=None
    ):
        """Add a job that fires a single time at ``run_date``."""
        info("Add once notify callback...")
        self.scheduler.add_job(
            cb, args=cb_args, trigger="date", run_date=run_date, id=jobid
        )
        self.scheduler.resume()

    def remove_all_jobs(self):
        """Remove every job from the scheduler."""
        self.scheduler.remove_all_jobs()

    def remove_job(self, jobid=None):
        """Remove the job ``jobid`` if it exists; without ``jobid`` remove all jobs."""
        if not jobid:
            self.scheduler.remove_all_jobs()
            return
        if self.scheduler.get_job(jobid):
            self.scheduler.remove_job(jobid)

    def _with_cached_body(self, jobid, cb_args):
        """Return ``cb_args`` extended with the payload stored in Redis for ``jobid``.

        The payload is read from the hash named after the key prefix. When no
        job id is given or nothing is stored, ``cb_args`` is returned unchanged.
        A new tuple is built so a caller-supplied list is never mutated.
        """
        if not jobid:
            return cb_args
        redis_clt = RedisClient(REDIS_HOST, REDIS_PORT, decode_responses=True)
        body = redis_clt.client.hget(self._scheduler_key_prefix, jobid)
        if not body:
            return cb_args
        return tuple(cb_args or ()) + (body,)

    def add_schedule(
        self, m="*", h="*", dom="*", mon="*", dow="*", jobid=None, cb=None, cb_args=()
    ):
        """Schedule a recurring cron job.

        ``cb`` defaults to ``_send_schedule_notify``. When a payload is stored
        in Redis for ``jobid`` it is appended to ``cb_args``.
        """
        info("Add new scheduled job...")
        cb = cb or self._send_schedule_notify
        cb_args = self._with_cached_body(jobid, cb_args)
        self.add_common_notify_callback(
            m=m, h=h, dom=dom, mon=mon, dow=dow, jobid=jobid, cb=cb, cb_args=cb_args
        )

    def add_schedule_once(self, run_date=None, jobid=None, cb=None, cb_args=()):
        """Schedule a one-shot job at ``run_date``; without a run date only log.

        ``cb`` defaults to ``_send_schedule_notify``. When a payload is stored
        in Redis for ``jobid`` it is appended to ``cb_args``.
        """
        info("Add new once scheduled job...")
        cb = cb or self._send_schedule_notify
        cb_args = self._with_cached_body(jobid, cb_args)
        if run_date:
            self.add_common_notify_callback_once(
                run_date=run_date, jobid=jobid, cb=cb, cb_args=cb_args
            )
        else:
            info("Failed to get schedule run date, ignore cron config...")
class CronTabEventHandler(FileSystemEventHandler):
    """Watchdog handler that reloads the cron jobs when the crontab changes.

    Move, create and delete events are only logged. A modification of
    ``SYSTEM_CRONTAB`` re-reads the file, caches the entries in Redis, queues
    them on ``MSGQUEUE`` and sets ``CROND_RESTART_EVENT``.
    """

    def on_moved(self, event):
        super(CronTabEventHandler, self).on_moved(event)
        what = "directory" if event.is_directory else "file"
        debug(f"Moved {what}: from {event.src_path} to {event.dest_path}")

    def on_created(self, event):
        super(CronTabEventHandler, self).on_created(event)
        what = "directory" if event.is_directory else "file"
        debug(f"Created {what}: {event.src_path}")

    def on_deleted(self, event):
        super(CronTabEventHandler, self).on_deleted(event)
        what = "directory" if event.is_directory else "file"
        debug(f"Deleted {what}: {event.src_path}")

    def on_modified(self, event):
        """Reload the crontab when ``SYSTEM_CRONTAB`` itself was modified."""
        super(CronTabEventHandler, self).on_modified(event)
        # Ignore everything but the crontab file before doing any work, so
        # unrelated modifications do not open a Redis connection.
        if event.is_directory or event.src_path != SYSTEM_CRONTAB:
            return
        debug(f"Modified file: {event.src_path}")

        # Parse the whole file first: if parsing fails, neither the cache nor
        # the queue has been touched yet.
        system_cron = CronTab(tabfile=SYSTEM_CRONTAB, user=False)
        cron_jobs = []
        pending = []
        for job in system_cron:
            schedule = (
                str(job.minute),
                str(job.hour),
                str(job.dom),
                str(job.month),
                str(job.dow),
            )
            job_mark = f"{','.join(schedule)},{job.user},{job.command}"
            jobid = f"crondServiceTask-{hashlib.md5(job_mark.encode('utf-8')).hexdigest()[8:-8]}"
            pending.append(schedule + (jobid, job.command))
            cron_jobs.append(list(schedule) + [job.user, job.command])

        # record in redis; SET replaces any previous value, so no separate
        # delete is needed
        redis_clt = RedisClient(REDIS_HOST, REDIS_PORT, decode_responses=True)
        redis_clt.client.set(CROND_SERVICE_PREFIX, json.dumps(cron_jobs))

        for item in pending:
            MSGQUEUE.put(item)
        # clean current jobs and restart crond service
        CROND_RESTART_EVENT.set()
class CrondService(Thread):
    """Supervisor thread that runs a crond-like worker and restarts it on demand.

    ``run`` restores the crontab from the Redis cache, starts the worker
    thread (``_crond_service_run``) and restarts it every time
    ``CROND_RESTART_EVENT`` is set.
    """

    def __init__(self, name, **kwargs):
        """Create the thread with the given ``name``; extra keywords are ignored."""
        super(CrondService, self).__init__(name=name)

    @staticmethod
    def _make_jobid(m, h, dom, mon, dow, user, cmd):
        """Build the job id from the middle 16 hex digits of the entry's MD5."""
        job_mark = f"{str(m)},{str(h)},{str(dom)},{str(mon)},{str(dow)},{user},{cmd}"
        return f"crondServiceTask-{hashlib.md5(job_mark.encode('utf-8')).hexdigest()[8:-8]}"

    def _recovery_system_crontab(self):
        """Rebuild ``SYSTEM_CRONTAB`` from the Redis cache and queue its jobs.

        The crontab file is created when missing. Every cached entry is added
        to the file and put on ``MSGQUEUE`` for the worker thread.
        """
        # check the file has already existed
        if not os.path.exists(SYSTEM_CRONTAB):
            info(f"There is no related file {SYSTEM_CRONTAB}, will create one...")
            with open(SYSTEM_CRONTAB, "x"):
                pass
        # recovery from redis and write to /etc/crontab
        info("Recovery crond setting from redis...")
        redis_clt = RedisClient(REDIS_HOST, REDIS_PORT, decode_responses=True)
        body = redis_clt.client.get(CROND_SERVICE_PREFIX)
        crond_info = json.loads(body, object_hook=byteify) if body else None
        if not crond_info:
            info("There is no related cron job record in cache, ignore recovery...")
            return

        system_cron = CronTab(tabfile=SYSTEM_CRONTAB, user=False)
        for cron in crond_info:
            m, h, dom, mon, dow, user, cmd = cron
            jobid = self._make_jobid(m, h, dom, mon, dow, user, cmd)
            newjob = system_cron.new(
                command=cmd, comment=f"crond service {jobid}", user=user
            )
            newjob.setall(f"{str(m)} {str(h)} {str(dom)} {str(mon)} {str(dow)}")
            MSGQUEUE.put((str(m), str(h), str(dom), str(mon), str(dow), jobid, cmd))
        system_cron.write()
        info("Recovery crond setting completed.")

    def run(self):
        """Supervise the worker thread until it fails to come up."""
        info("Begin to start crond service monitor...")
        crond_tid = None
        # recovery the cron job
        self._recovery_system_crontab()
        while True:
            if not crond_tid:
                # create a new thread to run crond service
                crond_tid = Thread(target=self._crond_service_run)
                crond_tid.start()
                # try some counts to wait the thread alive
                for _ in range(3):
                    # Thread.isAlive() was removed in Python 3.9.
                    if crond_tid.is_alive():
                        break
                    warn("crond service thread is not ready, waitting..")
                    time.sleep(2)
                else:
                    warn("crond service can not be ready.")
                    break
                info("Crond service started...")
            else:
                # monitor the event and restart the thread
                CROND_RESTART_EVENT.wait()
                # wait for the worker to stop before starting a new one
                info("Waitting for Crond service stop...")
                crond_tid.join()
                crond_tid = None
                info("Crond service stopped...")
                CROND_RESTART_EVENT.clear()
        # Logged after the loop: a ``while True ... else`` branch never runs
        # because the loop can only be left through ``break``.
        info("Crond service has quit...")

    def _crond_service_run(self):
        """Worker body: watch /etc, start the scheduler and feed it queued jobs.

        The loop ends when ``CROND_RESTART_EVENT`` is set or when scheduling a
        job raises. The observer and the scheduler are always shut down on exit.
        """
        info("Begin to start Crond Service...")
        path = "/etc"
        event_handler = CronTabEventHandler()
        observer = Observer()
        info("Begin to watch /etc/crontab file...")
        observer.schedule(event_handler, path, recursive=True)
        observer.start()
        sch_manage = None
        try:
            key_prefix = "crond_service_schedule"
            sch_manage = SchedulerManagement(key_prefix=key_prefix)
            sch_manage.run()
            # remove all jobs
            sch_manage.remove_job()
            while True:
                msg_consumed = False
                try:
                    # get the crontab setting and set to apscheduler
                    if not MSGQUEUE.empty():
                        m, h, dom, mon, dow, jobid, cmd = MSGQUEUE.get_nowait()
                        msg_consumed = True
                        sch_manage.add_schedule(
                            m, h, dom, mon, dow, jobid, run_local_command, (cmd,)
                        )
                    # check the Event and decide if quit
                    elif CROND_RESTART_EVENT.wait(timeout=1):
                        info("Thread recive the quit event, will quite...")
                        break
                except Exception as ex:
                    error(traceback.format_exc())
                    error(f"Failed to run crond service scheduler manager due to {str(ex)}")
                    break
                finally:
                    if msg_consumed:
                        MSGQUEUE.task_done()
        finally:
            # Runs even when the scheduler setup above raises, so the observer
            # thread is never left behind.
            observer.stop()
            observer.join()
            if sch_manage is not None:
                sch_manage.shutdown()
if __name__ == "__main__":
    # Script entry point: launch the crond supervisor thread.
    crond_service = CrondService("crond-service")
    crond_service.start()
流程图如下:
graph TD;
A[Begin]-->B[Recovery Crond Job];
subgraph main_thread;
B-->C[Start Crond main Thread];
C-->D[Wait Crond restart event];
D--restart event-->E[Stop Crond main Thread];
E-->C;
end;
subgraph crond_thread;
C-->F[Start Watcher];
F-->G[Start Scheduler];
G-->H[Remove all jobs of Scheduler];
H-->I{Check job queue if empty};
I--not empty-->J[Get new job];
J-->K1[Add new job to Scheduler];
K1-->I;
I--empty-->K{Wait Crond restart event};
K--restart event-->L[Quit];
K--timeout-->I;
L--stopped-->E;
end;
subgraph watcher_thread;
F-->M[crontab file modified notify];
M-->N[send new to job queue];
N-->O[send restart event];
O--send restart event-->D;
end;
subgraph scheduler_thread;
G-->P[Trigger job];
end;