Overview
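When a task finishes, TaskExecutor.report_task_update_to_driver updates the task's status in the local DB. This is an asynchronous request, so the initiator is never notified that the task has ended. It can only learn the task's status by querying the DB in its polling loop.
Once the initiator sees that the task has finished, it returns the task's resources and cleans up the related environment.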
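The following is a minimal, self-contained sketch of this "write asynchronously, learn by polling" pattern. It is not fate_flow code. The in-memory `task_table`, the helpers `report_task_update` and `poll_until_finished`, and the set of end statuses are all hypothetical stand-ins for the DB table and the scheduler loop.

```python
import threading
import time

# Hypothetical stand-in: a dict plays the role of the task table in the DB.
TASK_ID = "202107260820309976351_upload_0"
task_table = {TASK_ID: "running"}
table_lock = threading.Lock()

# Assumed set of terminal statuses, for illustration only.
END_STATUSES = {"success", "failed", "canceled", "timeout"}


def report_task_update(task_id: str, status: str) -> None:
    """Executor side: write the new status and return; nobody is notified."""
    with table_lock:
        task_table[task_id] = status


def poll_until_finished(task_id, on_finished, interval=0.05, timeout=5.0):
    """Scheduler side: re-read the table until the task reaches an end status."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        with table_lock:
            status = task_table[task_id]
        if status in END_STATUSES:
            on_finished(task_id, status)  # e.g. return resources, clean up
            return status
        time.sleep(interval)
    raise TimeoutError(f"task {task_id} did not finish within {timeout}s")


def fake_executor() -> None:
    time.sleep(0.2)  # pretend the task is doing work
    report_task_update(TASK_ID, "success")


cleanup_calls = []
worker = threading.Thread(target=fake_executor)
worker.start()
final_status = poll_until_finished(TASK_ID, lambda tid, st: cleanup_calls.append((tid, st)))
worker.join()
print(final_status, cleanup_calls)
```

The executor never calls back into the scheduler. The cleanup hook runs only because a later poll observes the end status, which mirrors the behavior described above.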
Execution details
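- dag_scheduler.py: schedule_running_job, the scheduling poll. It works as described in the previous article, so the details are not repeated here. The call chain is:
DagScheduler.schedule_running_job() -> TaskScheduler.schedule -> JobSaver.get_tasks_asc -> JobSaver.collect_task_of_all_party -> JobSaver.federated_task_status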
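The last hop, `federated_task_status`, combines the status reported by every party into a single task status. This is the `calculate by task party status list` line in the log. Below is a simplified sketch of such a combination rule. `calculate_task_status` and the severity order in `INTERRUPT_PRIORITY` are assumptions made for illustration, not fate_flow's actual rules.

```python
from typing import List

# Assumed severity order for interrupting states (most severe first); hypothetical.
INTERRUPT_PRIORITY = ["failed", "timeout", "canceled"]


def calculate_task_status(party_status_list: List[str]) -> str:
    """Derive one task status from the statuses reported by all parties (sketch)."""
    if not party_status_list:
        raise ValueError("no party status collected")
    distinct = set(party_status_list)
    if len(distinct) == 1:
        # All parties agree, e.g. ['success'] -> 'success'.
        return distinct.pop()
    for status in INTERRUPT_PRIORITY:
        if status in distinct:
            return status  # any interrupted party decides the outcome
    # Mixed states such as running/success: some party is still working.
    return "running"


print(calculate_task_status(["success"]))
print(calculate_task_status(["success", "running"]))
```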
The corresponding log in ${job_log_dir}/fate_flow_schedule.log:
```
[INFO] [2021-07-26 08:20:44,944] [1:140259369826048] - dag_scheduler.py[line:298]: scheduling job 202107260820309976351
[INFO] [2021-07-26 08:20:44,945] [1:140259369826048] - task_scheduler.py[line:28]: scheduling job 202107260820309976351 tasks
[INFO] [2021-07-26 08:20:45,010] [1:140259369826048] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:45,022] [1:140259369826048] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'elapsed': 11447, 'end_time': 1627287644838, 'job_id': '202107260820309976351', 'party_id': '0', 'party_status': 'success', 'role': 'local', 'start_time': 1627287632259, 'status': 'running', 'task_id': '202107260820309976351_upload_0', 'task_version': 0, 'update_time': 1627287631078}
[INFO] [2021-07-26 08:20:45,022] [1:140259369826048] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
[WARNING] [2021-07-26 08:20:45,036] [1:140259369826048] - job_saver.py[line:86]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update does not take effect
[INFO] [2021-07-26 08:20:45,041] [1:140259369826048] - task_scheduler.py[line:143]: job 202107260820309976351 task 202107260820309976351_upload_0 0 status is success, calculate by task party status list: ['success']
```
- task_scheduler.py: synchronizes the status by calling FederatedScheduler.sync_task_status, the same mechanism described in the previous article.
The resulting call chain is:
FederatedScheduler.sync_task_status() -> api_utils.federated_coordination_on_http() -> fate_flow_server (via Flask) -> party_app.task_status() -> TaskController.update_task_status
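Conceptually, `sync_task_status` pushes the task's final status to every party and treats the sync as successful only if all parties accept it. The sketch below makes two assumptions: a pluggable `send` callable stands in for the HTTP request made by `federated_coordination_on_http`, and the URL path is purely illustrative rather than fate_flow's real route.

```python
from typing import Callable, Dict, List, Tuple

Party = Tuple[str, str]  # (role, party_id)


def sync_task_status(task: Dict[str, str], parties: List[Party],
                     send: Callable[[Party, str, Dict[str, str]], int]) -> bool:
    """Push the task's status to every party; succeed only if all return retcode 0.

    `send` is a hypothetical transport standing in for the HTTP call.
    """
    ok = True
    for role, party_id in parties:
        # Illustrative path only, not the route fate_flow actually exposes.
        path = f"/party/{task['job_id']}/{task['task_id']}/{role}/{party_id}/status/{task['status']}"
        if send((role, party_id), path, task) != 0:
            ok = False  # keep going so every party is still attempted
    return ok


# Usage: a fake transport that stores the status per party,
# standing in for party_app.task_status -> TaskController.update_task_status.
party_db: Dict[Party, str] = {}


def fake_send(party: Party, path: str, payload: Dict[str, str]) -> int:
    party_db[party] = payload["status"]
    return 0


task = {"job_id": "202107260820309976351",
        "task_id": "202107260820309976351_upload_0",
        "status": "success"}
print(sync_task_status(task, [("local", "0")], fake_send), party_db)
```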
The corresponding log in ${job_log_dir}/fate_flow_schedule.log:
```
[INFO] [2021-07-26 08:20:45,041] [1:140259369826048] - federated_scheduler.py[line:192]: job 202107260820309976351 task 202107260820309976351_upload_0 0 is success, sync to all party
[INFO] [2021-07-26 08:20:45,053] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:45,063] [1:140259119585024] - job_saver.py[line:74]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status successfully: {'job_id': '202107260820309976351', 'task_id': '202107260820309976351_upload_0', 'task_version': '0', 'role': 'local', 'party_id': '0', 'status': 'success'}
```
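- resource_manager.py: returns the task's resources by executing return_task_resource. This step updates the resource-related records in the DB and increases the remaining resource count.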
The corresponding log in ${job_log_dir}/fate_flow_schedule.log:
```
[INFO] [2021-07-26 08:20:45,070] [1:140259119585024] - resource_manager.py[line:285]: task 202107260820309976351_upload_0 0 return resource successfully
```
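The bookkeeping done by return_task_resource can be illustrated with a small SQLite sketch. The schema is hypothetical: the tables `job_resource` and `task` and the columns `remaining_cores`, `remaining_memory`, and `resource_in_use` are assumptions, and fate_flow's real tables differ. The `resource_in_use` flag is one way to make a repeated return harmless.

```python
import sqlite3

# Hypothetical schema for illustration; not fate_flow's actual tables.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE job_resource (job_id TEXT PRIMARY KEY, remaining_cores INTEGER, remaining_memory INTEGER);
CREATE TABLE task (task_id TEXT PRIMARY KEY, job_id TEXT, cores INTEGER, memory INTEGER, resource_in_use INTEGER);
INSERT INTO job_resource VALUES ('202107260820309976351', 0, 0);
INSERT INTO task VALUES ('202107260820309976351_upload_0', '202107260820309976351', 4, 0, 1);
""")


def return_task_resource(conn: sqlite3.Connection, task_id: str) -> bool:
    """Give a finished task's cores/memory back to its job; a repeat call is a no-op."""
    with conn:  # one transaction: flag flip and counter update commit together
        cur = conn.execute(
            "UPDATE task SET resource_in_use = 0 WHERE task_id = ? AND resource_in_use = 1",
            (task_id,))
        if cur.rowcount == 0:
            return False  # already returned, or unknown task
        job_id, cores, memory = conn.execute(
            "SELECT job_id, cores, memory FROM task WHERE task_id = ?", (task_id,)).fetchone()
        conn.execute(
            "UPDATE job_resource SET remaining_cores = remaining_cores + ?, "
            "remaining_memory = remaining_memory + ? WHERE job_id = ?",
            (cores, memory, job_id))
    return True


first = return_task_resource(conn, "202107260820309976351_upload_0")
second = return_task_resource(conn, "202107260820309976351_upload_0")
print(first, second, conn.execute("SELECT remaining_cores FROM job_resource").fetchone()[0])
```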
- task_controller.py: executes clean_task, which calls job_tracker.clean_task() to remove the task's related contents from the tracker tables.
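The log that follows shows two namespaces being cleaned: `{task_id}_{task_version}_{role}_{party_id}` and `{task_id}_{task_version}`. The sketch below reproduces that naming and drops everything stored under those namespaces. The naming is inferred from the log only, and the dict-based `storage` together with the helpers `task_namespaces` and `clean_task` are hypothetical.

```python
from typing import Dict, List


def task_namespaces(task_id: str, task_version: int, role: str, party_id: str) -> List[str]:
    """Namespaces holding a task's temporary tables, as inferred from the log."""
    return [
        f"{task_id}_{task_version}_{role}_{party_id}",  # per-party namespace
        f"{task_id}_{task_version}",                      # per-task namespace
    ]


def clean_task(storage: Dict[str, Dict[str, object]], task_id: str,
               task_version: int, role: str, party_id: str) -> List[str]:
    """Drop all tables stored under the task's namespaces; return the ones found."""
    cleaned = []
    for ns in task_namespaces(task_id, task_version, role, party_id):
        if storage.pop(ns, None) is not None:
            cleaned.append(ns)
    return cleaned


demo_storage = {
    "202107260820309976351_upload_0_0_local_0": {"tmp_table": object()},
    "202107260820309976351_upload_0_0": {"output_table": object()},
    "some_other_namespace": {},
}
demo_cleaned = clean_task(demo_storage, "202107260820309976351_upload_0", 0, "local", "0")
print(demo_cleaned)
```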
The corresponding log in ${job_log_dir}/fate_flow_schedule.log:
```
[INFO] [2021-07-26 08:20:45,075] [1:140259119585024] - job_tracker.py[line:424]: clean task 202107260820309976351_upload_0 0 on local 0
[INFO] [2021-07-26 08:20:46,706] [1:140259119585024] - job_tracker.py[line:440]: clean table by namespace 202107260820309976351_upload_0_0_local_0 on local 0 done
[INFO] [2021-07-26 08:20:46,725] [1:140259119585024] - job_tracker.py[line:446]: clean table by namespace 202107260820309976351_upload_0_0 on local 0 done
```
- federated_scheduler.py: returns the result of the sync_task_status call made by task_scheduler.py above.
The corresponding log in ${job_log_dir}/fate_flow_schedule.log:
```
[INFO] [2021-07-26 08:20:47,714] [1:140259369826048] - federated_scheduler.py[line:195]: sync job 202107260820309976351 task 202107260820309976351_upload_0 0 status success to all party success
```
- task_scheduler.py: the task status now belongs to EndStatus, so FederatedScheduler.stop_task is executed to kill the task process by its pid.
The call chain is FederatedScheduler.stop_task() -> api_utils.federated_coordination_on_http() -> fate_flow_server (via Flask) -> party_app.stop_task() -> TaskController.stop_task() -> TaskController.kill_task() -> job_utils.kill_task_executor_process
Stopping the task has two parts:
P1: kill the process by pid
P2: stop the session
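The two parts can be sketched with the standard library on a POSIX system. The helpers `pid_alive`, `kill_task_executor`, and `stop_session` are hypothetical, not job_utils' real functions. In the log, the session is stopped by running `session_utils.py ... -c stop` in a new process. Here the caller passes in the command instead, so the sketch does not depend on a FATE installation.

```python
import os
import signal
import subprocess
import sys
from typing import List


def pid_alive(pid: int) -> bool:
    """POSIX check: signal 0 tests for existence without delivering anything."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def kill_task_executor(pid: int) -> bool:
    """P1: kill the executor by pid; returns False if it had already exited."""
    if not pid_alive(pid):
        return False  # compare the "can not found ... process pid:90" log line
    os.kill(pid, signal.SIGKILL)
    return True


def stop_session(cmd: List[str]) -> subprocess.Popen:
    """P2: start a separate process that tears the session down (caller-supplied cmd)."""
    return subprocess.Popen(cmd)


# Usage: a child process stands in for the task executor.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
print(kill_task_executor(child.pid))  # still running, so it is killed
child.wait()  # reap it; an unreaped zombie would still pass the signal-0 check
print(kill_task_executor(child.pid))  # already gone
```

Stopping the session in a separate process keeps the scheduler thread from blocking on session teardown. This matches the log, which records the stop command's pid and moves on.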
The corresponding log in ${job_log_dir}/fate_flow_schedule.log:
```
[INFO] [2021-07-26 08:20:47,714] [1:140259369826048] - federated_scheduler.py[line:202]: try to stop job 202107260820309976351 task 202107260820309976351_upload_0 0
[INFO] [2021-07-26 08:20:47,739] [1:140259119585024] - job_utils.py[line:396]: try to stop job 202107260820309976351 task 202107260820309976351_upload_0 local 0 with success party status process pid:90
[INFO] [2021-07-26 08:20:47,740] [1:140259119585024] - job_utils.py[line:399]: can not found job 202107260820309976351 task 202107260820309976351_upload_0 local 0 with success party status process pid:90
[INFO] [2021-07-26 08:20:47,742] [1:140259119585024] - job_utils.py[line:423]: start run subprocess to stop task session 202107260820309976351_upload_0_0_local_0
[INFO] [2021-07-26 08:20:47,743] [1:140259119585024] - job_utils.py[line:310]: start process command: python3 /data/projects/fate/python/fate_flow/utils/session_utils.py -j 202107260820309976351_upload_0_0_local_0 --computing EGGROLL --federation EGGROLL --storage EGGROLL -c stop
[INFO] [2021-07-26 08:20:47,755] [1:140259119585024] - job_utils.py[line:333]: start process command: python3 /data/projects/fate/python/fate_flow/utils/session_utils.py -j 202107260820309976351_upload_0_0_local_0 --computing EGGROLL --federation EGGROLL --storage EGGROLL -c stop successfully, pid is 148
[INFO] [2021-07-26 08:20:47,756] [1:140259119585024] - task_controller.py[line:254]: job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0 process 90 kill success
```
- task_controller.py: updates the task information:
```python
cls.update_task_status(task_info=task_info)
cls.update_task(task_info=task_info)
```
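Several update attempts in the logs of this section end with `update does not take effect` because the stored status is already `success`. One way to express such a guarded update is sketched below. The `STATUS_LEVEL` ordering and the `update_status` helper are hypothetical, and fate_flow's own transition rules may differ.

```python
# Assumed ordering: a status may only move forward; end states share the top level.
STATUS_LEVEL = {"waiting": 0, "running": 1,
                "success": 2, "failed": 2, "canceled": 2, "timeout": 2}


def update_status(row: dict, new_status: str) -> bool:
    """Apply new_status only if it moves the row forward; otherwise do nothing."""
    current = row["status"]
    if STATUS_LEVEL[new_status] <= STATUS_LEVEL[current]:
        return False  # e.g. success -> success: "update does not take effect"
    row["status"] = new_status
    return True


task_row = {"task_id": "202107260820309976351_upload_0", "status": "running"}
print(update_status(task_row, "success"), update_status(task_row, "success"))
```

Because the guard turns repeated reports into no-ops, every party and every retry can safely report the same end status more than once.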
The corresponding log in ${job_log_dir}/fate_flow_schedule.log:
```
[INFO] [2021-07-26 08:20:47,756] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:47,762] [1:140259119585024] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'job_id': '202107260820309976351', 'task_id': '202107260820309976351_upload_0', 'task_version': 0, 'role': 'local', 'party_id': '0', 'party_status': 'success'}
[INFO] [2021-07-26 08:20:47,767] [1:140259119585024] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
[WARNING] [2021-07-26 08:20:47,771] [1:140259119585024] - job_saver.py[line:86]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update does not take effect
```
- federated_scheduler.py: returns the result of the stop_task call started by task_scheduler.py above:
```
[INFO] [2021-07-26 08:20:47,782] [1:140259369826048] - federated_scheduler.py[line:206]: stop job 202107260820309976351 task 202107260820309976351_upload_0 0 success
```
- task_scheduler.py: finishes the scheduling round that TaskScheduler.schedule started in the first step:
```
[INFO] [2021-07-26 08:20:47,782] [1:140259369826048] - task_scheduler.py[line:75]: finish scheduling job 202107260820309976351 tasks
```
This completes the task-finish process.