
FATE learning: following the logs through the source code (6) - the task schedule phase of an upload job

Overview

Building on the previous posts: the job is already in running status, so the scheduler walks the task dependency order and schedules the job's tasks one by one.

The work after scheduling has two parts:

  • Part 1. apply for resources
  • Part 2. start the task: move its status from waiting -> running
    Main method involved: dag_scheduler.schedule_running_jobs()
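
As a roadmap, here is a minimal sketch of the control flow this post traces (schematic pseudocode from my reading of the logs, not the FATE source; query_jobs stands in for the actual JobSaver query):

# Schematic: one polling round of the DAG scheduler (not actual FATE code)
def schedule_running_jobs():
    for job in query_jobs(is_initiator=True, status="running"):    # step 1
        dsl_parser = schedule_utils.get_job_dsl_parser(job)        # steps 3-4
        TaskScheduler.schedule(job=job, dsl_parser=dsl_parser,
                               canceled=job.f_cancel_signal)       # step 5 onward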

Execution steps

[Figure: sequence diagram of the task schedule flow; the step numbers below refer to this diagram]

  1. dag_scheduler.py: the run_do polling loop finds the job in running status (see the fate flow server startup post for this part).
    The matching entries in fate_flow/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,117] [1:140259369826048] - dag_scheduler.py[line:148]: start schedule running jobs
    [INFO] [2021-07-26 08:20:32,127] [1:140259369826048] - dag_scheduler.py[line:150]: have 1 running jobs

The matching query for running-status jobs in fate_flow/peewee.log:

[DEBUG] [2021-07-26 08:20:34,354] [1:140259369826048] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_user_id`, `t1`.`f_job_id`, `t1`.`f_name`, `t1`.`f_description`, `t1`.`f_tag`, `t1`.`f_dsl`, `t1`.`f_runtime_conf`, `t1`.`f_runtime_conf_on_party`, `t1`.`f_train_runtime_conf`, `t1`.`f_roles`, `t1`.`f_work_mode`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_is_initiator`, `t1`.`f_progress`, `t1`.`f_ready_signal`, `t1`.`f_ready_time`, `t1`.`f_cancel_signal`, `t1`.`f_cancel_time`, `t1`.`f_rerun_signal`, `t1`.`f_end_scheduling_updates`, `t1`.`f_engine_name`, `t1`.`f_engine_type`, `t1`.`f_cores`, `t1`.`f_memory`, `t1`.`f_remaining_cores`, `t1`.`f_remaining_memory`, `t1`.`f_resource_in_use`, `t1`.`f_apply_resource_time`, `t1`.`f_return_resource_time`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_job` AS `t1` WHERE ((`t1`.`f_is_initiator` = %s) AND (`t1`.`f_status` = %s)) ORDER BY `t1`.`f_create_time` ASC', [True, 'running'])

  2. dag_scheduler.py: schedule_running_jobs schedules the job in running status.
    It first logs to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,128] [1:140259369826048] - dag_scheduler.py[line:152]: schedule running job 202107260820309976351
  3. dag_scheduler.py: calls schedule_utils.get_job_dsl_parser to parse the job configuration.
  4. schedule_utils.py: returns the parsed dsl_parser.
  5. dag_scheduler.py: calls TaskScheduler.schedule(job=job, dsl_parser=dsl_parser, canceled=job.f_cancel_signal) to schedule the tasks. Note f_cancel_signal: it is the flag updated by the detector polling loop.
  6. task_scheduler.py: performs the task scheduling.
    It first logs to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,130] [1:140259369826048] - task_scheduler.py[line:28]: scheduling job 202107260820309976351 tasks
    then calls JobSaver.get_tasks_asc to fetch the task list.
  7. job_saver.py: runs the following to fetch the job's tasks:

    tasks = Task.select().where(Task.f_job_id == job_id, Task.f_role == role, Task.f_party_id == party_id).order_by(Task.f_create_time.asc())
    tasks_group = cls.get_latest_tasks(tasks=tasks)

    The tasks are fetched in ascending create_time order. As the previous post showed, create job creates one task per component, in order, so sorting by create_time also yields the execution order. (Author's question: what happens if the DB misbehaves and create_time ends up out of order?)
The DB access is logged in fate_flow/peewee.log:
[DEBUG] [2021-07-26 08:20:32,133] [1:140259369826048] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE (((`t1`.`f_job_id` = %s) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s)) ORDER BY `t1`.`f_create_time` ASC', ['202107260820309976351', 'local', '0'])

get_latest_tasks keeps only the newest version of each task; a sketch follows below.
Finally, the tasks to execute are handed back to task_scheduler.
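
A minimal sketch of that deduplication, assuming tasks are keyed by f_task_id and versioned by f_task_version as in the SQL above (illustrative, not the exact FATE implementation):

# Keep only the highest-version row for each task_id.
def get_latest_tasks(tasks):
    tasks_group = {}
    for task in tasks:
        latest = tasks_group.get(task.f_task_id)
        if latest is None or task.f_task_version > latest.f_task_version:
            tasks_group[task.f_task_id] = task
    return tasks_group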

  8. task_scheduler.py: iterates over the tasks to run and, for each task:
    checks f_federated_status_collect_type; if it is PULL, runs collect_task_of_all_party(), which calls JobSaver.query_task() to fetch the task's state on each party (steps 9-10);
    calls federated_task_status and computes the task's current status (steps 11-13);
    updates the task status (step 14);
    appends waiting tasks to the waiting queue (step 15).
    Note: under PULL, in a multi-party computation the scheduler actively pulls the task's state from the other parties.
  9. job_saver.py: runs query_task and returns the result; the matching fate_flow/peewee.log entry:
    [DEBUG] [2021-07-26 08:20:32,139] [1:140259369826048] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE ((`t1`.`f_task_id` = %s) AND (`t1`.`f_task_version` = %s))', ['202107260820309976351_upload_0', 0])
  10. task_scheduler.py: following step 8, evaluates the result of step 9's query:

    if not len(tasks_status_on_all) > 1 and not TaskStatus.RUNNING in tasks_status_on_all:
        return

    This is an upload job with no multi-party computation, so len(tasks_status_on_all) == 1, and the task is still waiting, so the method returns right away; once the task turns running, the collect described below kicks in.

  11. task_scheduler.py: runs federated_task_status to get the task's current status, first querying via JobSaver.query_task().

  12. job_saver.py: runs query_task and returns the result; the matching fate_flow/peewee.log entry:
    [DEBUG] [2021-07-26 08:20:32,144] [1:140259369826048] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE ((`t1`.`f_task_id` = %s) AND (`t1`.`f_task_version` = %s))', ['202107260820309976351_upload_0', 0])
  13. task_scheduler.py: inspects the task's status on every party except the non-idmapping roles (the author has not fully worked out the non-idmapping logic); if any is not SUCCESS, it collects all party statuses and calls calculate_multi_party_task_status() to compute the current status (a sketch of a plausible merge rule follows this item), logging to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,148] [1:140259369826048] - task_scheduler.py[line:143]: job 202107260820309976351 task 202107260820309976351_upload_0 0 status is waiting, calculate by task party status list: ['waiting']
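
    How the per-party statuses merge into one task status is not visible in the logs; a plausible precedence, as a hedged sketch (the rule set is my assumption, not FATE's exact table):

    # Illustrative precedence when merging per-party task statuses.
    def calculate_multi_party_task_status(statuses):
        if any(s == "failed" for s in statuses):
            return "failed"     # any failed party fails the task
        if all(s == "success" for s in statuses):
            return "success"    # done only when every party is done
        if any(s == "running" for s in statuses):
            return "running"
        return "waiting"        # e.g. ['waiting'] -> waiting, as logged above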
  14. task_scheduler.py: compares the status computed in step 13 with the stored one; if they differ, it updates the store and calls FederatedScheduler.sync_task_status to sync the task status to every party. (Here both are waiting, so no update is needed.)

  15. task_scheduler.py: branches on the status: a waiting task joins the waiting_tasks queue; a task in an EndStatus triggers FederatedScheduler.stop_task.

  16. task_scheduler.py: checks the canceled flag; if the job is not canceled, it iterates over waiting_tasks, running steps 17-48 for each. (Author's question: why isn't this checked first?)

  17. task_scheduler.py: for each task, calls dsl_parser.get_upstream_dependent_components() to fetch the components it depends on; if any upstream component has not succeeded, break exits the loop. (The author wondered about the indentation of the else here; see the sketch after this item.)
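
    The puzzle is Python's for-else: the else clause runs only when the loop finishes without hitting break. A minimal sketch of the pattern (hypothetical names, not the FATE code verbatim):

    # for-else: `else` runs only if the loop never hit `break`.
    for component in upstream_components:
        if component.status != "success":
            break          # an upstream task is unfinished; skip this task for now
    else:
        start_task(task)   # every upstream component succeeded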

  18. task_scheduler.py: calls start_task to start the task.
    As its first step it logs to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,148] [1:140259369826048] - task_scheduler.py[line:80]: try to start job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0
  19. task_scheduler.py: calls ResourceManager.apply_for_task_resource to apply for resources.

  20. resource_manager.py: runs resource_for_task, first calling calculate_task_resource to compute the resources the task needs.
    Note the calling code here:

    cores_per_task, memory_per_task = cls.calculate_task_resource(task_info=task_info)

    while calculate_task_resource is defined as:

    def calculate_task_resource(cls, task_parameters: RunParameters = None, task_info: dict = None):
        if not task_parameters:
            job_parameters = job_utils.get_job_parameters(job_id=task_info["job_id"],
                                                          role=task_info["role"],
                                                          party_id=task_info["party_id"])
            task_parameters = RunParameters(**job_parameters)
        if task_info["role"] in IGNORE_RESOURCE_ROLES and task_parameters.computing_engine in SUPPORT_IGNORE_RESOURCE_ENGINES:
            cores_per_task = 0
            memory_per_task = 0
        else:
            cores_per_task = task_parameters.adaptation_parameters["task_cores_per_node"] * \
                             task_parameters.adaptation_parameters["task_nodes"]
            memory_per_task = task_parameters.adaptation_parameters["task_memory_per_node"] * \
                              task_parameters.adaptation_parameters["task_nodes"]
        return cores_per_task, memory_per_task

    task_parameters is empty, so job_utils.get_job_parameters is called; the matching fate_flow/peewee.log entry:
    [DEBUG] [2021-07-26 08:20:32,149] [1:140259369826048] - peewee.py[line:2863]: ('SELECT `t1`.`f_runtime_conf_on_party` FROM `t_job` AS `t1` WHERE (((`t1`.`f_job_id` = %s) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', 'local', '0'])
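
Working the numbers for this job (inferred from the UPDATE below, not read from the config): with, presumably, task_cores_per_node = 4 and task_nodes = 1, cores_per_task = 4 * 1 = 4 and memory_per_task = 0, matching the [4, 0, ...] parameters in the UPDATE that follows.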

If the applied cores_per_task or memory_per_task is non-zero, update_resource_sql builds the filters and updates and applies them to the resource table (see the note after the SQL).
The SQL execution is logged in fate_flow/peewee.log:

[DEBUG] [2021-07-26 08:20:32,152] [1:140259369826048] - peewee.py[line:2863]: ('UPDATE `t_job` SET `f_remaining_cores` = (`t_job`.`f_remaining_cores` - %s), `f_remaining_memory` = (`t_job`.`f_remaining_memory` - %s) WHERE ((((((`t_job`.`f_remaining_cores` >= %s) AND (`t_job`.`f_remaining_memory` >= %s)) AND (`t_job`.`f_job_id` = %s)) AND (`t_job`.`f_role` = %s)) AND (`t_job`.`f_party_id` = %s)) AND (`t_job`.`f_resource_in_use` = %s))', [4, 0, 4, 0, '202107260820309976351', 'local', '0', True])
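
Note the shape of that UPDATE: the WHERE clause re-checks f_remaining_cores >= 4 and f_remaining_memory >= 0, so the decrement is atomic and only takes effect while enough resource remains; a racing task simply sees zero affected rows. A peewee sketch of the same pattern (hedged: the Job model here just mirrors the t_job columns above):

# Optimistic, atomic resource grab: decrement only if enough remains.
rows = Job.update(
    {Job.f_remaining_cores: Job.f_remaining_cores - cores,
     Job.f_remaining_memory: Job.f_remaining_memory - memory}
).where(
    Job.f_remaining_cores >= cores,
    Job.f_remaining_memory >= memory,
    Job.f_job_id == job_id,
    Job.f_role == role,
    Job.f_party_id == party_id,
    Job.f_resource_in_use == True,
).execute()
apply_status = rows > 0  # zero rows updated means the apply failed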

On success it logs to ${job_log_dir}/fate_flow_schedule.log and returns success:
[INFO] [2021-07-26 08:20:32,158] [1:140259369826048] - resource_manager.py[line:285]: task 202107260820309976351_upload_0 0 apply resource successfully

  21. task_scheduler.py: if the resource apply failed, it returns no-resource; otherwise it sets the task status to running and calls JobSaver.update_task_status to persist it.

  22. job_saver.py: runs update_task_status().
    It first logs to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,159] [1:140259369826048] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status

    It runs update_status, which first loads the task's current row with a select; fate_flow/peewee.log:
[DEBUG] [2021-07-26 08:20:32,160] [1:140259369826048] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE (((((`t1`.`f_job_id` = %s) AND (`t1`.`f_task_id` = %s)) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', '202107260820309976351_upload_0', 0, 'local', '0'])

Then it issues the UPDATE (fate_flow/peewee.log below).
Note: this goes through update_status(), which first checks whether oldStatus can legally transition to newStatus; if not, no UPDATE is issued and false is returned (see the sketch after these logs).

[DEBUG] [2021-07-26 08:20:32,165] [1:140259369826048] - peewee.py[line:2863]: ('UPDATE `t_task` SET `f_status` = %s WHERE ((((((`t_task`.`f_job_id` = %s) AND (`t_task`.`f_task_id` = %s)) AND (`t_task`.`f_task_version` = %s)) AND (`t_task`.`f_role` = %s)) AND (`t_task`.`f_party_id` = %s)) AND (`t_task`.`f_status` = %s))', ['running', '202107260820309976351', '202107260820309976351_upload_0', 0, 'local', '0', 'waiting'])

If it succeeds, a log is written to ${job_log_dir}/fate_flow_schedule.log and the result is returned (on failure it logs "update does not take effect"):
[INFO] [2021-07-26 08:20:32,172] [1:140259369826048] - job_saver.py[line:74]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status successfully: {'job_id': '202107260820309976351', 'task_id': '202107260820309976351_upload_0', 'task_version': 0, 'role': 'local', 'party_id': '0', 'status': 'running'}
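
A hedged sketch of that guarded transition (the allowed-transition table below is illustrative; FATE keeps its own rule set in its status definitions):

# Only legal forward transitions are attempted; the WHERE clause also pins
# the old status, so a concurrent change makes the UPDATE a no-op.
ALLOWED = {
    "waiting": {"running", "canceled", "failed"},
    "running": {"success", "failed", "canceled"},
}

def update_status(task, new_status):
    if new_status not in ALLOWED.get(task.f_status, set()):
        return False  # illegal transition: no UPDATE issued, false returned
    rows = Task.update({Task.f_status: new_status}).where(
        Task.f_task_id == task.f_task_id,
        Task.f_task_version == task.f_task_version,
        Task.f_status == task.f_status,
    ).execute()
    return rows > 0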

  23. task_scheduler.py: if the status update failed, it logs the failure to ${job_log_dir}/fate_flow_schedule.log and returns the acquired resources; otherwise it logs success, matching step 18:
    [INFO] [2021-07-26 08:20:32,173] [1:140259369826048] - task_scheduler.py[line:93]: start job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0
  24. task_scheduler.py: calls FederatedScheduler.sync_task_status() to sync the status to all parties.
  25. federated_scheduler.py: runs sync_task_status().
    It first logs to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,173] [1:140259369826048] - federated_scheduler.py[line:192]: job 202107260820309976351 task 202107260820309976351_upload_0 0 is running, sync to all party

    then calls task_command(), which issues an HTTP request via federated_coordination_on_http.

  26. api_utils.py: sends the request; the log is in ${job_log_dir}/fate_flow_audit.log (the route anatomy is decoded after the log):
    [INFO] [2021-07-26 08:20:32,175] [1:140259369826048] - api_utils.py[line:122]: remote http api request: http://10.200.96.235:9380/v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/status/running
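
    Reading the path off that request, the route encodes everything needed to address the task: /v1/party/{job_id}/{component_name}/{task_id}/{task_version}/{role}/{party_id}/status/{status}; here job 202107260820309976351, component upload_0, task version 0, role local, party 0, new status running.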
  27. fate_flow_server.py: receives the request and routes it via flask to party_app.

  28. party_app.py: runs task_status, which calls TaskController.update_task_status(task_info=task_info) to update the task; that in turn calls JobSaver.update_task_status.

  29. job_saver.py: runs update_task_status().
    It first logs to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,183] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status

    then runs update_status, which first loads the task with a select; fate_flow/peewee.log:
    [DEBUG] [2021-07-26 08:20:32,187] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE (((((`t1`.`f_job_id` = %s) AND (`t1`.`f_task_id` = %s)) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', '202107260820309976351_upload_0', 0, 'local', '0'])

    This is the same select as in step 22, only with a later timestamp. The update part then returns false: a running status cannot transition to running, so no UPDATE is executed.
    The failure is logged to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,192] [1:140259119585024] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'job_id': '202107260820309976351', 'task_id': '202107260820309976351_upload_0', 'task_version': '0', 'role': 'local', 'party_id': '0', 'status': 'running'}

    false is returned.
    Next it runs report_task_to_initiator(), which queries the task; under the PUSH collect type (task.f_federated_status_collect_type == PUSH) it would call FederatedScheduler.report_task_to_initiator() to push the status to the initiator.
    The matching fate_flow/peewee.log entry:
    [DEBUG] [2021-07-26 08:20:32,196] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE ((((`t1`.`f_task_id` = %s) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351_upload_0', 0, 'local', '0'])

    As noted earlier, this job uses PULL, so no report is sent; a sketch of the guard follows. The result goes straight back to party_app.py, matching step 28.
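
    A hedged sketch of that guard (the shape is inferred from the logs; the literal "PUSH" comparison is an assumption):

    # Report to the initiator only under the PUSH collect type.
    def report_task_to_initiator(task_info):
        tasks = JobSaver.query_task(task_id=task_info["task_id"],
                                    task_version=task_info["task_version"],
                                    role=task_info["role"],
                                    party_id=task_info["party_id"])
        if tasks[0].f_federated_status_collect_type == "PUSH":
            FederatedScheduler.report_task_to_initiator(task=tasks[0])
        # PULL (this job): do nothing; the initiator pulls in step 8 instead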

  30. party_app.py: returns the failure, matching step 27.

  31. fate_flow_server.py: returns the failure to api_utils.py, matching step 26.
  32. api_utils.py: receives the failure and logs to ${job_log_dir}/fate_flow_audit.log:
    [INFO] [2021-07-26 08:20:32,203] [1:140259369826048] - api_utils.py[line:129]: {"retcode":103,"retmsg":"update task status failed"}

    [INFO] [2021-07-26 08:20:32,203] [1:140259369826048] - api_utils.py[line:131]: remote http api response: /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/status/running {'retcode': 103, 'retmsg': 'update task status failed'}
    The matching container log:
    10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/status/running HTTP/1.1" 200 -
    and the result is returned to federated_scheduler.py, matching step 25.
  33. federated_scheduler.py: logs to ${job_log_dir}/fate_flow_schedule.log and returns the result; as explained after the logs, this failure is harmless:
    [WARNING] [2021-07-26 08:20:32,203] [1:140259369826048] - federated_scheduler.py[line:260]: an error occurred while status/running the task to role local party 0: 
    update task status failed
    [INFO] [2021-07-26 08:20:32,203] [1:140259369826048] - federated_scheduler.py[line:197]: sync job 202107260820309976351 task 202107260820309976351_upload_0 0 status running to all party failed:
    {'local': {0: {'retcode': 103, 'retmsg': 'update task status failed'}}}
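
    This failure is benign: the scheduler already moved the task to running in step 22, so the party-side running -> running transition is rejected by the guard described there, and the retcode 103 just means the sync was a no-op.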
  34. task_scheduler.py: calls FederatedScheduler.start_task() to start the task on the parties.

  35. federated_scheduler.py: runs start_task(),
    issuing an HTTP request; see ${job_log_dir}/fate_flow_audit.log:
    [INFO] [2021-07-26 08:20:32,205] [1:140259369826048] - api_utils.py[line:122]: remote http api request: http://10.200.96.235:9380/v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/start
  36. fate_flow_server.py: receives the request and routes it via flask to party_app.

  37. party_app.py: runs the start handler, calling TaskController.start_task.
    Whatever the outcome of that call, the endpoint returns success.

  38. task_controller.py: runs start_task.
    It first calls job_utils.get_job_dsl() to fetch the job's dsl, which hits the DB; fate_flow/peewee.log:
    [DEBUG] [2021-07-26 08:20:32,212] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_dsl` FROM `t_job` AS `t1` WHERE (((`t1`.`f_job_id` = %s) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', 'local', '0'])

    Then comes the permission check (as in the previous posts, this job configures no authentication, so nothing runs here).
    It logs to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,214] [1:140259119585024] - task_controller.py[line:71]: try to start job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0 executor subprocess

    It then builds and creates the task directories and prepares task_parameters_path:

    task_dir = os.path.join(job_utils.get_job_directory(job_id=job_id), role, party_id, component_name, task_id, task_version)
    # e.g. /data/projects/fate/jobs/202107260820309976351/local/0/upload_0/202107260820309976351_upload_0/0/
    task_parameters_path = os.path.join(task_dir, 'task_parameters.json')
    run_parameters_dict = job_utils.get_job_parameters(job_id, role, party_id)

Fetching run_parameters produces this fate_flow/peewee.log entry:
[DEBUG] [2021-07-26 08:20:32,216] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_runtime_conf_on_party` FROM `t_job` AS `t1` WHERE (((`t1`.`f_job_id` = %s) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', 'local', '0'])

It logs to ${job_log_dir}/fate_flow_schedule.log:
[INFO] [2021-07-26 08:20:32,218] [1:140259119585024] - task_controller.py[line:90]: use computing engine EGGROLL

It then builds process_cmd according to the computing engine. The source here is worth a look:

if run_parameters.computing_engine in {ComputingEngine.EGGROLL, ComputingEngine.STANDALONE}:
    process_cmd = [
        sys.executable,
        sys.modules[TaskExecutor.__module__].__file__,
        '-j', job_id,
        '-n', component_name,
        '-t', task_id,
        '-v', task_version,
        '-r', role,
        '-p', party_id,
        '-c', task_parameters_path,
        '--run_ip', RuntimeConfig.JOB_SERVER_HOST,
        '--job_server', '{}:{}'.format(RuntimeConfig.JOB_SERVER_HOST, RuntimeConfig.HTTP_PORT),
    ]
elif run_parameters.computing_engine == ComputingEngine.SPARK:
    if "SPARK_HOME" not in os.environ:
        raise EnvironmentError("SPARK_HOME not found")
    spark_home = os.environ["SPARK_HOME"]

    # additional configs
    spark_submit_config = run_parameters.spark_run

    deploy_mode = spark_submit_config.get("deploy-mode", "client")
    if deploy_mode not in ["client"]:
        raise ValueError(f"deploy mode {deploy_mode} not supported")

    spark_submit_cmd = os.path.join(spark_home, "bin/spark-submit")
    process_cmd = [spark_submit_cmd, f'--name={task_id}#{role}']
    for k, v in spark_submit_config.items():
        if k != "conf":
            process_cmd.append(f'--{k}={v}')
    if "conf" in spark_submit_config:
        for ck, cv in spark_submit_config["conf"].items():
            process_cmd.append(f'--conf')
            process_cmd.append(f'{ck}={cv}')
    process_cmd.extend([
        sys.modules[TaskExecutor.__module__].__file__,
        '-j', job_id,
        '-n', component_name,
        '-t', task_id,
        '-v', task_version,
        '-r', role,
        '-p', party_id,
        '-c', task_parameters_path,
        '--run_ip', RuntimeConfig.JOB_SERVER_HOST,
        '--job_server', '{}:{}'.format(RuntimeConfig.JOB_SERVER_HOST, RuntimeConfig.HTTP_PORT),
    ])
else:
    raise ValueError(f"${run_parameters.computing_engine} is not supported")

Note: spark only supports the client deploy mode.
Note: the spark submit binary is $SPARK_HOME/bin/spark-submit.
It then creates the task log directory:

task_log_dir = os.path.join(job_utils.get_job_log_directory(job_id=job_id), role, party_id, component_name)
# e.g. ${job_log_dir}/local/0/upload_0/

It logs to ${job_log_dir}/fate_flow_schedule.log:
[INFO] [2021-07-26 08:20:32,218] [1:140259119585024] - task_controller.py[line:144]: job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0 executor subprocess is ready

Then it calls job_utils.run_subprocess(); now comes the part that actually executes.

  39. job_utils.py: runs run_subprocess.
    It first logs to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,218] [1:140259119585024] - job_utils.py[line:310]: start process command: /opt/app-root/bin/python /data/projects/fate/python/fate_flow/operation/task_executor.py -j 202107260820309976351 -n upload_0 -t 202107260820309976351_upload_0 -v 0 -r local -p 0 -c /data/projects/fate/jobs/202107260820309976351/local/0/upload_0/202107260820309976351_upload_0/0/task_parameters.json --run_ip 10.200.96.235 --job_server 10.200.96.235:9380
    This prints the real command being executed.
    Note: a handy trick for local debugging is to run this command directly; it is the true entry point. See the post on using Pycharm on Mac.

It prepares the relevant directories and the std-log and pid files:

os.makedirs(config_dir, exist_ok=True)
if log_dir:
    os.makedirs(log_dir, exist_ok=True)
std_log = open(os.path.join(log_dir if log_dir else config_dir, 'std.log'), 'w')
pid_path = os.path.join(config_dir, 'pid')

It then checks the operating system (on Windows a subprocess.STARTUPINFO is prepared; elsewhere startupinfo stays None) and runs the command:

p = subprocess.Popen(process_cmd,
                     stdout=std_log,
                     stderr=std_log,
                     startupinfo=startupinfo)

It writes the pid into the pid file and logs to ${job_log_dir}/fate_flow_schedule.log:
[INFO] [2021-07-26 08:20:32,258] [1:140259119585024] - job_utils.py[line:333]: start process command: /opt/app-root/bin/python /data/projects/fate/python/fate_flow/operation/task_executor.py -j 202107260820309976351 -n upload_0 -t 202107260820309976351_upload_0 -v 0 -r local -p 0 -c /data/projects/fate/jobs/202107260820309976351/local/0/upload_0/202107260820309976351_upload_0/0/task_parameters.json --run_ip 10.200.96.235 --job_server 10.200.96.235:9380 successfully, pid is 90

That completes the launch inside run_subprocess, and the process handle is returned to task_controller.py, matching step 38.

  40. task_controller.py: back in step 38's start_task, receives the process returned by step 39 and updates the task fields. In the finally block it calls update_task and update_task_status to persist the task state to the DB; both delegate to the corresponding job_saver.py methods.

  41. task_controller.py: update_task calls job_saver.update_task().
    It first logs to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,259] [1:140259119585024] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0

    Note: compared with update_task_status there is no status field here (compare step 22).
    It runs update_entity_table(): first a select, then the update; fate_flow/peewee.log:
    [DEBUG] [2021-07-26 08:20:32,263] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE (((((`t1`.`f_job_id` = %s) AND (`t1`.`f_task_id` = %s)) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', '202107260820309976351_upload_0', 0, 'local', '0'])
    [DEBUG] [2021-07-26 08:20:32,270] [1:140259119585024] - peewee.py[line:2863]: ('UPDATE `t_task` SET `f_start_time` = %s, `f_start_date` = %s WHERE (((((`t_task`.`f_job_id` = %s) AND (`t_task`.`f_task_id` = %s)) AND (`t_task`.`f_task_version` = %s)) AND (`t_task`.`f_role` = %s)) AND (`t_task`.`f_party_id` = %s))', [1627287632259, datetime.datetime(2021, 7, 26, 8, 20, 32), '202107260820309976351', '202107260820309976351_upload_0', 0, 'local', '0'])

    The update succeeds, and it logs to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,275] [1:140259119585024] - job_saver.py[line:84]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update successfully

    It runs report_task_to_initiator; as in step 29, only the query executes and no request is sent:
    [DEBUG] [2021-07-26 08:20:32,277] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE ((((`t1`.`f_task_id` = %s) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351_upload_0', 0, 'local', '0'])

    and the update result is returned.

  42. task_controller.py: update_task_status calls job_saver.update_task_status().
    The flow mirrors step 22; the matching fate_flow/peewee.log entries:
    [DEBUG] [2021-07-26 08:20:32,283] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE (((((`t1`.`f_job_id` = %s) AND (`t1`.`f_task_id` = %s)) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', '202107260820309976351_upload_0', 0, 'local', '0'])
    [DEBUG] [2021-07-26 08:20:32,289] [1:140259119585024] - peewee.py[line:2863]: ('UPDATE `t_task` SET `f_party_status` = %s WHERE ((((((`t_task`.`f_job_id` = %s) AND (`t_task`.`f_task_id` = %s)) AND (`t_task`.`f_task_version` = %s)) AND (`t_task`.`f_role` = %s)) AND (`t_task`.`f_party_id` = %s)) AND (`t_task`.`f_party_status` = %s))', ['running', '202107260820309976351', '202107260820309976351_upload_0', 0, 'local', '0', 'waiting'])

    The matching ${job_log_dir}/fate_flow_schedule.log entries:
    [INFO] [2021-07-26 08:20:32,281] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
    [INFO] [2021-07-26 08:20:32,295] [1:140259119585024] - job_saver.py[line:74]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status successfully: {'job_id': '202107260820309976351', 'task_id': '202107260820309976351_upload_0', 'task_version': '0', 'role': 'local', 'party_id': '0', 'party_status': 'running', 'start_time': 1627287632259}

    Note: step 29 failed while this one succeeds, because different columns are touched: step 29 updates f_status, this one f_party_status.
    It runs report_task_to_initiator; as in step 29, only the query executes and no request is sent:
    [DEBUG] [2021-07-26 08:20:32,300] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE ((((`t1`.`f_task_id` = %s) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351_upload_0', 0, 'local', '0'])

Note: comparing steps 22 and 29, job_saver on its own never issues a report; it is the task_controller methods that do.

  43. task_controller.py: start_task finishes without an exception and logs to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,304] [1:140259119585024] - task_controller.py[line:163]: job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0 executor subprocess start success
  44. party_app.py: returns the result, matching step 37. Whatever happened above, the response is success:

    TaskController.create_task(role, party_id, True, request.json)
    return get_json_result(retcode=0, retmsg='success')
  45. fate_flow_server.py: returns the result to api_utils.py, matching step 36.

  46. api_utils.py: receives the response and logs to ${job_log_dir}/fate_flow_audit.log:

    [INFO] [2021-07-26 08:20:32,310] [1:140259369826048] - api_utils.py[line:129]: {"retcode":0,"retmsg":"success"}

    [INFO] [2021-07-26 08:20:32,311] [1:140259369826048] - api_utils.py[line:131]: remote http api response: /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/start {'retcode': 0, 'retmsg': 'success'}

    The matching container log:
    10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/start HTTP/1.1" 200 -

    and the result goes back to federated_scheduler.py, matching step 35.

  47. federated_scheduler.py: returns the result to task_scheduler.py, matching step 34.

  48. task_scheduler.py: status_code is SUCCESS, so SchedulingStatusCode.SUCCESS is returned, feeding back into the loop of step 16.
  49. task_scheduler.py: back at step 16, the loop walks through the remaining tasks, finishing this round of task scheduling for the job, and logs to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,311] [1:140259369826048] - task_scheduler.py[line:75]: finish scheduling job 202107260820309976351 tasks
    Note: a single round does not schedule every task of the job; when a task's upstream dependencies are not yet satisfied, the loop breaks and waits for the next polling round.
  50. task_scheduler.py: returns each task's current status to dag_scheduler.py, matching step 5.
  51. dag_scheduler.py: calls calculate_job_status() to compute the job's current status; if a cancel signal arrived while the job was waiting, the status becomes canceled.
    It then computes the progress as finished tasks over total tasks (the author grumbles that per-task runtime is ignored; a sketch follows this item).
    ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,312] [1:140259369826048] - dag_scheduler.py[line:310]: Job 202107260820309976351 status is running, calculate by task status list: ['running']
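
    A one-line sketch of that progress rule (hedged: the exact terminal-status set and the rounding are my assumptions):

    # Progress = finished tasks / total tasks; per-task runtime is ignored.
    def calculate_progress(tasks_group):
        finished = sum(1 for t in tasks_group.values()
                       if t.f_status in {"success", "failed", "canceled"})
        return round(finished / len(tasks_group) * 100)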
  52. dag_scheduler.py: based on how the job status and progress changed, syncs the updates to all parties.
  53. dag_scheduler.py: the whole schedule_running_job round ends; ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:32,312] [1:140259369826048] - dag_scheduler.py[line:325]: finish scheduling job 202107260820309976351