
FATE Study: Reading the Source Code Through the Logs (8): The Finish Stage of the upload Task

Overview

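When a task finishes, TaskExecutor.report_task_update_to_driver updates the task's status in the local DB. Because this is an asynchronous request, the initiator is never told that the task has ended; it learns the task's status only by querying the DB in its polling loop.
Once the initiator sees that the task has finished, it returns the task's resources and cleans up the related environment.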
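Because the initiator only learns about completion by polling, the pattern can be sketched as a loop that rereads the task status until it reaches a terminal value. This is a minimal sketch and not FATE's code. The `poll_until_finished` helper and the `END_STATUSES` set are hypothetical, and the `fetch_status` callable stands in for "query the task row in the DB".

```python
import time
from typing import Callable

# Assumed set of terminal statuses for this sketch; FATE defines its own EndStatus.
END_STATUSES = frozenset({"success", "failed", "canceled", "timeout"})


def poll_until_finished(fetch_status: Callable[[], str],
                        interval: float = 0.01,
                        max_rounds: int = 100) -> str:
    """Poll a status source until it reports a terminal status.

    `fetch_status` stands in for reading the task row from the DB. The
    executor writes that row asynchronously, so polling is the only way
    for the initiator to notice that the task has finished.
    """
    for _ in range(max_rounds):
        status = fetch_status()
        if status in END_STATUSES:
            # In the real flow, this is the point where resources are
            # returned and the task's environment is cleaned up.
            return status
        time.sleep(interval)
    raise TimeoutError(f"no terminal status after {max_rounds} rounds")
```

Bounding the loop with `max_rounds` keeps the sketch from spinning forever if the executor never reports back.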

Execution Details


  1. dag_scheduler.py: schedule_running_job, the scheduling poll. This works as in the previous parts, so I won't repeat the details here. The call chain is
    DagScheduler.schedule_running_job() -> TaskScheduler.schedule -> JobSaver.get_tasks_asc -> JobSaver.collect_task_of_all_party -> JobSaver.federated_task_status
    The corresponding ${job_log_dir}/fate_flow_schedule.log entries are:

    [INFO] [2021-07-26 08:20:44,944] [1:140259369826048] - dag_scheduler.py[line:298]: scheduling job 202107260820309976351
    [INFO] [2021-07-26 08:20:44,945] [1:140259369826048] - task_scheduler.py[line:28]: scheduling job 202107260820309976351 tasks
    [INFO] [2021-07-26 08:20:45,010] [1:140259369826048] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
    [INFO] [2021-07-26 08:20:45,022] [1:140259369826048] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'elapsed': 11447, 'end_time': 1627287644838, 'job_id': '202107260820309976351', 'party_id': '0', 'party_status': 'success', 'role': 'local', 'start_time': 1627287632259, 'status': 'running', 'task_id': '202107260820309976351_upload_0', 'task_version': 0, 'update_time': 1627287631078}
    [INFO] [2021-07-26 08:20:45,022] [1:140259369826048] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
    [WARNING] [2021-07-26 08:20:45,036] [1:140259369826048] - job_saver.py[line:86]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update does not take effect
    [INFO] [2021-07-26 08:20:45,041] [1:140259369826048] - task_scheduler.py[line:143]: job 202107260820309976351 task 202107260820309976351_upload_0 0 status is success, calculate by task party status list: ['success']
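The last log line shows the scheduler deriving the task's overall status from the list of per-party statuses. A simplified version of that aggregation could look like the function below. The precedence rules are assumptions for illustration: a status shared by every party is adopted as-is, any failing party fails the task, and otherwise the task is still running. FATE's actual calculation in task_scheduler.py may handle more cases.

```python
from typing import Sequence

# Assumed failure statuses, in order of precedence (sketch only).
FAILURE_STATUSES = ("failed", "timeout", "canceled")


def calculate_task_status(party_statuses: Sequence[str]) -> str:
    """Derive one federated task status from the statuses reported by each party."""
    if not party_statuses:
        raise ValueError("no party status reported")
    distinct = set(party_statuses)
    if len(distinct) == 1:
        # Every party agrees, e.g. ['success'] -> 'success' as in the log above.
        return party_statuses[0]
    for failure in FAILURE_STATUSES:
        if failure in distinct:
            # One failing party is enough to fail the federated task.
            return failure
    # Parties disagree but none failed: some are still working.
    return "running"
```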
  2. task_scheduler.py: syncs the status by calling FederatedScheduler.sync_task_status, as in the previous part.
    The resulting call chain is
    FederatedScheduler.sync_task_status() -> api_utils.federated_coordination_on_http() -> fate_flow_server (via flask) -> party_app.task_status() -> TaskController.update_task_status

    The corresponding ${job_log_dir}/fate_flow_schedule.log entries are:

[INFO] [2021-07-26 08:20:45,041] [1:140259369826048] - federated_scheduler.py[line:192]: job 202107260820309976351 task 202107260820309976351_upload_0 0 is success, sync to all party
[INFO] [2021-07-26 08:20:45,053] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:45,063] [1:140259119585024] - job_saver.py[line:74]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status successfully: {'job_id': '202107260820309976351', 'task_id': '202107260820309976351_upload_0', 'task_version': '0', 'role': 'local', 'party_id': '0', 'status': 'success'}
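FederatedScheduler.sync_task_status sends the new status to every party over HTTP. It then logs whether all of them succeeded ("sync ... status success to all party success"). The following sketch shows only that fan-out-and-collect shape, with no networking: the per-party call is passed in as a function. The names `sync_to_all_parties`, `Party` and `send`, and the convention that return code 0 means success, are all hypothetical. In FATE the request travels through api_utils.federated_coordination_on_http to party_app.task_status.

```python
from typing import Callable, Dict, List, Tuple

Party = Tuple[str, str]  # (role, party_id), e.g. ("local", "0")


def sync_to_all_parties(parties: List[Party],
                        send: Callable[[str, str], int]) -> Tuple[bool, Dict[Party, int]]:
    """Send a status update to every party and collect the return codes.

    `send(role, party_id)` stands in for the HTTP call to one party and
    returns 0 on success (an assumed convention for this sketch).
    """
    results: Dict[Party, int] = {}
    for role, party_id in parties:
        try:
            results[(role, party_id)] = send(role, party_id)
        except Exception:
            # An unreachable party is recorded as a failure instead of
            # aborting the whole broadcast.
            results[(role, party_id)] = -1
    all_ok = all(code == 0 for code in results.values())
    return all_ok, results
```

Collecting every party's result before deciding, rather than stopping at the first error, keeps a per-party record that can be logged.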

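  3. resource_manager.py: returns the resources by calling return_task_resource. This step updates the resource-related records in the DB and increases the count of remaining resources.
    The corresponding ${job_log_dir}/fate_flow_schedule.log entry is: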
[INFO] [2021-07-26 08:20:45,070] [1:140259119585024] - resource_manager.py[line:285]: task 202107260820309976351_upload_0 0 return resource successfully
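Returning resources amounts to adding the task's allocation back to the remaining pool. The sketch below keeps that bookkeeping in memory and does nothing if the same task's resources are returned twice. The `ResourcePool` class and its fields are hypothetical; the real return_task_resource updates resource records in the DB.

```python
from typing import Dict, Tuple


class ResourcePool:
    """Hypothetical in-memory resource bookkeeping (FATE keeps this in the DB)."""

    def __init__(self, total_cores: int, total_memory: int) -> None:
        self.remaining_cores = total_cores
        self.remaining_memory = total_memory
        # task key -> (cores, memory) currently held by that task
        self._held: Dict[str, Tuple[int, int]] = {}

    def apply(self, task_key: str, cores: int, memory: int) -> bool:
        """Reserve resources for a task; False if the pool is too small."""
        if cores > self.remaining_cores or memory > self.remaining_memory:
            return False
        self.remaining_cores -= cores
        self.remaining_memory -= memory
        self._held[task_key] = (cores, memory)
        return True

    def return_resource(self, task_key: str) -> bool:
        """Give a task's resources back; False if none are held for it."""
        held = self._held.pop(task_key, None)
        if held is None:
            return False  # never applied, or already returned
        cores, memory = held
        self.remaining_cores += cores
        self.remaining_memory += memory
        return True
```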
  4. task_controller.py: runs clean_task, which calls job_tracker.clean_task(); job_tracker.clean_task cleans up the related entries in the tracker tables.
    The corresponding ${job_log_dir}/fate_flow_schedule.log entries are:

    [INFO] [2021-07-26 08:20:45,075] [1:140259119585024] - job_tracker.py[line:424]: clean task 202107260820309976351_upload_0 0 on local 0
    [INFO] [2021-07-26 08:20:46,706] [1:140259119585024] - job_tracker.py[line:440]: clean table by namespace 202107260820309976351_upload_0_0_local_0 on local 0 done
    [INFO] [2021-07-26 08:20:46,725] [1:140259119585024] - job_tracker.py[line:446]: clean table by namespace 202107260820309976351_upload_0_0 on local 0 done
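The two "clean table by namespace" lines show which namespaces get cleaned. Reading them off the log, they are `{task_id}_{task_version}_{role}_{party_id}` and `{task_id}_{task_version}`. The sketch below builds those names and drops the matching entries from a plain dict keyed by `(namespace, name)`. The dict is a hypothetical stand-in for the storage tables, and both helper names are illustrative, not FATE functions.

```python
from typing import Dict, List, Tuple


def task_namespaces(task_id: str, task_version: int, role: str, party_id: str) -> List[str]:
    """Namespaces matching the two 'clean table by namespace' log lines."""
    return [
        f"{task_id}_{task_version}_{role}_{party_id}",
        f"{task_id}_{task_version}",
    ]


def clean_tables(tables: Dict[Tuple[str, str], object], namespaces: List[str]) -> int:
    """Drop every (namespace, name) table whose namespace is listed; return the count."""
    doomed = [key for key in tables if key[0] in namespaces]
    for key in doomed:
        del tables[key]
    return len(doomed)
```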
  5. federated_scheduler.py: returns the execution result; this corresponds to step 2.
    The corresponding ${job_log_dir}/fate_flow_schedule.log entry is:

    [INFO] [2021-07-26 08:20:47,714] [1:140259369826048] - federated_scheduler.py[line:195]: sync job 202107260820309976351 task 202107260820309976351_upload_0 0 status success to all party success
  6. task_scheduler.py: because the task's status is already an EndStatus, it calls FederatedScheduler.stop_task, which kills the executor process by its pid.
    The call chain is FederatedScheduler.stop_task() -> api_utils.federated_coordination_on_http() -> fate_flow_server (via flask) -> party_app.stop_task() -> TaskController.stop_task() -> TaskController.kill_task() -> job_utils.kill_task_executor_process
    P1: kill the executor process by pid
    P2: stop the task session

    The corresponding ${job_log_dir}/fate_flow_schedule.log entries are:

[INFO] [2021-07-26 08:20:47,714] [1:140259369826048] - federated_scheduler.py[line:202]: try to stop job 202107260820309976351 task 202107260820309976351_upload_0 0
[INFO] [2021-07-26 08:20:47,739] [1:140259119585024] - job_utils.py[line:396]: try to stop job 202107260820309976351 task 202107260820309976351_upload_0 local 0 with success party status process pid:90
[INFO] [2021-07-26 08:20:47,740] [1:140259119585024] - job_utils.py[line:399]: can not found job 202107260820309976351 task 202107260820309976351_upload_0 local 0 with success party status process pid:90
[INFO] [2021-07-26 08:20:47,742] [1:140259119585024] - job_utils.py[line:423]: start run subprocess to stop task session 202107260820309976351_upload_0_0_local_0
[INFO] [2021-07-26 08:20:47,743] [1:140259119585024] - job_utils.py[line:310]: start process command: python3 /data/projects/fate/python/fate_flow/utils/session_utils.py -j 202107260820309976351_upload_0_0_local_0 --computing EGGROLL --federation EGGROLL --storage EGGROLL -c stop
[INFO] [2021-07-26 08:20:47,755] [1:140259119585024] - job_utils.py[line:333]: start process command: python3 /data/projects/fate/python/fate_flow/utils/session_utils.py -j 202107260820309976351_upload_0_0_local_0 --computing EGGROLL --federation EGGROLL --storage EGGROLL -c stop successfully, pid is 148
[INFO] [2021-07-26 08:20:47,756] [1:140259119585024] - task_controller.py[line:254]: job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0 process 90 kill success
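The two parts of stopping a task can be sketched with the standard library. P1 probes whether the executor pid still exists with `os.kill(pid, 0)` and kills it only if it does. The "can not found ... pid:90" line suggests the executor had already exited by then. P2 builds the argument list for the session-stop subprocess exactly as it appears in the logged command. The helper names are hypothetical, sending SIGTERM is an assumption, and job_utils.kill_task_executor_process may differ in its details.

```python
import os
import signal
from typing import List


def process_alive(pid: int) -> bool:
    """Signal 0 only performs error checking, so it probes whether `pid` exists."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def kill_if_alive(pid: int) -> bool:
    """P1: kill the executor process; returns False if it was already gone."""
    if not process_alive(pid):
        return False  # corresponds to the "can not found ... process pid" log line
    os.kill(pid, signal.SIGTERM)  # assumed signal for this sketch
    return True


def build_stop_session_command(python: str, session_utils: str, session_id: str,
                               engine: str = "EGGROLL") -> List[str]:
    """P2: argv for the session-stop subprocess, mirroring the logged command."""
    return [python, session_utils, "-j", session_id,
            "--computing", engine, "--federation", engine, "--storage", engine,
            "-c", "stop"]
```

The returned list could then be passed to `subprocess.Popen` to launch the stop command as a separate process, as the "start run subprocess" log line describes.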

  7. task_controller.py: updates the task information by calling
    cls.update_task_status(task_info=task_info)
    cls.update_task(task_info=task_info)
    The corresponding log entries are:
    [INFO] [2021-07-26 08:20:47,756] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
    [INFO] [2021-07-26 08:20:47,762] [1:140259119585024] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'job_id': '202107260820309976351', 'task_id': '202107260820309976351_upload_0', 'task_version': 0, 'role': 'local', 'party_id': '0', 'party_status': 'success'}
    [INFO] [2021-07-26 08:20:47,767] [1:140259119585024] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
    [WARNING] [2021-07-26 08:20:47,771] [1:140259119585024] - job_saver.py[line:86]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update does not take effect
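Both updates above end in "does not take effect": the party status is already success, so writing success again changes nothing. One plausible way to get that behavior is a guard that rejects no-op writes and any transition out of a terminal status. The sketch below shows this; its transition rules and the `try_update_status` helper are assumptions for illustration, not JobSaver's actual checks.

```python
from typing import Dict

# Assumed terminal statuses for this sketch.
END_STATUSES = frozenset({"success", "failed", "canceled", "timeout"})


def try_update_status(record: Dict[str, str], field: str, new_status: str) -> bool:
    """Apply a status update only if it actually moves the state forward.

    Returns True if the record changed ("update ... successfully") and
    False if it was left alone ("update does not take effect").
    """
    current = record.get(field)
    if current == new_status:
        return False  # same value: nothing to write
    if current in END_STATUSES:
        return False  # terminal statuses are never overwritten in this sketch
    record[field] = new_status
    return True
```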
  8. federated_scheduler.py: corresponds to step 6.

    [INFO] [2021-07-26 08:20:47,782] [1:140259369826048] - federated_scheduler.py[line:206]: stop job 202107260820309976351 task 202107260820309976351_upload_0 0 success
  9. task_scheduler.py: corresponds to step 1.

    [INFO] [2021-07-26 08:20:47,782] [1:140259369826048] - task_scheduler.py[line:75]: finish scheduling job 202107260820309976351 tasks
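Taken together, the scheduler side of this walkthrough performs one polling pass per task in three stages. It derives the federated status from the party statuses, syncs that status to all parties if it changed, and stops the task once the status is terminal. The sketch below is a self-contained simplification of that pass. The names `schedule_task_once`, `sync` and `stop` are hypothetical, and the "all parties agree" aggregation is deliberately minimal. It is not FATE's TaskScheduler.schedule.

```python
from typing import Callable, List

# Assumed terminal statuses for this sketch.
END_STATUSES = frozenset({"success", "failed", "canceled", "timeout"})


def schedule_task_once(current_status: str,
                       party_statuses: List[str],
                       sync: Callable[[str], bool],
                       stop: Callable[[str], bool]) -> List[str]:
    """Run one polling pass for a single task and return the actions taken."""
    actions: List[str] = []
    if not party_statuses:
        return actions  # nothing reported yet
    # Minimal aggregation: adopt a new status only when all parties agree.
    distinct = set(party_statuses)
    new_status = party_statuses[0] if len(distinct) == 1 else current_status
    if new_status != current_status:
        # Corresponds to syncing the new status to every party.
        actions.append("sync:" + ("ok" if sync(new_status) else "failed"))
    if new_status in END_STATUSES:
        # Terminal status: ask every party to kill the executor and stop its session.
        actions.append("stop:" + ("ok" if stop(new_status) else "failed"))
    return actions
```

For the upload task in these logs, the pass starts from `"running"` with party statuses `["success"]`. It therefore syncs first and then stops, the same order as the log lines above.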

This concludes the task finish stage.