
FATE Learning: Reading the Source Code Through Logs (9) - the job finish phase of an upload task

Overview

As with tasks, a job does not actively report that it has finished. The job status is likewise obtained during DAGScheduler.schedule_running_job() polling, via the calculate_job_status() method.
Once the job is known to have finished, its resources are returned and the related environment is cleaned up.

Execution steps


  1. dag_scheduler.py: after the tasks have been scheduled, execute calculate_job_status to compute the job's current status, then calculate_job_progress to compute its completion progress.
    Logs are written to ${job_log_dir}/fate_flow_schedule.log:

    [INFO] [2021-07-26 08:20:47,782] [1:140259369826048] - dag_scheduler.py[line:310]: Job 202107260820309976351 status is success, calculate by task status list: ['success']
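
    For intuition, the aggregation over the task status list can be sketched as follows. This is a minimal, hypothetical reduction; the real DAGScheduler.calculate_job_status handles more states (waiting, canceled, etc.):

    # Minimal sketch of deriving a job status from its task statuses;
    # the status names follow the log output above, everything else is simplified.
    def calculate_job_status_sketch(tasks_status):
        if all(s == "success" for s in tasks_status):
            return "success"
        if any(s in ("failed", "timeout", "canceled") for s in tasks_status):
            return "failed"
        return "running"

    print(calculate_job_status_sketch(["success"]))  # -> success, as in the log
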
  2. dag_scheduler.py: depending on how the job's status and progress have changed, synchronize the information, save the model, and so on.
    The following snippet is never executed; its logic is slightly off:

    if int(new_progress) - job.f_progress > 0:
        job.f_progress = new_progress
        FederatedScheduler.sync_job(job=job, update_fields=["progress"])
        cls.update_job_on_initiator(initiator_job=job, update_fields=["progress"])

    An open question: when a task's status changes, the task status held in the initiator_tasks_group returned by TaskScheduler.schedule is not refreshed.
    As a result, total, finished_count = cls.calculate_job_progress(tasks_status=tasks_status) in DAGScheduler.schedule_running_job sees no change, the follow-up flow is not triggered, and an extra scheduling round is run.
    Suggestion: after task scheduling finishes, query the tasks again and refresh their statuses, as sketched below.
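
    A rough sketch of that suggested fix (hypothetical wiring; JobSaver.query_task is fate_flow's query helper, but the import path and surrounding code are illustrative only):

    # Hypothetical: re-query the tasks after TaskScheduler.schedule so that
    # calculate_job_progress sees fresh statuses instead of the stale
    # initiator_tasks_group snapshot.
    from fate_flow.operation.job_saver import JobSaver  # assumed import path

    def refresh_tasks_status(job):
        tasks = JobSaver.query_task(job_id=job.f_job_id, role=job.f_role,
                                    party_id=job.f_party_id)
        return [task.f_status for task in tasks]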

The status-change branch that does execute:

if new_job_status != job.f_status:
    job.f_status = new_job_status
    if EndStatus.contains(job.f_status):
        FederatedScheduler.save_pipelined_model(job=job)
    FederatedScheduler.sync_job_status(job=job)
    cls.update_job_on_initiator(initiator_job=job, update_fields=["status"])

When the job status changes and the job has reached an end status, FederatedScheduler.save_pipelined_model is called.
Call chain: FederatedScheduler.save_pipelined_model() -> api_utils.federated_coordination_on_http() (endpoint: model) -> fate_flow_server via Flask -> party_app.save_pipelined_model() -> JobController.save_pipelined_model
Logs are written to ${job_log_dir}/fate_flow_schedule.log:

[INFO] [2021-07-26 08:20:47,783] [1:140259369826048] - federated_scheduler.py[line:78]: try to save job 202107260820309976351 pipelined model
[INFO] [2021-07-26 08:20:47,791] [1:140259119585024] - job_controller.py[line:300]: job 202107260820309976351 on local 0 start to save pipeline
[INFO] [2021-07-26 08:20:47,800] [1:140259119585024] - job_controller.py[line:340]: job 202107260820309976351 on local 0 save pipeline successfully
[INFO] [2021-07-26 08:20:47,804] [1:140259369826048] - federated_scheduler.py[line:81]: save job 202107260820309976351 pipelined model success

Note: for a predict job, an existing model is reused, so nothing is saved again here.
The model is stored under
/data/projects/fate/model_local_cache/${role}#${party}#${component_name}#model/${jobid}
/data/projects/fate/model_local_cache/local#0#local-0#model/202107260820309976351
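
Following that template, the cache directory can be composed like this (a throwaway illustration helper, not a fate_flow API):

import os

# Hypothetical helper mirroring the path template above:
# ${role}#${party}#${component_name}#model/${jobid}
def model_cache_dir(role, party_id, component_name, job_id,
                    base="/data/projects/fate/model_local_cache"):
    model_key = "#".join([role, str(party_id), component_name, "model"])
    return os.path.join(base, model_key, job_id)

print(model_cache_dir("local", 0, "local-0", "202107260820309976351"))
# -> /data/projects/fate/model_local_cache/local#0#local-0#model/202107260820309976351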

  3. dag_scheduler.py: once the save completes, synchronize the status by executing FederatedScheduler.sync_job_status(job=job).
    Call chain: FederatedScheduler.sync_job_status() -> api_utils.federated_coordination_on_http() (endpoint: status) -> fate_flow_server via Flask -> party_app.job_status -> JobController.update_job_status -> ResourceManager.return_job_resource
    In update_job_status, the job is found to be in an end status, so its resources are reclaimed.
    Corresponding logs in ${job_log_dir}/fate_flow_schedule.log:

    [INFO] [2021-07-26 08:20:47,804] [1:140259369826048] - federated_scheduler.py[line:68]: job 202107260820309976351 is success, sync to all party
    [INFO] [2021-07-26 08:20:47,810] [1:140259119585024] - job_saver.py[line:45]: try to update job 202107260820309976351 status to success
    [INFO] [2021-07-26 08:20:47,830] [1:140259119585024] - job_saver.py[line:48]: update job 202107260820309976351 status successfully
    [INFO] [2021-07-26 08:20:47,869] [1:140259119585024] - resource_manager.py[line:175]: return job 202107260820309976351 resource(cores 4 memory 0) on local 0 successfully, remaining cores: 20 remaining memory: 0
    [INFO] [2021-07-26 08:20:47,874] [1:140259369826048] - federated_scheduler.py[line:71]: sync job 202107260820309976351 status success to all party success
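
    The resource return in the resource_manager.py log line amounts to adding the job's cores and memory back to the engine's remaining quota. A toy version (the real ResourceManager.return_job_resource does this as a guarded database update):

    # Toy stand-in for the engine resource record; the numbers reproduce
    # the log line above.
    class EngineResourceSketch:
        def __init__(self, remaining_cores, remaining_memory=0):
            self.remaining_cores = remaining_cores
            self.remaining_memory = remaining_memory

        def return_job_resource(self, job_id, cores, memory, role, party_id):
            self.remaining_cores += cores      # give the job's cores back
            self.remaining_memory += memory
            print(f"return job {job_id} resource(cores {cores} memory {memory}) "
                  f"on {role} {party_id} successfully, remaining cores: "
                  f"{self.remaining_cores} remaining memory: {self.remaining_memory}")

    EngineResourceSketch(remaining_cores=16).return_job_resource(
        "202107260820309976351", cores=4, memory=0, role="local", party_id=0)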
  4. dag_scheduler.py: update_job_on_initiator().
    Call chain: update_job_on_initiator -> JobSaver.update_job_status() -> JobSaver.update_status -> JobSaver.update_entity_table (since the role is local, the job status was already updated by the sync in step 3, so this write does not take effect) -> JobSaver.update_job() -> JobSaver.update_entity_table
    Corresponding logs in ${job_log_dir}/fate_flow_schedule.log:

    [INFO] [2021-07-26 08:20:47,884] [1:140259369826048] - job_saver.py[line:45]: try to update job 202107260820309976351 status to success
    [INFO] [2021-07-26 08:20:47,894] [1:140259369826048] - job_saver.py[line:56]: update job 202107260820309976351 status does not take effect
    [INFO] [2021-07-26 08:20:47,894] [1:140259369826048] - job_saver.py[line:61]: try to update job 202107260820309976351
    [WARNING] [2021-07-26 08:20:47,904] [1:140259369826048] - job_saver.py[line:66]: job 202107260820309976351 update does not take effect: {'job_id': '202107260820309976351', 'role': 'local', 'party_id': '0', 'status': 'success'}
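
    The "does not take effect" lines come from conditional writes: the job row was already moved to an end status by the sync in step 3, so the UPDATE matches nothing. Illustrated here with plain SQL semantics (the real code goes through JobSaver.update_entity_table on top of peewee):

    import sqlite3

    # The job row is already at an end status from the sync in step 3.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t_job (job_id TEXT, status TEXT)")
    conn.execute("INSERT INTO t_job VALUES ('202107260820309976351', 'success')")

    # A second writer attempts the same transition; the WHERE clause
    # excludes rows already in an end status, so no row is updated.
    cur = conn.execute(
        "UPDATE t_job SET status = 'success' "
        "WHERE job_id = '202107260820309976351' "
        "AND status NOT IN ('success', 'failed', 'canceled')")
    print(cur.rowcount)  # 0 -> "update job ... status does not take effect"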
  5. dag_scheduler.py: the job has finished, so stop_job is executed.
    Call chain: DAGScheduler.stop_job() -> FederatedScheduler.stop_job -> api_utils.federated_coordination_on_http() (endpoint: stop) -> fate_flow_server via Flask -> party_app.stop_job -> JobController.stop_jobs -> JobController.stop_job -> TaskController.stop_task -> JobController.update_job_status
    Note that a job is decomposed into individual tasks for execution, so only task processes exist; there is no job process. Stopping a job means iterating over the job's tasks and calling stop_task on each. In the normal flow, stop_job is only triggered after all tasks have already stopped, so every stop_task here reports "can not found".
    Corresponding logs in ${job_log_dir}/fate_flow_schedule.log:

    [INFO] [2021-07-26 08:20:47,904] [1:140259369826048] - dag_scheduler.py[line:502]: Job 202107260820309976351 finished with success, do something...
    [INFO] [2021-07-26 08:20:47,905] [1:140259369826048] - dag_scheduler.py[line:436]: request stop job 202107260820309976351 with success
    [INFO] [2021-07-26 08:20:47,913] [1:140259369826048] - dag_scheduler.py[line:445]: request stop job 202107260820309976351 with success to all party
    [INFO] [2021-07-26 08:20:47,913] [1:140259369826048] - federated_scheduler.py[line:88]: try to stop job 202107260820309976351
    [INFO] [2021-07-26 08:20:47,937] [1:140259119585024] - job_utils.py[line:396]: try to stop job 202107260820309976351 task 202107260820309976351_upload_0 local 0 with success party status process pid:90
    [INFO] [2021-07-26 08:20:47,937] [1:140259119585024] - job_utils.py[line:399]: can not found job 202107260820309976351 task 202107260820309976351_upload_0 local 0 with success party status process pid:90
    [INFO] [2021-07-26 08:20:47,939] [1:140259119585024] - job_utils.py[line:423]: start run subprocess to stop task session 202107260820309976351_upload_0_0_local_0
    [INFO] [2021-07-26 08:20:47,939] [1:140259119585024] - job_utils.py[line:310]: start process command: python3 /data/projects/fate/python/fate_flow/utils/session_utils.py -j 202107260820309976351_upload_0_0_local_0 --computing EGGROLL --federation EGGROLL --storage EGGROLL -c stop
    [INFO] [2021-07-26 08:20:47,953] [1:140259119585024] - job_utils.py[line:333]: start process command: python3 /data/projects/fate/python/fate_flow/utils/session_utils.py -j 202107260820309976351_upload_0_0_local_0 --computing EGGROLL --federation EGGROLL --storage EGGROLL -c stop successfully, pid is 153
    [INFO] [2021-07-26 08:20:47,954] [1:140259119585024] - task_controller.py[line:254]: job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0 process 90 kill success

    JobController.update_job_status is then called to refresh the status. Unlike in step 3, the update here does not take effect, so resources are not reclaimed a second time.

    [INFO] [2021-07-26 08:20:47,984] [1:140259119585024] - job_saver.py[line:45]: try to update job 202107260820309976351 status to success
    [INFO] [2021-07-26 08:20:47,991] [1:140259119585024] - job_saver.py[line:56]: update job 202107260820309976351 status does not take effect

A success message is then returned:

[INFO] [2021-07-26 08:20:47,997] [1:140259369826048] - federated_scheduler.py[line:92]: stop job 202107260820309976351 success
[INFO] [2021-07-26 08:20:47,997] [1:140259369826048] - dag_scheduler.py[line:448]: stop job 202107260820309976351 with success successfully
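
The per-task stop boils down to probing whether the task's executor process is still alive and killing it if so; in the normal flow the probe fails, which produces the "can not found" lines. A simplified sketch (the real logic lives in TaskController.stop_task and job_utils):

import errno
import os

def process_alive(pid):
    try:
        os.kill(pid, 0)                    # signal 0 only probes for existence
        return True
    except OSError as e:
        return e.errno != errno.ESRCH      # ESRCH: no such process

def stop_job_sketch(tasks):
    # tasks: e.g. [{"task_id": "202107260820309976351_upload_0", "run_pid": 90}]
    for task in tasks:
        if process_alive(task["run_pid"]):
            os.kill(task["run_pid"], 9)    # task still running: kill it
        else:
            print(f"can not found task {task['task_id']} "
                  f"process pid:{task['run_pid']}")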

  6. dag_scheduler.py: the job has finished, so clean_job is executed.
    Call chain: FederatedScheduler.clean_job() -> api_utils.federated_coordination_on_http() (endpoint: clean) -> fate_flow_server via Flask -> party_app's clean route -> JobController.clean_job
    clean_job itself only writes logs and does nothing else (its body is a TODO):

    @classmethod
    def clean_job(cls, job_id, role, party_id, roles):
        schedule_logger(job_id).info('Job {} on {} {} start to clean'.format(job_id, role, party_id))
        # todo
        schedule_logger(job_id).info('job {} on {} {} clean done'.format(job_id, role, party_id))

    Corresponding logs in ${job_log_dir}/fate_flow_schedule.log:

    [INFO] [2021-07-26 08:20:47,997] [1:140259369826048] - federated_scheduler.py[line:107]: try to clean job 202107260820309976351
    [INFO] [2021-07-26 08:20:48,003] [1:140259119585024] - job_controller.py[line:344]: Job 202107260820309976351 on local 0 start to clean
    [INFO] [2021-07-26 08:20:48,003] [1:140259119585024] - job_controller.py[line:346]: job 202107260820309976351 on local 0 clean done
    [INFO] [2021-07-26 08:20:48,007] [1:140259369826048] - federated_scheduler.py[line:110]: clean job 202107260820309976351 success
  7. dag_scheduler.py: the finish routine completes and logs:

    [INFO] [2021-07-26 08:20:48,007] [1:140259369826048] - dag_scheduler.py[line:505]: Job 202107260820309976351 finished with success, done

    At this point the job finish phase is over, and so is this round of scheduling; the scheduler logs:

    [INFO] [2021-07-26 08:20:48,007] [1:140259369826048] - dag_scheduler.py[line:325]: finish scheduling job 202107260820309976351