
FATE Learning: Reading the Source Code Along with the Logs (10): polling

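Overview

As covered in "FATE Learning: Reading the Source Code Along with the Logs (2): fate_flow server startup", fate_flow_server.py calls DAGScheduler(interval=2 * 1000).start(), which by default triggers dag_scheduler once every 2 s and performs five kinds of scheduling in total.
Among them:

The task status stays running

If a task takes longer to execute than the 2 s polling interval, its status stays running across polls.
The corresponding ${job_log_dir}/fate_flow_schedule.log output is: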

[INFO] [2021-07-26 08:20:42,798] [1:140259369826048] - dag_scheduler.py[line:298]: scheduling job 202107260820309976351
[INFO] [2021-07-26 08:20:42,800] [1:140259369826048] - task_scheduler.py[line:28]: scheduling job 202107260820309976351 tasks
[INFO] [2021-07-26 08:20:42,836] [1:140259369826048] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:42,845] [1:140259369826048] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'elapsed': None, 'end_time': None, 'job_id': '202107260820309976351', 'party_id': '0', 'party_status': 'running', 'role': 'local', 'start_time': 1627287632259, 'status': 'running', 'task_id': '202107260820309976351_upload_0', 'task_version': 0, 'update_time': 1627287631078}
[INFO] [2021-07-26 08:20:42,845] [1:140259369826048] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
[WARNING] [2021-07-26 08:20:42,860] [1:140259369826048] - job_saver.py[line:86]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update does not take effect
[INFO] [2021-07-26 08:20:42,867] [1:140259369826048] - task_scheduler.py[line:143]: job 202107260820309976351 task 202107260820309976351_upload_0 0 status is running, calculate by task party status list: ['running']
[INFO] [2021-07-26 08:20:42,867] [1:140259369826048] - task_scheduler.py[line:75]: finish scheduling job 202107260820309976351 tasks
[INFO] [2021-07-26 08:20:42,868] [1:140259369826048] - dag_scheduler.py[line:310]: Job 202107260820309976351 status is running, calculate by task status list: ['running']
[INFO] [2021-07-26 08:20:42,868] [1:140259369826048] - dag_scheduler.py[line:325]: finish scheduling job 202107260820309976351
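
Since this series reads the source by following the logs, it can help to pull the source file and line number out of each log line mechanically. The following is a minimal sketch: the regular expression is inferred from the log lines shown above rather than taken from FATE's logging configuration, the two numbers in the third bracket are assumed to be the process id and thread id, and the names LogRecord and parse_log_line are hypothetical.

```python
import re
from typing import NamedTuple, Optional

# Pattern inferred from the log lines in this post (an assumption, not FATE's config).
LOG_LINE = re.compile(
    r"\[(?P<level>\w+)\] "
    r"\[(?P<timestamp>[^\]]+)\] "
    r"\[(?P<pid>\d+):(?P<thread>\d+)\] - "
    r"(?P<file>[\w.]+)\[line:(?P<line>\d+)\]: "
    r"(?P<message>.*)"
)


class LogRecord(NamedTuple):
    """Hypothetical container for one parsed schedule-log line."""
    level: str
    timestamp: str
    pid: int
    thread: int
    file: str
    line: int
    message: str


def parse_log_line(text: str) -> Optional[LogRecord]:
    """Return the parsed fields, or None if the line does not match the pattern."""
    m = LOG_LINE.match(text.strip())
    if m is None:
        return None
    return LogRecord(
        level=m["level"],
        timestamp=m["timestamp"],
        pid=int(m["pid"]),
        thread=int(m["thread"]),
        file=m["file"],
        line=int(m["line"]),
        message=m["message"],
    )


sample = (
    "[INFO] [2021-07-26 08:20:42,798] [1:140259369826048] - "
    "dag_scheduler.py[line:298]: scheduling job 202107260820309976351"
)
record = parse_log_line(sample)
print(record.file, record.line, record.message)
```

The file and line fields point at the statement that emitted the message, which is the starting point for reading the corresponding source.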

Scheduling jobs that are already in an end status

Source code:

schedule_logger().info("start schedule end status jobs to update status")
# Select jobs initiated by this party that reached an end status within the time limit.
jobs = JobSaver.query_job(is_initiator=True, status=set(EndStatus.status_list()), end_time=[current_timestamp() - END_STATUS_JOB_SCHEDULING_TIME_LIMIT, current_timestamp()])
schedule_logger().info(f"have {len(jobs)} end status jobs")
for job in jobs:
    schedule_logger().info(f"schedule end status job {job.f_job_id}")
    try:
        # Returns a falsy value once the allowed number of updates is used up.
        update_status = self.end_scheduling_updates(job_id=job.f_job_id)
        if not update_status:
            schedule_logger(job.f_job_id).info(f"the number of updates has been exceeded")
            continue
        # Run the regular job scheduling once more for the finished job.
        self.schedule_running_job(job=job)
    except Exception as e:
        schedule_logger(job.f_job_id).exception(e)
        schedule_logger(job.f_job_id).error(f"schedule job {job.f_job_id} failed")
schedule_logger().info("schedule end status jobs finished")

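Note: this is the polling that continues after a job has ended, namely the "schedule end status jobs" part of dag_scheduler.py. Every job that finished within the last 5 minutes (END_STATUS_JOB_SCHEDULING_TIME_LIMIT = 5 * 60 * 1000  # ms) still goes through end_scheduling_updates. Only the first call updates successfully (the default is a single update, END_STATUS_JOB_SCHEDULING_UPDATES = 1), after which schedule_running_job is run on that job once more.
From then on, every dag_scheduler poll (every 2 s by default) emits the following log:
dag_scheduler.py[line:193]: the number of updates has been exceeded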
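
The interplay of the 5-minute window and the single allowed update can be sketched with a small simulation. This is not FATE's implementation: UpdateCounter and simulate_polls are hypothetical names, the counter is held in memory purely for illustration, and the inclusive window check is an assumption about how query_job filters end_time. Only the three constants come from the text above.

```python
END_STATUS_JOB_SCHEDULING_TIME_LIMIT = 5 * 60 * 1000  # ms, as quoted above
END_STATUS_JOB_SCHEDULING_UPDATES = 1                 # default, as quoted above
POLL_INTERVAL = 2 * 1000                              # ms, the DAGScheduler interval


class UpdateCounter:
    """Hypothetical in-memory stand-in for the per-job update counter."""

    def __init__(self):
        self._updates = {}

    def end_scheduling_updates(self, job_id):
        """Return True only while the job is still below the update limit."""
        count = self._updates.get(job_id, 0)
        if count >= END_STATUS_JOB_SCHEDULING_UPDATES:
            return False
        self._updates[job_id] = count + 1
        return True


def simulate_polls(job_id, end_time, first_poll, n_polls):
    """Replay n_polls scheduler rounds and record what happens to the job."""
    counter = UpdateCounter()
    events = []
    for i in range(n_polls):
        now = first_poll + i * POLL_INTERVAL
        # Assumed query filter: end_time within [now - limit, now].
        in_window = now - END_STATUS_JOB_SCHEDULING_TIME_LIMIT <= end_time <= now
        if not in_window:
            events.append("not selected")
        elif counter.end_scheduling_updates(job_id):
            events.append("rescheduled")  # schedule_running_job runs once more
        else:
            events.append("exceeded")     # "the number of updates has been exceeded"
    return events


# Job ends at t = 0 ms; the first poll after that happens at t = 1000 ms.
events = simulate_polls("202107260820309976351", end_time=0, first_poll=1000, n_polls=152)
print(events[0], events.count("exceeded"), events.count("not selected"))
```

Under these assumptions the job is rescheduled exactly once, then logs the "exceeded" message on every poll until it falls out of the 5-minute window, after which it is no longer selected at all.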

fate_flow/fate_flow_schedule.log

[INFO] [2021-07-26 08:20:48,027] [1:140259369826048] - dag_scheduler.py[line:185]: start schedule end status jobs to update status
[INFO] [2021-07-26 08:20:48,036] [1:140259369826048] - dag_scheduler.py[line:187]: have 1 end status jobs
[INFO] [2021-07-26 08:20:48,036] [1:140259369826048] - dag_scheduler.py[line:189]: schedule end status job 202107260820309976351

Running schedule_running_job once more

The ${job_log_dir}/fate_flow_schedule.log output is:

[INFO] [2021-07-26 08:20:48,044] [1:140259369826048] - dag_scheduler.py[line:298]: scheduling job 202107260820309976351
[INFO] [2021-07-26 08:20:48,046] [1:140259369826048] - task_scheduler.py[line:28]: scheduling job 202107260820309976351 tasks
[INFO] [2021-07-26 08:20:48,066] [1:140259369826048] - task_scheduler.py[line:143]: job 202107260820309976351 task 202107260820309976351_upload_0 0 status is success, calculate by task party status list: ['success']
[INFO] [2021-07-26 08:20:48,067] [1:140259369826048] - task_scheduler.py[line:75]: finish scheduling job 202107260820309976351 tasks
[INFO] [2021-07-26 08:20:48,067] [1:140259369826048] - dag_scheduler.py[line:310]: Job 202107260820309976351 status is success, calculate by task status list: ['success']
[INFO] [2021-07-26 08:20:48,067] [1:140259369826048] - dag_scheduler.py[line:502]: Job 202107260820309976351 finished with success, do something...
[INFO] [2021-07-26 08:20:48,067] [1:140259369826048] - dag_scheduler.py[line:436]: request stop job 202107260820309976351 with success
[INFO] [2021-07-26 08:20:48,074] [1:140259369826048] - dag_scheduler.py[line:445]: request stop job 202107260820309976351 with success to all party
[INFO] [2021-07-26 08:20:48,074] [1:140259369826048] - federated_scheduler.py[line:88]: try to stop job 202107260820309976351
[INFO] [2021-07-26 08:20:48,097] [1:140259119585024] - job_utils.py[line:396]: try to stop job 202107260820309976351 task 202107260820309976351_upload_0 local 0 with success party status process pid:90
[INFO] [2021-07-26 08:20:48,097] [1:140259119585024] - job_utils.py[line:399]: can not found job 202107260820309976351 task 202107260820309976351_upload_0 local 0 with success party status process pid:90
[INFO] [2021-07-26 08:20:48,100] [1:140259119585024] - job_utils.py[line:423]: start run subprocess to stop task session 202107260820309976351_upload_0_0_local_0
[INFO] [2021-07-26 08:20:48,100] [1:140259119585024] - job_utils.py[line:310]: start process command: python3 /data/projects/fate/python/fate_flow/utils/session_utils.py -j 202107260820309976351_upload_0_0_local_0 --computing EGGROLL --federation EGGROLL --storage EGGROLL -c stop
[INFO] [2021-07-26 08:20:48,111] [1:140259119585024] - job_utils.py[line:333]: start process command: python3 /data/projects/fate/python/fate_flow/utils/session_utils.py -j 202107260820309976351_upload_0_0_local_0 --computing EGGROLL --federation EGGROLL --storage EGGROLL -c stop successfully, pid is 159
[INFO] [2021-07-26 08:20:48,111] [1:140259119585024] - task_controller.py[line:254]: job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0 process 90 kill success
[INFO] [2021-07-26 08:20:48,112] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:48,119] [1:140259119585024] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'job_id': '202107260820309976351', 'task_id': '202107260820309976351_upload_0', 'task_version': 0, 'role': 'local', 'party_id': '0', 'party_status': 'success'}
[INFO] [2021-07-26 08:20:48,144] [1:140259119585024] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
[WARNING] [2021-07-26 08:20:48,147] [1:140259119585024] - job_saver.py[line:86]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update does not take effect
[INFO] [2021-07-26 08:20:48,151] [1:140259119585024] - job_saver.py[line:45]: try to update job 202107260820309976351 status to success
[INFO] [2021-07-26 08:20:48,156] [1:140259119585024] - job_saver.py[line:56]: update job 202107260820309976351 status does not take effect
[INFO] [2021-07-26 08:20:48,158] [1:140259369826048] - federated_scheduler.py[line:92]: stop job 202107260820309976351 success
[INFO] [2021-07-26 08:20:48,158] [1:140259369826048] - dag_scheduler.py[line:448]: stop job 202107260820309976351 with success successfully
[INFO] [2021-07-26 08:20:48,158] [1:140259369826048] - federated_scheduler.py[line:107]: try to clean job 202107260820309976351
[INFO] [2021-07-26 08:20:48,162] [1:140259119585024] - job_controller.py[line:344]: Job 202107260820309976351 on local 0 start to clean
[INFO] [2021-07-26 08:20:48,162] [1:140259119585024] - job_controller.py[line:346]: job 202107260820309976351 on local 0 clean done
[INFO] [2021-07-26 08:20:48,164] [1:140259369826048] - federated_scheduler.py[line:110]: clean job 202107260820309976351 success
[INFO] [2021-07-26 08:20:48,164] [1:140259369826048] - dag_scheduler.py[line:505]: Job 202107260820309976351 finished with success, done
[INFO] [2021-07-26 08:20:48,164] [1:140259369826048] - dag_scheduler.py[line:325]: finish scheduling job 202107260820309976351
[INFO] [2021-07-26 08:20:48,596] [148:140090979002176] - session_utils.py[line:38]: start stop session 202107260820309976351_upload_0_0_local_0
[INFO] [2021-07-26 08:20:48,762] [153:140325590579008] - session_utils.py[line:38]: start stop session 202107260820309976351_upload_0_0_local_0
[INFO] [2021-07-26 08:20:48,941] [159:140675688425280] - session_utils.py[line:38]: start stop session 202107260820309976351_upload_0_0_local_0
[INFO] [2021-07-26 08:20:50,205] [1:140259369826048] - dag_scheduler.py[line:193]: the number of updates has been exceeded
[INFO] [2021-07-26 08:20:50,227] [153:140325590579008] - session_utils.py[line:47]: stop session 202107260820309976351_upload_0_0_local_0 success
[INFO] [2021-07-26 08:20:50,233] [148:140090979002176] - session_utils.py[line:47]: stop session 202107260820309976351_upload_0_0_local_0 success
[INFO] [2021-07-26 08:20:50,240] [159:140675688425280] - session_utils.py[line:47]: stop session 202107260820309976351_upload_0_0_local_0 success

Log produced by subsequent scheduling rounds

[INFO] [2021-07-26 08:20:52,248] [1:140259369826048] - dag_scheduler.py[line:193]: the number of updates has been exceeded
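
As a quick cross-check of the 2 s polling interval, the gap between consecutive "exceeded" messages can be computed directly from the log. The sketch below uses the two such lines that appear in this post (08:20:50 and 08:20:52); the helper names timestamp_of and poll_gaps are hypothetical, and the timestamp layout is inferred from the log lines themselves.

```python
from datetime import datetime
from typing import List

EXCEEDED = "the number of updates has been exceeded"


def timestamp_of(line: str) -> datetime:
    """Parse the second bracketed field, e.g. '2021-07-26 08:20:50,205'."""
    stamp = line.split("] [")[1]
    return datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S,%f")


def poll_gaps(lines: List[str]) -> List[float]:
    """Seconds between consecutive 'exceeded' messages."""
    stamps = [timestamp_of(line) for line in lines if EXCEEDED in line]
    return [(b - a).total_seconds() for a, b in zip(stamps, stamps[1:])]


lines = [
    "[INFO] [2021-07-26 08:20:50,205] [1:140259369826048] - "
    "dag_scheduler.py[line:193]: the number of updates has been exceeded",
    "[INFO] [2021-07-26 08:20:52,248] [1:140259369826048] - "
    "dag_scheduler.py[line:193]: the number of updates has been exceeded",
]
gaps = poll_gaps(lines)
print(gaps)
```

The gap comes out slightly above 2 s, which is consistent with a fixed sleep interval plus the time each scheduling round takes.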