```
[INFO] [2021-07-26 08:20:42,798] [1:140259369826048] - dag_scheduler.py[line:298]: scheduling job 202107260820309976351
[INFO] [2021-07-26 08:20:42,800] [1:140259369826048] - task_scheduler.py[line:28]: scheduling job 202107260820309976351 tasks
[INFO] [2021-07-26 08:20:42,836] [1:140259369826048] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:42,845] [1:140259369826048] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'elapsed': None, 'end_time': None, 'job_id': '202107260820309976351', 'party_id': '0', 'party_status': 'running', 'role': 'local', 'start_time': 1627287632259, 'status': 'running', 'task_id': '202107260820309976351_upload_0', 'task_version': 0, 'update_time': 1627287631078}
[INFO] [2021-07-26 08:20:42,845] [1:140259369826048] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
[WARNING] [2021-07-26 08:20:42,860] [1:140259369826048] - job_saver.py[line:86]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update does not take effect
[INFO] [2021-07-26 08:20:42,867] [1:140259369826048] - task_scheduler.py[line:143]: job 202107260820309976351 task 202107260820309976351_upload_0 0 status is running, calculate by task party status list: ['running']
[INFO] [2021-07-26 08:20:42,867] [1:140259369826048] - task_scheduler.py[line:75]: finish scheduling job 202107260820309976351 tasks
[INFO] [2021-07-26 08:20:42,868] [1:140259369826048] - dag_scheduler.py[line:310]: Job 202107260820309976351 status is running, calculate by task status list: ['running']
[INFO] [2021-07-26 08:20:42,868] [1:140259369826048] - dag_scheduler.py[line:325]: finish scheduling job 202107260820309976351
```
Scheduling jobs that are already in an end status
Source code:
```python
schedule_logger().info("start schedule end status jobs to update status")
jobs = JobSaver.query_job(
    is_initiator=True,
    status=set(EndStatus.status_list()),
    end_time=[current_timestamp() - END_STATUS_JOB_SCHEDULING_TIME_LIMIT, current_timestamp()],
)
schedule_logger().info(f"have {len(jobs)} end status jobs")
for job in jobs:
    schedule_logger().info(f"schedule end status job {job.f_job_id}")
    try:
        update_status = self.end_scheduling_updates(job_id=job.f_job_id)
        if not update_status:
            schedule_logger(job.f_job_id).info(f"the number of updates has been exceeded")
            continue
        self.schedule_running_job(job=job)
    except Exception as e:
        schedule_logger(job.f_job_id).exception(e)
        schedule_logger(job.f_job_id).error(f"schedule job {job.f_job_id} failed")
schedule_logger().info("schedule end status jobs finished")
```
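Explanation: this is the "schedule end status job" part of the dag_scheduler.py polling loop, which keeps running after a job has finished. Every job that finished within the last 5 minutes (END_STATUS_JOB_SCHEDULING_TIME_LIMIT = 5 * 60 * 1000 # ms) still goes through end_scheduling_updates. Only the first call succeeds (by default a single update is allowed: END_STATUS_JOB_SCHEDULING_UPDATES = 1), and only then is schedule_running_job called on the job. After that, every dag_scheduler poll (every 2 s by default) emits the following log line for as long as the job stays inside the 5-minute window:
dag_scheduler.py[line:193]: the number of updates has been exceeded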
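The window-plus-counter behaviour described above can be simulated in a few lines. This is a minimal sketch under stated assumptions, not FATE code: ToyJob, ToyEndStatusScheduler and the in-memory dict used as the update counter are hypothetical stand-ins (how the real end_scheduling_updates stores its count is not shown here), both window bounds are assumed inclusive, and appending to `rescheduled` stands in for schedule_running_job.

```python
from dataclasses import dataclass, field
from typing import Dict, List

# Values quoted in the text; the real constants live in FATE Flow's settings.
END_STATUS_JOB_SCHEDULING_TIME_LIMIT = 5 * 60 * 1000  # ms
END_STATUS_JOB_SCHEDULING_UPDATES = 1


@dataclass
class ToyJob:
    # Hypothetical stand-in for a job record.
    job_id: str
    end_time: int  # ms timestamp at which the job reached an end status


@dataclass
class ToyEndStatusScheduler:
    # Hypothetical in-memory model of the end-status branch of the poll loop.
    jobs: List[ToyJob]
    update_counts: Dict[str, int] = field(default_factory=dict)
    log: List[str] = field(default_factory=list)
    rescheduled: List[str] = field(default_factory=list)

    def end_scheduling_updates(self, job_id: str) -> bool:
        # Allow at most END_STATUS_JOB_SCHEDULING_UPDATES successful updates per job.
        count = self.update_counts.get(job_id, 0)
        if count >= END_STATUS_JOB_SCHEDULING_UPDATES:
            return False
        self.update_counts[job_id] = count + 1
        return True

    def poll(self, now: int) -> None:
        # Select jobs whose end_time lies in [now - limit, now] (inclusive, assumed).
        window_start = now - END_STATUS_JOB_SCHEDULING_TIME_LIMIT
        for job in [j for j in self.jobs if window_start <= j.end_time <= now]:
            if not self.end_scheduling_updates(job.job_id):
                self.log.append(f"{job.job_id}: the number of updates has been exceeded")
                continue
            self.rescheduled.append(job.job_id)  # stands in for schedule_running_job


scheduler = ToyEndStatusScheduler(jobs=[ToyJob("job_a", end_time=0)])
for now in range(0, 10_000, 2_000):  # five polls, 2 s apart
    scheduler.poll(now)
print(scheduler.rescheduled)  # ['job_a']
print(len(scheduler.log))     # 4
```

Only the first of the five polls gets through the counter; the other four take the `continue` branch, which is the repeated "exceeded" message seen in the logs.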
fate_flow/fate_flow_schedule.log
```
[INFO] [2021-07-26 08:20:48,027] [1:140259369826048] - dag_scheduler.py[line:185]: start schedule end status jobs to update status
[INFO] [2021-07-26 08:20:48,036] [1:140259369826048] - dag_scheduler.py[line:187]: have 1 end status jobs
[INFO] [2021-07-26 08:20:48,036] [1:140259369826048] - dag_scheduler.py[line:189]: schedule end status job 202107260820309976351
```
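A rough estimate of how often these lines repeat for one finished job: with a 300,000 ms window and a 2,000 ms poll interval, the job is picked up by about 150 polls, and since only the first one performs the update, roughly 149 of them log "the number of updates has been exceeded". The helper below (polls_in_window is a hypothetical name) counts this under the assumptions that polls are exactly 2 s apart with no drift and that both window bounds are inclusive; the exact count, 150 or 151, depends on how the polls line up with end_time.

```python
END_STATUS_JOB_SCHEDULING_TIME_LIMIT_MS = 5 * 60 * 1000  # 300000 ms window from the text
POLL_INTERVAL_MS = 2 * 1000  # default dag_scheduler poll interval quoted in the text


def polls_in_window(end_time_ms: int, first_poll_ms: int) -> int:
    # Count polls at first_poll_ms + k * POLL_INTERVAL_MS for which
    # now - limit <= end_time_ms <= now holds (inclusive bounds, assumed).
    count = 0
    now = first_poll_ms
    while now - END_STATUS_JOB_SCHEDULING_TIME_LIMIT_MS <= end_time_ms:
        if end_time_ms <= now:
            count += 1
        now += POLL_INTERVAL_MS
    return count


n = polls_in_window(end_time_ms=1_000, first_poll_ms=0)
print(n, n - 1)  # 150 polls in total, 149 of them hit the "exceeded" branch
```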