
FATE Study: Reading the Source Code Through the Logs (5) - The Job Schedule Phase of an Upload Task

Overview

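Jobs are submitted asynchronously, so the server starts them from a polling loop. The loop schedules jobs in the waiting state in FIFO order.
Scheduling then involves two parts: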

  • Part 1. Apply for resources.
  • Part 2. Start the job: move its status from waiting to running.

Main method involved: dag_scheduler.schedule_waiting_jobs()
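The sketch below models the two-part flow in memory. It rests on stated assumptions and is not FATE's implementation. `Job`, `schedule_waiting_jobs_sketch` and the `apply_resource` callback are hypothetical stand-ins. The FIFO order copies the `ORDER BY f_create_time ASC` polling query in step 1. The sketch also resets the ready signal in a `finally` block. That reset is an assumption, chosen to match the "reset job ... ready signal True" log at the end of the phase.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Job:
    # Hypothetical, trimmed stand-in for a t_job row
    job_id: str
    create_time: int
    status: str = "waiting"
    is_initiator: bool = True
    ready_signal: bool = False


def schedule_waiting_jobs_sketch(jobs: List[Job], apply_resource: Callable[[Job], bool]) -> List[str]:
    """Start waiting initiator jobs in FIFO order; return the ids of started jobs."""
    started = []
    # Same filter and order as the polling query: initiator, waiting, oldest first
    waiting = sorted(
        (j for j in jobs if j.is_initiator and j.status == "waiting"),
        key=lambda j: j.create_time,
    )
    for job in waiting:
        if job.ready_signal:
            continue  # signal already held elsewhere: skip this job
        job.ready_signal = True
        try:
            if apply_resource(job):      # Part 1: apply for resources
                job.status = "running"   # Part 2: waiting -> running
                started.append(job.job_id)
        finally:
            job.ready_signal = False     # assumed: reset whether or not the job started
    return started


if __name__ == "__main__":
    jobs = [Job("job_b", create_time=2), Job("job_a", create_time=1)]
    print(schedule_waiting_jobs_sketch(jobs, apply_resource=lambda job: True))  # ['job_a', 'job_b']
```

If the resource application fails, the job stays in the waiting state, so a later polling round can pick it up again.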

Execution details


  1. dag_scheduler.py: the polling loop finds jobs in the waiting state (see the fate flow server startup part of this series) and calls schedule_waiting_jobs to schedule them.
    The polling is logged to fate_flow/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:31,994] [1:140259369826048] - dag_scheduler.py[line:134]: start schedule waiting jobs
    [INFO] [2021-07-26 08:20:32,007] [1:140259369826048] - dag_scheduler.py[line:136]: have 1 waiting jobs
    [INFO] [2021-07-26 08:20:32,008] [1:140259369826048] - dag_scheduler.py[line:140]: schedule waiting job 202107260820309976351
    [INFO] [2021-07-26 08:20:32,117] [1:140259369826048] - dag_scheduler.py[line:146]: schedule waiting jobs finished
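    The corresponding query in fate_flow/peewee.log selects initiator jobs in the waiting state, ordered by f_create_time ascending. This ordering is what makes the scheduling FIFO: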
    [DEBUG] [2021-07-26 08:20:32,001] [1:140259369826048] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_user_id`, `t1`.`f_job_id`, `t1`.`f_name`, `t1`.`f_description`, `t1`.`f_tag`, `t1`.`f_dsl`, `t1`.`f_runtime_conf`, `t1`.`f_runtime_conf_on_party`, `t1`.`f_train_runtime_conf`, `t1`.`f_roles`, `t1`.`f_work_mode`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_is_initiator`, `t1`.`f_progress`, `t1`.`f_ready_signal`, `t1`.`f_ready_time`, `t1`.`f_cancel_signal`, `t1`.`f_cancel_time`, `t1`.`f_rerun_signal`, `t1`.`f_end_scheduling_updates`, `t1`.`f_engine_name`, `t1`.`f_engine_type`, `t1`.`f_cores`, `t1`.`f_memory`, `t1`.`f_remaining_cores`, `t1`.`f_remaining_memory`, `t1`.`f_resource_in_use`, `t1`.`f_apply_resource_time`, `t1`.`f_return_resource_time`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_job` AS `t1` WHERE ((`t1`.`f_is_initiator` = %s) AND (`t1`.`f_status` = %s)) ORDER BY `t1`.`f_create_time` ASC', [True, 'waiting'])
  2. dag_scheduler.py: sets the job's ready_signal to true, then calls FederatedScheduler.resource_for_job() to apply for resources.
    The ready_signal code is:
    if not cls.ready_signal(job_id=job_id, set_or_reset=True):
    This writes the following to fate_flow/peewee.log. The update only matches if f_ready_signal is still False:
    [DEBUG] [2021-07-26 08:20:32,009] [1:140259369826048] - peewee.py[line:2863]: ('UPDATE `t_job` SET `f_ready_signal` = %s, `f_ready_time` = %s WHERE ((`t_job`.`f_job_id` = %s) AND (`t_job`.`f_ready_signal` = %s))', [True, 1627287632009, '202107260820309976351', False])
  3. federated_scheduler.py: starts applying for resources and logs the following to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,022] [1:140259369826048] - federated_scheduler.py[line:39]: try to apply job 202107260820309976351 resource
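    It then calls job_command with the given arguments to send the resource request.
  4. api_utils.py: as described in earlier parts, local mode communicates over HTTP, so federated_coordination_on_http is called.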

    # Build the target URL of the party API
    url = "http://{}:{}{}".format(host, port, endpoint)
    audit_logger(job_id).info('remote http api request: {}'.format(url))
    # Pick requests.get / requests.post / ... according to the HTTP method
    action = getattr(requests, method.lower(), None)
    # Headers identifying the source party/role and the destination party
    headers = HEADERS.copy()
    headers["dest-party-id"] = str(dest_party_id)
    headers["src-party-id"] = str(src_party_id)
    headers["src-role"] = str(src_role)
    http_response = action(url=url, data=json_dumps(json_body), headers=headers)
    # Log the raw response body, then the parsed JSON response
    audit_logger(job_id).info(http_response.text)
    response = http_response.json()
    audit_logger(job_id).info('remote http api response: {} {}'.format(endpoint, response))
    return response

    It first builds the request URL and logs it to ${job_log_dir}/fate_flow_audit.log:

    [INFO] [2021-07-26 08:20:32,023] [1:140259369826048] - api_utils.py[line:122]: remote http api request: http://10.200.96.235:9380/v1/party/202107260820309976351/local/0/resource/apply
  5. fate_flow_server.py: receives the request. Because the endpoint matches party/**, Flask routes it to party_app.

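  6. party_app.py: calls ResourceManager.apply_for_job_resource() to apply for resources.
  7. resource_manager.py: allocates resources to the job in resource_for_job(). The main operations are:
    -> Call calculate_job_resource. It queries t_job for f_runtime_conf_on_party and derives engine_name, cores and memory from it.
    This writes the following to fate_flow/peewee.log: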
    [DEBUG] [2021-07-26 08:20:32,031] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_runtime_conf_on_party` FROM `t_job` AS `t1` WHERE (((`t1`.`f_job_id` = %s) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', 'local', '0'])
    -> With that information, update the t_job table.
    fate_flow/peewee.log shows:
    [DEBUG] [2021-07-26 08:20:32,037] [1:140259119585024] - peewee.py[line:2863]: ('UPDATE `t_job` SET `f_engine_name` = %s, `f_engine_type` = %s, `f_cores` = %s, `f_memory` = %s, `f_remaining_cores` = %s, `f_remaining_memory` = %s, `f_resource_in_use` = %s, `f_apply_resource_time` = %s WHERE ((((`t_job`.`f_job_id` = %s) AND (`t_job`.`f_role` = %s)) AND (`t_job`.`f_party_id` = %s)) AND (`t_job`.`f_resource_in_use` = %s))', ['EGGROLL', 'computing', 4, 0, 4, 0, True, 1627287632035, '202107260820309976351', 'local', '0', False])
    -> Once t_job is updated successfully, update the t_engine_registry table.
    Code:
    operate = EngineRegistry.update(updates).where(*filters)
    apply_status = operate.execute() > 0
    The corresponding fate_flow/peewee.log entry is below. Cores and memory are only deducted if enough remain:
    [DEBUG] [2021-07-26 08:20:32,040] [1:140259119585024] - peewee.py[line:2863]: ('UPDATE `t_engine_registry` SET `f_remaining_cores` = (`t_engine_registry`.`f_remaining_cores` - %s), `f_remaining_memory` = (`t_engine_registry`.`f_remaining_memory` - %s) WHERE ((((`t_engine_registry`.`f_remaining_cores` >= %s) AND (`t_engine_registry`.`f_remaining_memory` >= %s)) AND (`t_engine_registry`.`f_engine_type` = %s)) AND (`t_engine_registry`.`f_engine_name` = %s))', [4, 0, 4, 0, 'computing', 'EGGROLL'])
    Then fetch the currently remaining resources:
    remaining_cores, remaining_memory = cls.get_remaining_resource(
        EngineRegistry,
        [EngineRegistry.f_engine_type == EngineType.COMPUTING,
         EngineRegistry.f_engine_name == engine_name])

    Corresponding fate_flow/peewee.log entry:
    [DEBUG] [2021-07-26 08:20:32,046] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_remaining_cores`, `t1`.`f_remaining_memory` FROM `t_engine_registry` AS `t1` WHERE ((`t1`.`f_engine_type` = %s) AND (`t1`.`f_engine_name` = %s))', ['computing', 'EGGROLL'])
    If the query returns data and everything is normal, the following is logged to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,049] [1:140259119585024] - resource_manager.py[line:175]: apply job 202107260820309976351 resource(cores 4 memory 0) on local 0 successfully, remaining cores: 16 remaining memory: 0
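  8. resource_manager.py: returns the result of the resource application.
  9. party_app.py: returns the response to fate_flow_server.py (counterpart of step 5).
  10. fate_flow_server.py: returns the response to api_utils.py (counterpart of step 4).
    Following the code in step 4, the following is written to fate_flow/fate_flow_audit.log: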
    [INFO] [2021-07-26 08:20:32,052] [1:140259369826048] - api_utils.py[line:129]: {"retcode":0,"retmsg":"success"}
    [INFO] [2021-07-26 08:20:32,052] [1:140259369826048] - api_utils.py[line:131]: remote http api response: /v1/party/202107260820309976351/local/0/resource/apply {'retcode': 0, 'retmsg': 'success'}
  11. api_utils.py: returns the response to federated_scheduler.py (counterpart of step 3).
  12. federated_scheduler.py: logs the following to ${job_log_dir}/fate_flow_schedule.log, based on the response:
    [INFO] [2021-07-26 08:20:32,053] [1:140259369826048] - federated_scheduler.py[line:42]: apply job 202107260820309976351 resource successfully
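    It then returns the allocation result to dag_scheduler.py (counterpart of step 2).
  13. dag_scheduler.py: if the resource application succeeded, calls start_job to start the job
    and logs the following to ${job_log_dir}/fate_flow_schedule.log: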
    [INFO] [2021-07-26 08:20:32,053] [1:140259369826048] - dag_scheduler.py[line:279]: try to start job 202107260820309976351 on initiator local 0
    It then fills in the job_info fields, setting status to running and tag to end_waiting.
    The container log also shows the earlier resource/apply request:
    10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/local/0/resource/apply HTTP/1.1" 200 -
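  14. dag_scheduler.py: calls JobSaver.query_job to confirm the job is still in the database, and continues only if it is. This guards against unusual operations such as someone deleting the record from the DB.
    Corresponding fate_flow/peewee.log entry: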
    [DEBUG] [2021-07-26 08:20:32,058] [1:140259369826048] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_user_id`, `t1`.`f_job_id`, `t1`.`f_name`, `t1`.`f_description`, `t1`.`f_tag`, `t1`.`f_dsl`, `t1`.`f_runtime_conf`, `t1`.`f_runtime_conf_on_party`, `t1`.`f_train_runtime_conf`, `t1`.`f_roles`, `t1`.`f_work_mode`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_is_initiator`, `t1`.`f_progress`, `t1`.`f_ready_signal`, `t1`.`f_ready_time`, `t1`.`f_cancel_signal`, `t1`.`f_cancel_time`, `t1`.`f_rerun_signal`, `t1`.`f_end_scheduling_updates`, `t1`.`f_engine_name`, `t1`.`f_engine_type`, `t1`.`f_cores`, `t1`.`f_memory`, `t1`.`f_remaining_cores`, `t1`.`f_remaining_memory`, `t1`.`f_resource_in_use`, `t1`.`f_apply_resource_time`, `t1`.`f_return_resource_time`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_job` AS `t1` WHERE (((`t1`.`f_job_id` = %s) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', 'local', '0'])
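    This reads back the row inserted during the submit phase.
  15. dag_scheduler.py: calls FederatedScheduler.start_job() to start the job.
  16. federated_scheduler.py: calls job_command to start the job.
  17. federated_scheduler.py: since this is local mode, sends an HTTP request to the server.
    As in step 4, this is logged to ${job_log_dir}/fate_flow_audit.log: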
    [INFO] [2021-07-26 08:20:32,063] [1:140259369826048] - api_utils.py[line:122]: remote http api request: http://10.200.96.235:9380/v1/party/202107260820309976351/local/0/start
  18. fate_flow_server.py: receives the request. Because the endpoint matches party/**, Flask routes it to party_app.
  19. party_app.py: calls JobController.start_job() to start the job.
  20. job_controller.py: logs the following to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,053] [1:140259369826048] - dag_scheduler.py[line:279]: try to start job 202107260820309976351 on initiator local 0
    It also sets job_info, mainly setting status to running, and calls update_job_status to update the DB.
  21. job_controller.py: calls JobSaver.update_job_status.
  22. job_saver.py: runs update_job_status().
    It first logs the following to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,070] [1:140259119585024] - job_saver.py[line:45]: try to update job 202107260820309976351 status to running
    It then calls update_status(), which first queries the DB:
    [DEBUG] [2021-07-26 08:20:32,074] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_user_id`, `t1`.`f_job_id`, `t1`.`f_name`, `t1`.`f_description`, `t1`.`f_tag`, `t1`.`f_dsl`, `t1`.`f_runtime_conf`, `t1`.`f_runtime_conf_on_party`, `t1`.`f_train_runtime_conf`, `t1`.`f_roles`, `t1`.`f_work_mode`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_is_initiator`, `t1`.`f_progress`, `t1`.`f_ready_signal`, `t1`.`f_ready_time`, `t1`.`f_cancel_signal`, `t1`.`f_cancel_time`, `t1`.`f_rerun_signal`, `t1`.`f_end_scheduling_updates`, `t1`.`f_engine_name`, `t1`.`f_engine_type`, `t1`.`f_cores`, `t1`.`f_memory`, `t1`.`f_remaining_cores`, `t1`.`f_remaining_memory`, `t1`.`f_resource_in_use`, `t1`.`f_apply_resource_time`, `t1`.`f_return_resource_time`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_job` AS `t1` WHERE (((`t1`.`f_job_id` = %s) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', 'local', '0'])
    Using the fetched row, it updates the relevant fields and calls execute_update() to run the SQL update.
    Note that execute_update() also writes the statement to ${job_log_dir}/fate_flow_sql.log:
    [INFO] [2021-07-26 08:20:32,081] [1:140259119585024] - job_saver.py[line:185]: UPDATE `t_job` SET `f_status` = 'running' WHERE ((((`t_job`.`f_job_id` = '202107260820309976351') AND (`t_job`.`f_role` = 'local')) AND (`t_job`.`f_party_id` = '0')) AND (`t_job`.`f_status` = 'waiting'))
    The corresponding fate_flow/peewee.log entry is:
    [DEBUG] [2021-07-26 08:20:32,083] [1:140259119585024] - peewee.py[line:2863]: ('UPDATE `t_job` SET `f_status` = %s WHERE ((((`t_job`.`f_job_id` = %s) AND (`t_job`.`f_role` = %s)) AND (`t_job`.`f_party_id` = %s)) AND (`t_job`.`f_status` = %s))', ['running', '202107260820309976351', 'local', '0', 'waiting'])
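    In other words, the status changes from waiting to running, and only if the row is still waiting. The timestamps (08:20:32,081 in sql.log vs 08:20:32,083 in peewee.log) show the order of events. The statement is written to sql.log first and then executed by peewee underneath.
    When update_status() finishes and the DB update succeeds, the following is logged to ${job_log_dir}/fate_flow_schedule.log: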
    [INFO] [2021-07-26 08:20:32,088] [1:140259119585024] - job_saver.py[line:48]: update job 202107260820309976351 status successfully
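Steps 7 and 22 both use a conditional UPDATE whose WHERE clause encodes the precondition. The check and the write therefore happen in one statement, and the affected row count tells the caller whether it succeeded (compare `apply_status = operate.execute() > 0` in step 7). The sketch below reproduces this pattern with Python's built-in sqlite3 instead of MySQL and peewee. The tables are trimmed to the columns seen in the logs. `create_tables`, `apply_engine_resource` and `update_job_status` are hypothetical helpers, not FATE functions.

```python
import sqlite3


def create_tables(conn: sqlite3.Connection) -> None:
    # Trimmed versions of t_engine_registry and t_job (only columns used below)
    conn.execute(
        "CREATE TABLE t_engine_registry (f_engine_type TEXT, f_engine_name TEXT, "
        "f_remaining_cores INTEGER, f_remaining_memory INTEGER)"
    )
    conn.execute(
        "CREATE TABLE t_job (f_job_id TEXT, f_role TEXT, f_party_id TEXT, f_status TEXT)"
    )


def apply_engine_resource(conn: sqlite3.Connection, engine_name: str, cores: int, memory: int) -> bool:
    # Check and deduct in one statement: rows with too little left simply do not match
    cur = conn.execute(
        "UPDATE t_engine_registry "
        "SET f_remaining_cores = f_remaining_cores - ?, "
        "f_remaining_memory = f_remaining_memory - ? "
        "WHERE f_remaining_cores >= ? AND f_remaining_memory >= ? "
        "AND f_engine_type = 'computing' AND f_engine_name = ?",
        (cores, memory, cores, memory, engine_name),
    )
    # Like `operate.execute() > 0`: success means at least one row was changed
    return cur.rowcount > 0


def update_job_status(conn: sqlite3.Connection, job_id: str, role: str, party_id: str,
                      old: str, new: str) -> bool:
    # Only moves the status if the row is still in the expected old status
    cur = conn.execute(
        "UPDATE t_job SET f_status = ? "
        "WHERE f_job_id = ? AND f_role = ? AND f_party_id = ? AND f_status = ?",
        (new, job_id, role, party_id, old),
    )
    return cur.rowcount > 0


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_tables(conn)
    # 20 cores before the apply, so that 16 remain afterwards as in the step 7 log
    conn.execute("INSERT INTO t_engine_registry VALUES ('computing', 'EGGROLL', 20, 0)")
    conn.execute("INSERT INTO t_job VALUES ('202107260820309976351', 'local', '0', 'waiting')")
    print(apply_engine_resource(conn, "EGGROLL", cores=4, memory=0))  # True
    print(conn.execute("SELECT f_remaining_cores FROM t_engine_registry").fetchone()[0])  # 16
    job = ("202107260820309976351", "local", "0")
    print(update_job_status(conn, *job, old="waiting", new="running"))  # True
    print(update_job_status(conn, *job, old="waiting", new="running"))  # False: no longer waiting
```

Putting the precondition in the WHERE clause means two concurrent requests cannot both pass a separate "read, then write" check. The database applies each UPDATE atomically, and at most one of them matches the row.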

Note: if the job's status is one of the EndStatus values, update_entity_table() is called.

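  23. job_saver.py: returns the result of update_status() to job_controller.py (counterpart of step 21).
  24. job_controller.py: returns the result of update_job_status().
    Note: if the result is true, the job status decides whether resources are returned. This does not apply in the start phase, but the finish phase uses it later.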

  25. job_controller.py: calls update_job.

  26. job_controller.py: calls JobSaver.update_job.
  27. job_saver.py: runs update_job().
    It logs the following to ${job_log_dir}/fate_flow_schedule.log:

    [INFO] [2021-07-26 08:20:32,089] [1:140259119585024] - job_saver.py[line:61]: try to update job 202107260820309976351

    It calls update_entity_table. As in step 22, this first queries the existing row by job ID and then executes the update:

    [DEBUG] [2021-07-26 08:20:32,092] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_user_id`, `t1`.`f_job_id`, `t1`.`f_name`, `t1`.`f_description`, `t1`.`f_tag`, `t1`.`f_dsl`, `t1`.`f_runtime_conf`, `t1`.`f_runtime_conf_on_party`, `t1`.`f_train_runtime_conf`, `t1`.`f_roles`, `t1`.`f_work_mode`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_is_initiator`, `t1`.`f_progress`, `t1`.`f_ready_signal`, `t1`.`f_ready_time`, `t1`.`f_cancel_signal`, `t1`.`f_cancel_time`, `t1`.`f_rerun_signal`, `t1`.`f_end_scheduling_updates`, `t1`.`f_engine_name`, `t1`.`f_engine_type`, `t1`.`f_cores`, `t1`.`f_memory`, `t1`.`f_remaining_cores`, `t1`.`f_remaining_memory`, `t1`.`f_resource_in_use`, `t1`.`f_apply_resource_time`, `t1`.`f_return_resource_time`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_job` AS `t1` WHERE (((`t1`.`f_job_id` = %s) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', 'local', '0'])
    [DEBUG] [2021-07-26 08:20:32,099] [1:140259119585024] - peewee.py[line:2863]: ('UPDATE `t_job` SET `f_start_time` = %s, `f_start_date` = %s WHERE (((`t_job`.`f_job_id` = %s) AND (`t_job`.`f_role` = %s)) AND (`t_job`.`f_party_id` = %s))', [1627287632070, datetime.datetime(2021, 7, 26, 8, 20, 32), '202107260820309976351', 'local', '0'])

    The corresponding ${job_log_dir}/fate_flow_sql.log entry:

    [INFO] [2021-07-26 08:20:32,097] [1:140259119585024] - job_saver.py[line:185]: UPDATE `t_job` SET `f_start_time` = 1627287632070, `f_start_date` = '2021-07-26 08:20:32' WHERE (((`t_job`.`f_job_id` = '202107260820309976351') AND (`t_job`.`f_role` = 'local')) AND (`t_job`.`f_party_id` = '0'))
  28. job_saver.py: logs the following to ${job_log_dir}/fate_flow_schedule.log:

    [INFO] [2021-07-26 08:20:32,106] [1:140259119585024] - job_saver.py[line:64]: job 202107260820309976351 update successfully: {'job_id': '202107260820309976351', 'role': 'local', 'party_id': 0, 'status': 'running', 'start_time': 1627287632070}

    It then returns the result of update_job() to job_controller.py (counterpart of step 26).

  29. job_controller.py: the start is complete. It logs success to ${job_log_dir}/fate_flow_schedule.log:

    [INFO] [2021-07-26 08:20:32,106] [1:140259119585024] - job_controller.py[line:250]: start job 202107260820309976351 on local 0 successfully
  30. dag_scheduler.py: if nothing above failed, logs the following to ${job_log_dir}/fate_flow_schedule.log (counterpart of step 13):

    [INFO] [2021-07-26 08:20:32,110] [1:140259369826048] - dag_scheduler.py[line:292]: start job 202107260820309976351 on initiator local 0
  31. dag_scheduler.py: calls ready_signal, which runs an update on Job that resets ready_signal to false.
    Corresponding fate_flow/peewee.log entry:

    [DEBUG] [2021-07-26 08:20:32,111] [1:140259369826048] - peewee.py[line:2863]: ('UPDATE `t_job` SET `f_ready_signal` = %s, `f_ready_time` = %s WHERE ((`t_job`.`f_job_id` = %s) AND (`t_job`.`f_ready_signal` = %s))', [False, None, '202107260820309976351', True])
  32. dag_scheduler.py: logs the result of step 31 to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,117] [1:140259369826048] - dag_scheduler.py[line:247]: reset job 202107260820309976351 ready signal True
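The ready_signal is set before the resource application and reset at the end. Both operations are compare-and-set updates on `t_job`. Setting only matches rows where `f_ready_signal` is False, and resetting only matches rows where it is True. The sketch below uses sqlite3 with a trimmed `t_job` table. The `ready_signal` function is a hypothetical stand-in and not FATE's code. It only borrows the `job_id` and `set_or_reset` argument names from the snippet at the start of the phase.

```python
import sqlite3
import time


def ready_signal(conn: sqlite3.Connection, job_id: str, set_or_reset: bool) -> bool:
    """Hypothetical compare-and-set on t_job.f_ready_signal; True if a row changed."""
    if set_or_reset:
        # Set: only matches while the signal is False; records the time in milliseconds
        new_value, ready_time, expected = True, int(time.time() * 1000), False
    else:
        # Reset: only matches while the signal is True; clears the time
        new_value, ready_time, expected = False, None, True
    cur = conn.execute(
        "UPDATE t_job SET f_ready_signal = ?, f_ready_time = ? "
        "WHERE f_job_id = ? AND f_ready_signal = ?",
        (new_value, ready_time, job_id, expected),
    )
    return cur.rowcount > 0


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t_job (f_job_id TEXT, f_ready_signal INTEGER, f_ready_time INTEGER)")
    conn.execute("INSERT INTO t_job VALUES ('202107260820309976351', 0, NULL)")
    job_id = "202107260820309976351"
    print(ready_signal(conn, job_id=job_id, set_or_reset=True))   # True: signal acquired
    print(ready_signal(conn, job_id=job_id, set_or_reset=True))   # False: already set
    print(ready_signal(conn, job_id=job_id, set_or_reset=False))  # True, like "reset ... ready signal True"
```

Once the signal is set, a second set attempt changes no rows. A second scheduling pass would therefore see that the job is already being handled.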

This completes the schedule phase. Its main work is updating the job's status and the resource state in the DB.
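This series reads the source by following the logs, so it helps to merge the separate log files into one timeline. The files are fate_flow_schedule.log, fate_flow_audit.log, fate_flow_sql.log and peewee.log. The helper below is hypothetical. It assumes every relevant line follows the `[LEVEL] [timestamp] [pid:thread] - file.py[line:N]: message` format shown above and skips lines in other formats, such as the container access log.

```python
import re
from datetime import datetime
from typing import Iterable, List, NamedTuple

# Matches lines like:
# [INFO] [2021-07-26 08:20:31,994] [1:140259369826048] - dag_scheduler.py[line:134]: start ...
LOG_RE = re.compile(
    r"^\[(?P<level>\w+)\] \[(?P<ts>[^\]]+)\] \[(?P<pid>\d+):(?P<tid>\d+)\] - "
    r"(?P<src>[\w.]+)\[line:(?P<lineno>\d+)\]: (?P<msg>.*)$"
)


class Entry(NamedTuple):
    time: datetime
    level: str
    thread: str
    source: str
    message: str


def parse(lines: Iterable[str]) -> List[Entry]:
    entries = []
    for raw in lines:
        m = LOG_RE.match(raw.strip())
        if m is None:
            continue  # not in the FATE Flow log format (e.g. container access log)
        ts = datetime.strptime(m["ts"], "%Y-%m-%d %H:%M:%S,%f")
        entries.append(Entry(ts, m["level"], m["tid"], f'{m["src"]}:{m["lineno"]}', m["msg"]))
    return entries


def merge_timeline(*files_lines: Iterable[str]) -> List[Entry]:
    # Merge several log files into one timeline; sorted() keeps ties in input order
    merged: List[Entry] = []
    for lines in files_lines:
        merged.extend(parse(lines))
    return sorted(merged, key=lambda e: e.time)
```

Sorting by timestamp reproduces the ordering noted in step 22, where sql.log comes before peewee.log. The thread field also appears to separate the scheduler thread (140259369826048 in these logs) from the thread handling the party HTTP request (140259119585024).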