
FATE Learning: Reading the Source Code Through Logs (7): The task execute Phase of the upload Task

Overview

In the previous article, the upload task was scheduled and finally handed to task_executor to run; this article follows the actual execution.
The detailed logs of each task are written under ${job_log_dir}/${role}/${party} (abbreviated below as ${task_log_dir}).
Reading along ${task_log_dir}/DEBUG.log is the clearest way to follow this stage.

Execution details


  1. The command executed:

    /opt/app-root/bin/python /data/projects/fate/python/fate_flow/operation/task_executor.py -j 202107260820309976351 -n upload_0 -t 202107260820309976351_upload_0 -v 0 -r local -p 0 -c /data/projects/fate/jobs/202107260820309976351/local/0/upload_0/202107260820309976351_upload_0/0/task_parameters.json --run_ip 10.200.96.235 --job_server 10.200.96.235:9380
  2. task_executor.py: run_task is executed.
    The arguments are parsed first (a minimal sketch of the parsing appears at the end of this step).
    Logs go to ${job_log_dir}/fate_flow_schedule.log:
    [INFO] [2021-07-26 08:20:33,391] [90:139687957583680] - task_executor.py[line:56]: enter task process
    [INFO] [2021-07-26 08:20:33,391] [90:139687957583680] - task_executor.py[line:57]: Namespace(component_name='upload_0', config='/data/projects/fate/jobs/202107260820309976351/local/0/upload_0/202107260820309976351_upload_0/0/task_parameters.json', job_id='202107260820309976351', job_server='10.200.96.235:9380', party_id=0, role='local', run_ip='10.200.96.235', task_id='202107260820309976351_upload_0', task_version=0)

    From the parsed arguments, schedule_utils.get_job_dsl_parser() builds the dsl_parser, and the various parameters are configured.
    job_log_dir and task_log_dir are set.
    Note: once these directories are set, all logs produced by the task go to task_log_dir, separate from the outer fate_flow_schedule.log.
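
    A minimal sketch of what the argument parsing boils down to, reconstructed from the command line in step 1 and the Namespace printed above (not the exact FATE source):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-j', '--job_id', required=True, type=str)
    parser.add_argument('-n', '--component_name', required=True, type=str)
    parser.add_argument('-t', '--task_id', required=True, type=str)
    parser.add_argument('-v', '--task_version', required=True, type=int)
    parser.add_argument('-r', '--role', required=True, type=str)
    parser.add_argument('-p', '--party_id', required=True, type=int)
    parser.add_argument('-c', '--config', required=True, type=str)
    parser.add_argument('--run_ip', type=str)
    parser.add_argument('--job_server', type=str)
    args = parser.parse_args()   # yields the Namespace logged above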

  3. task_executor.py: the Tracker and TrackerClient are initialized; run_class_paths, run_class_package and run_class_name are obtained (a sketch of how they resolve to run_object appears after step 7); then report_task_update_to_driver(task_info=task_info) is called.

  4. task_executor.py: report_task_update_to_driver() is executed.
    A log line goes to ${task_log_dir}/fate_flow_schedule.log first:
    [INFO] [2021-07-26 08:20:33,395] [90:139687957583680] - task_executor.py[line:318]: report task 202107260820309976351_upload_0 0 local 0 to driver

    Then ControllerClient.report_task is called.

  5. control_client.py: report_task is executed.
    A log line goes to ${job_log_dir}/${role}/${party}/DEBUG.log first:

    [INFO] [2021-07-26 08:20:33,396] [90:139687957583680] - control_client.py[line:42]: request update job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0

    Then an HTTP request is issued; the endpoint is report.
    The flow is the same as in earlier articles; the call chain is ControllerClient.report_task -> fate_flow_server via flask -> party_app.report_task() -> TaskController.update_task(task_info=task_info) -> TaskController.update_task_status(task_info=task_info).
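
In essence, ControllerClient.report_task is an HTTP POST against fate_flow_server. A minimal sketch, with the URL layout taken from the audit log below (the helper itself is illustrative, not the FATE source):

import requests

def report_task(job_server, task_info):
    # POST task_info to fate_flow_server's /report endpoint; the URL layout
    # matches the audit log below
    endpoint = ("/v1/party/{job_id}/{component_name}/{task_id}/"
                "{task_version}/{role}/{party_id}/report").format(**task_info)
    response = requests.post(f"http://{job_server}{endpoint}", json=task_info)
    return response.json()  # e.g. {'retcode': 103, 'retmsg': 'update task status failed'}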

The logs in ${job_log_dir}/fate_flow_audit.log:

[INFO] [2021-07-26 08:20:33,396] [90:139687957583680] - api_utils.py[line:122]: remote http api request: http://10.200.96.235:9380/v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report
[INFO] [2021-07-26 08:20:33,452] [90:139687957583680] - api_utils.py[line:129]: {"retcode":103,"retmsg":"update task status failed"}

[INFO] [2021-07-26 08:20:33,452] [90:139687957583680] - api_utils.py[line:131]: remote http api response: /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report {'retcode': 103, 'retmsg': 'update task status failed'}

Because the status is unchanged, the update does not take effect, hence the failure; this is the response to the report issued in step 5. A sketch of the guard follows.
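
Roughly, the guard that produces retcode 103 looks like this (assumed logic inferred from the logs above; not the FATE source):

def update_task_status(stored_status, new_status):
    # the task row already has party_status 'running'; writing 'running'
    # again changes nothing, and the caller maps that to retcode 103
    if new_status == stored_status:
        return False  # "update does not take effect"
    # the real implementation would additionally validate the transition,
    # e.g. running -> success / failed
    return True

assert update_task_status('running', 'running') is False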

The logs in ${job_log_dir}/fate_flow_schedule.log:

[INFO] [2021-07-26 08:20:33,404] [1:140259119585024] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
[INFO] [2021-07-26 08:20:33,422] [1:140259119585024] - job_saver.py[line:84]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update successfully
[INFO] [2021-07-26 08:20:33,429] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:33,438] [1:140259119585024] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'job_id': '202107260820309976351', 'component_name': 'upload_0', 'task_id': '202107260820309976351_upload_0', 'task_version': '0', 'role': 'local', 'party_id': '0', 'run_ip': '10.200.96.235', 'run_pid': 90, 'party_status': 'running'}

The logs in ${job_log_dir}/peewee.log:

[DEBUG] [2021-07-26 08:20:33,409] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE (((((`t1`.`f_job_id` = %s) AND (`t1`.`f_task_id` = %s)) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', '202107260820309976351_upload_0', 0, 'local', '0'])
[DEBUG] [2021-07-26 08:20:33,416] [1:140259119585024] - peewee.py[line:2863]: ('UPDATE `t_task` SET `f_component_name` = %s, `f_run_ip` = %s, `f_run_pid` = %s WHERE (((((`t_task`.`f_job_id` = %s) AND (`t_task`.`f_task_id` = %s)) AND (`t_task`.`f_task_version` = %s)) AND (`t_task`.`f_role` = %s)) AND (`t_task`.`f_party_id` = %s))', ['upload_0', '10.200.96.235', 90, '202107260820309976351', '202107260820309976351_upload_0', 0, 'local', '0'])

[DEBUG] [2021-07-26 08:20:33,425] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE ((((`t1`.`f_task_id` = %s) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351_upload_0', 0, 'local', '0'])

[DEBUG] [2021-07-26 08:20:33,433] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE (((((`t1`.`f_job_id` = %s) AND (`t1`.`f_task_id` = %s)) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', '202107260820309976351_upload_0', 0, 'local', '0'])

[DEBUG] [2021-07-26 08:20:33,441] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE ((((`t1`.`f_task_id` = %s) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351_upload_0', 0, 'local', '0'])

  6. task_executor.py: the environment variables are initialized and the session is set up.
    sess.init_federation calls __init__ of federation/eggroll/_federation.py, which logs the following (see the note on the session ids after the log):
    [DEBUG] [2021-07-26 08:20:39,837] [90:139687957583680] - _federation.py[line:35]: [federation.eggroll]init federation: rp_session_id=202107260820309976351_upload_0_0_local_0, rs_session_id=202107260820309976351_upload_0_0, party=Party(role=local, party_id=0), proxy_endpoint=rollsite:9370
    [DEBUG] [2021-07-26 08:20:39,837] [90:139687957583680] - _federation.py[line:45]: [federation.eggroll]init federation context done
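
    The two session ids in the log above are simple compositions of the task identity; a small illustration (inferred from the log line itself, with illustrative variable names):

    task_id = '202107260820309976351_upload_0'
    task_version, role, party_id = 0, 'local', 0

    # rp_session_id is per role/party; rs_session_id is shared across parties
    rp_session_id = f"{task_id}_{task_version}_{role}_{party_id}"
    rs_session_id = f"{task_id}_{task_version}"

    assert rp_session_id == '202107260820309976351_upload_0_0_local_0'
    assert rs_session_id == '202107260820309976351_upload_0_0'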
  7. task_executor.py: the task run begins.
    Logs go to ${job_log_dir}/${role}/${party}/DEBUG.log first:
    [INFO] [2021-07-26 08:20:39,837] [90:139687957583680] - task_executor.py[line:156]: Run 202107260820309976351 upload_0 202107260820309976351_upload_0 local 0 task
    [INFO] [2021-07-26 08:20:39,838] [90:139687957583680] - task_executor.py[line:157]: Component parameters on party {'UploadParam': {'file': '/data/projects/fate/jobs/202107260820309976351/fate_upload_tmp/breast_hetero_guest.csv', 'head': 1, 'id_delimiter': ',', 'partition': 1, 'namespace': 'cl', 'name': 'hetero_guest', 'storage_engine': '', 'storage_address': None, 'destroy': False}, 'initiator': {'role': 'local', 'party_id': 0}, 'job_parameters': {'job_type': 'train', 'work_mode': 1, 'backend': 0, 'computing_engine': 'EGGROLL', 'federation_engine': 'EGGROLL', 'storage_engine': 'EGGROLL', 'engines_address': {'computing': {'cores_per_node': 20, 'nodes': 1}, 'federation': {'host': 'rollsite', 'port': 9370}, 'storage': {'cores_per_node': 20, 'nodes': 1}}, 'federated_mode': 'MULTIPLE', 'task_parallelism': 1, 'computing_partitions': 4, 'federated_status_collect_type': 'PULL', 'model_id': 'local-0#model', 'model_version': '202107260820309976351', 'eggroll_run': {'eggroll.session.processors.per.node': 4}, 'spark_run': {}, 'rabbitmq_run': {}, 'adaptation_parameters': {'task_nodes': 1, 'task_cores_per_node': 4, 'task_memory_per_node': 0, 'request_task_cores': 4, 'if_initiator_baseline': False}}, 'role': {'local': [0]}, 'component_parameters': {'role': {'local': {'0': {'upload_0': {'name': 'hetero_guest', 'head': 1, 'file': '/data/projects/fate/jobs/202107260820309976351/fate_upload_tmp/breast_hetero_guest.csv', 'partition': 1, 'namespace': 'cl', 'destroy': False}}}}}, 'dsl_version': 2, 'local': {'role': 'local', 'party_id': 0}, 'CodePath': 'fate_flow/components/upload.py/Upload', 'module': 'Upload', 'output_data_name': None}
    [INFO] [2021-07-26 08:20:39,838] [90:139687957583680] - task_executor.py[line:158]: Task input dsl {}

    Then task_run_args is assembled, run_object is configured, and the following executes:

    profile.profile_start()
    run_object.run(component_parameters_on_party, task_run_args)

Here run_object is Upload, so run_object.run amounts to calling Upload.run().
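
Sketched from the names in step 3 (run_class_paths, run_class_package, run_class_name) and the CodePath 'fate_flow/components/upload.py/Upload' seen in the parameters above, the resolution looks roughly like this (the exact FATE code may differ):

import importlib

code_path = 'fate_flow/components/upload.py/Upload'  # CodePath from the parameters
run_class_paths = code_path.split('/')
run_class_name = run_class_paths[-1]                 # 'Upload'
# 'fate_flow/components/upload.py' -> 'fate_flow.components.upload'
run_class_package = '.'.join(run_class_paths[:-1]).replace('.py', '')

run_object = getattr(importlib.import_module(run_class_package), run_class_name)()
# later: run_object.run(component_parameters_on_party, task_run_args)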

  8. upload.py: run is executed.
    The parameters are read from component_parameters.
    Logs go to ${job_log_dir}/${role}/${party}/DEBUG.log:
    [INFO] [2021-07-26 08:20:39,883] [90:139687957583680] - upload.py[line:41]: {'file': '/data/projects/fate/jobs/202107260820309976351/fate_upload_tmp/breast_hetero_guest.csv', 'head': 1, 'id_delimiter': ',', 'partition': 1, 'namespace': 'cl', 'name': 'hetero_guest', 'storage_engine': '', 'storage_address': None, 'destroy': False}
    [INFO] [2021-07-26 08:20:39,883] [90:139687957583680] - upload.py[line:42]: {'job_parameters': <fate_flow.entity.types.RunParameters object at 0x7f0b81a98b38>}
    The various member variables are then set from these parameters.

Next, the session is built:

[DEBUG] [2021-07-26 08:20:39,883] [90:139687957583680] - pool.py[line:129]: No connection available in pool.
[DEBUG] [2021-07-26 08:20:39,889] [90:139687957583680] - pool.py[line:158]: Created new connection 139687331472832.
[DEBUG] [2021-07-26 08:20:39,891] [90:139687957583680] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_date`, `t1`.`f_update_date`, `t1`.`f_name`, `t1`.`f_namespace`, `t1`.`f_address`, `t1`.`f_engine`, `t1`.`f_type`, `t1`.`f_options`, `t1`.`f_partitions`, `t1`.`f_id_delimiter`, `t1`.`f_in_serialized`, `t1`.`f_have_head`, `t1`.`f_schema`, `t1`.`f_count`, `t1`.`f_part_of_data`, `t1`.`f_description`, `t1`.`f_create_time`, `t1`.`f_update_time` FROM `t_storage_table_meta` AS `t1` WHERE ((`t1`.`f_name` = %s) AND (`t1`.`f_namespace` = %s))', ['hetero_guest', 'cl'])
[DEBUG] [2021-07-26 08:20:39,894] [90:139687957583680] - pool.py[line:185]: Returning 139687331472832 to pool.
[DEBUG] [2021-07-26 08:20:39,966] [90:139687957583680] - peewee.py[line:2863]: ('INSERT INTO `t_session_record` (`f_session_id`, `f_create_date`, `f_update_time`, `f_update_date`, `f_engine_name`, `f_engine_type`, `f_engine_address`, `f_create_time`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)', ['202107260820309976351_upload_0_0_local_0_storage_5cbb3a88edea11ebbccccaf5cc2d708a', datetime.datetime(2021, 7, 26, 8, 20, 39), 1627287639962, datetime.datetime(2021, 7, 26, 8, 20, 39), 'STANDALONE', 'storage', '{}', 1627287639962])
[DEBUG] [2021-07-26 08:20:39,973] [90:139687957583680] - _session.py[line:144]: save session 202107260820309976351_upload_0_0_local_0_storage_5cbb3a88edea11ebbccccaf5cc2d708a record
[DEBUG] [2021-07-26 08:20:39,974] [90:139687957583680] - pool.py[line:185]: Returning 139687331472832 to pool.

[DEBUG] [2021-07-26 08:20:39,975] [90:139687957583680] - peewee.py[line:2863]: ('DELETE FROM `t_session_record` WHERE (`t_session_record`.`f_session_id` = %s)', ['202107260820309976351_upload_0_0_local_0_storage_5cbb3a88edea11ebbccccaf5cc2d708a'])
[DEBUG] [2021-07-26 08:20:39,981] [90:139687957583680] - _session.py[line:153]: delete session 202107260820309976351_upload_0_0_local_0_storage_5cbb3a88edea11ebbccccaf5cc2d708a record
[DEBUG] [2021-07-26 08:20:39,981] [90:139687957583680] - pool.py[line:185]: Returning 139687331472832 to pool.
[DEBUG] [2021-07-26 08:20:39,985] [90:139687957583680] - peewee.py[line:2863]: ('INSERT INTO `t_session_record` (`f_session_id`, `f_create_date`, `f_update_time`, `f_update_date`, `f_engine_name`, `f_engine_type`, `f_engine_address`, `f_create_time`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)', ['202107260820309976351_upload_0_0_local_0_storage_5cca2ef8edea11ebbccccaf5cc2d708a', datetime.datetime(2021, 7, 26, 8, 20, 39), 1627287639985, datetime.datetime(2021, 7, 26, 8, 20, 39), 'EGGROLL', 'storage', '{}', 1627287639985])
[DEBUG] [2021-07-26 08:20:39,989] [90:139687957583680] - _session.py[line:144]: save session 202107260820309976351_upload_0_0_local_0_storage_5cca2ef8edea11ebbccccaf5cc2d708a record
[DEBUG] [2021-07-26 08:20:39,989] [90:139687957583680] - pool.py[line:185]: Returning 139687331472832 to pool.
if table:
    LOGGER.info(f"destroy table name: {name} namespace: {namespace} engine: {table.get_engine()}")
    table.destroy()
else:
    LOGGER.info(f"can not found table name: {name} namespace: {namespace}, pass destroy")

Note: if the destroy parameter is set, table.destroy is called, as shown above.
The upload address is then updated, logging to ${job_log_dir}/${role}/${party}/DEBUG.log:

[INFO] [2021-07-26 08:20:41,836] [90:139687957583680] - upload.py[line:95]: upload to EGGROLL storage, address: {'cores_per_node': 20, 'nodes': 1, 'name': 'hetero_guest', 'namespace': 'cl', 'storage_type': 'LMDB'}

This address is the LMDB storage location.

# create_address builds the metadata address for the chosen storage engine
address = storage.StorageTableMeta.create_address(storage_engine=storage_engine, address_dict=address_dict)
self.parameters["partitions"] = partitions
self.parameters["name"] = name
# create the table in the storage session
self.table = storage_session.create_table(address=address, **self.parameters)
# read the file and write its rows into the table
data_table_count = self.save_data_table(job_id, name, namespace, head)
# finally update the table metadata
self.table.get_meta().update_metas(in_serialized=True)

When debugging in local (standalone) mode, a data directory is generated under the project directory.

With a KubeFATE deployment, the data lives under /data/projects/fate/eggroll/data/LMDB on the nodemanager.

The snippet above, in order: creates the metadata, creates the table, saves the data, and updates the metadata.

  9. upload.py: save_data_table is executed.
    The file schema is obtained first.
    The file is then read in chunks of at most MAX_BYTES (lines = fin.readlines(self.MAX_BYTES)) and saved into LMDB.
    The progress is computed as

    save_progress = lines_count/input_feature_count*100//1

    so the progress granularity depends on MAX_BYTES; see the worked example below.
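
    A worked example of this arithmetic (569 is the row count of this job's breast_hetero_guest.csv; the chunk sizes are made up for illustration):

    input_feature_count = 569      # total data rows in the uploaded file
    lines_count = 0

    # each iteration stands for one "lines = fin.readlines(self.MAX_BYTES)" chunk
    for chunk_lines in (300, 269):
        lines_count += chunk_lines
        save_progress = lines_count / input_feature_count * 100 // 1
        print(save_progress)       # 52.0, then 100.0

    The larger MAX_BYTES is, the fewer chunks the file is read in, and the coarser the progress updates.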
  10. upload.py: ControllerClient.update_job(job_info=job_info) is called to update the task state (mainly the progress) via control_client.update_job.
    The flow is the same as before; the call chain is ControllerClient.update_job -> fate_flow_server via flask -> party_app.update_task() -> TaskController.update_task(task_info=task_info).

The resulting log in ${job_log_dir}/${role}/${party}/DEBUG.log:
[INFO] [2021-07-26 08:20:41,942] [90:139687957583680] - control_client.py[line:26]: request update job 202107260820309976351 on local 0

The logs in ${job_log_dir}/fate_flow_audit.log:

[INFO] [2021-07-26 08:20:41,942] [90:139687957583680] - api_utils.py[line:122]: remote http api request: http://10.200.96.235:9380/v1/party/202107260820309976351/local/0/update
[INFO] [2021-07-26 08:20:41,966] [90:139687957583680] - api_utils.py[line:129]: {"retcode":0,"retmsg":"success"}

[INFO] [2021-07-26 08:20:41,966] [90:139687957583680] - api_utils.py[line:131]: remote http api response: /v1/party/202107260820309976351/local/0/update {'retcode': 0, 'retmsg': 'success'}

The logs in ${job_log_dir}/fate_flow_schedule.log:

[INFO] [2021-07-26 08:20:41,950] [1:140259119585024] - job_saver.py[line:61]: try to update job 202107260820309976351
[INFO] [2021-07-26 08:20:41,963] [1:140259119585024] - job_saver.py[line:64]: job 202107260820309976351 update successfully: {'progress': 100.0, 'job_id': '202107260820309976351', 'role': 'local', 'party_id': '0'}

The logs in ${job_log_dir}/peewee.log:

[DEBUG] [2021-07-26 08:20:41,954] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_user_id`, `t1`.`f_job_id`, `t1`.`f_name`, `t1`.`f_description`, `t1`.`f_tag`, `t1`.`f_dsl`, `t1`.`f_runtime_conf`, `t1`.`f_runtime_conf_on_party`, `t1`.`f_train_runtime_conf`, `t1`.`f_roles`, `t1`.`f_work_mode`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_is_initiator`, `t1`.`f_progress`, `t1`.`f_ready_signal`, `t1`.`f_ready_time`, `t1`.`f_cancel_signal`, `t1`.`f_cancel_time`, `t1`.`f_rerun_signal`, `t1`.`f_end_scheduling_updates`, `t1`.`f_engine_name`, `t1`.`f_engine_type`, `t1`.`f_cores`, `t1`.`f_memory`, `t1`.`f_remaining_cores`, `t1`.`f_remaining_memory`, `t1`.`f_resource_in_use`, `t1`.`f_apply_resource_time`, `t1`.`f_return_resource_time`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_job` AS `t1` WHERE (((`t1`.`f_job_id` = %s) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', 'local', '0'])
[DEBUG] [2021-07-26 08:20:41,960] [1:140259119585024] - peewee.py[line:2863]: ('UPDATE `t_job` SET `f_progress` = %s WHERE ((((`t_job`.`f_job_id` = %s) AND (`t_job`.`f_role` = %s)) AND (`t_job`.`f_party_id` = %s)) AND (`t_job`.`f_progress` <= %s))', [100, '202107260820309976351', 'local', '0', 100])
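
Note the extra condition f_progress <= %s in the UPDATE above: progress can only move forward. A peewee sketch of the same idea (the Job model here is a stand-in, not the FATE model definition):

from peewee import CharField, FloatField, Model, SqliteDatabase

db = SqliteDatabase(':memory:')

class Job(Model):
    f_job_id = CharField()
    f_role = CharField()
    f_party_id = CharField()
    f_progress = FloatField(default=0)

    class Meta:
        database = db

db.create_tables([Job])

def update_progress(job_id, role, party_id, new_progress):
    # rows already reporting higher progress are left untouched, mirroring
    # the "f_progress <= %s" guard in the logged SQL
    return (Job.update(f_progress=new_progress)
               .where((Job.f_job_id == job_id)
                      & (Job.f_role == role)
                      & (Job.f_party_id == party_id)
                      & (Job.f_progress <= new_progress))
               .execute())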

  11. upload.py: back to step 9, the remaining chunks are saved into LMDB in the same way. Once the whole file has been stored, the metadata is updated and the tracker is updated.
    In task_executor.py the tracker was set via

    run_object.set_tracker(tracker=tracker_client)

    so self.tracker here is the TrackerClient:

    self.tracker.log_output_data_info(data_name='upload',
                                      table_namespace=dst_table_namespace,
                                      table_name=dst_table_name)

    self.tracker.log_metric_data(metric_namespace="upload",
                                 metric_name="data_access",
                                 metrics=[Metric("count", table_count)])
    self.tracker.set_metric_meta(metric_namespace="upload",
                                 metric_name="data_access",
                                 metric_meta=MetricMeta(name='upload', metric_type='UPLOAD'))

    These are handled by the three methods log_output_data_info, log_metric_data and set_metric_meta respectively.
    Logs in ${job_log_dir}/${role}/${party}/DEBUG.log:
    [INFO] [2021-07-26 08:20:42,704] [90:139687957583680] - tracker_client.py[line:127]: Request save job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0 data upload info
    [INFO] [2021-07-26 08:20:43,044] [1:140259119585024] - job_tracker.py[line:97]: save job 202107260820309976351 component upload_0 on local 0 upload data_access metric meta
    [INFO] [2021-07-26 08:20:44,838] [90:139687957583680] - job_tracker.py[line:159]: task id 202107260820309976351_upload_0 output data table is none

    The HTTP requests appear in ${job_log_dir}/fate_flow_audit.log as:
    [INFO] [2021-07-26 08:20:42,704] [90:139687957583680] - api_utils.py[line:122]: remote http api request: http://10.200.96.235:9380/v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/output_data_info/save
    [INFO] [2021-07-26 08:20:43,009] [90:139687957583680] - api_utils.py[line:129]: {"retcode":0,"retmsg":"success"}

    [INFO] [2021-07-26 08:20:43,010] [90:139687957583680] - api_utils.py[line:131]: remote http api response: /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/output_data_info/save {'retcode': 0, 'retmsg': 'success'}


    [INFO] [2021-07-26 08:20:43,011] [90:139687957583680] - api_utils.py[line:122]: remote http api request: http://10.200.96.235:9380/v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_data/save
    [INFO] [2021-07-26 08:20:43,036] [90:139687957583680] - api_utils.py[line:129]: {"retcode":0,"retmsg":"success"}

    [INFO] [2021-07-26 08:20:43,036] [90:139687957583680] - api_utils.py[line:131]: remote http api response: /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_data/save {'retcode': 0, 'retmsg': 'success'}

    [INFO] [2021-07-26 08:20:43,037] [90:139687957583680] - api_utils.py[line:122]: remote http api request: http://10.200.96.235:9380/v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_meta/save
    [INFO] [2021-07-26 08:20:43,056] [90:139687957583680] - api_utils.py[line:129]: {"retcode":0,"retmsg":"success"}

    [INFO] [2021-07-26 08:20:43,056] [90:139687957583680] - api_utils.py[line:131]: remote http api response: /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_meta/save {'retcode': 0, 'retmsg': 'success'}

The corresponding call chain is TrackerClient.log_output_data_info() -> fate_flow_server -> tracker_app -> job_tracker.
The logs in ${job_log_dir}/fate_flow_schedule.log:

[INFO] [2021-07-26 08:20:43,018] [1:140259119585024] - job_tracker.py[line:74]: save job 202107260820309976351 component upload_0 on local 0 upload data_access metric data
[INFO] [2021-07-26 08:20:43,044] [1:140259119585024] - job_tracker.py[line:97]: save job 202107260820309976351 component upload_0 on local 0 upload data_access metric meta
[INFO] [2021-07-26 08:20:44,838] [90:139687957583680] - job_tracker.py[line:159]: task id 202107260820309976351_upload_0 output data table is none


Once the save completes, table_count is returned.

  12. upload.py: the completion log is printed, the temporary files are cleaned up, and the summary is printed.
    ${job_log_dir}/${role}/${party}/DEBUG.log:
    [INFO] [2021-07-26 08:20:44,822] [90:139687957583680] - upload.py[line:102]: ------------load data finish!-----------------
    [INFO] [2021-07-26 08:20:44,822] [90:139687957583680] - upload.py[line:106]: remove tmp upload file
    [INFO] [2021-07-26 08:20:44,823] [90:139687957583680] - upload.py[line:107]: /data/projects/fate/jobs/202107260820309976351/fate_upload_tmp
    [INFO] [2021-07-26 08:20:44,823] [90:139687957583680] - upload.py[line:111]: file: /data/projects/fate/jobs/202107260820309976351/fate_upload_tmp/breast_hetero_guest.csv
    [INFO] [2021-07-26 08:20:44,824] [90:139687957583680] - upload.py[line:112]: total data_count: 569
    [INFO] [2021-07-26 08:20:44,824] [90:139687957583680] - upload.py[line:113]: table name: hetero_guest, table namespace: cl
  13. task_executor.py: upload.run (step 8) has returned, and profile.profile_ends() is executed.
    profile.profile_ends() writes out the ${task_log_dir}/PROFILING.log entries (collected into INFO and DEBUG respectively).
    ${job_log_dir}/${role}/${party}/DEBUG.log:

    [INFO] [2021-07-26 08:20:44,837] [90:139687957583680] - profile.py[line:249]: 
    Computing:
    +----------+------------------------------------------+
    | function | |
    +----------+------------------------------------------+
    | total | n=0, sum=0.0000, mean=0.0000, max=0.0000 |
    +----------+------------------------------------------+

    Federation:
    +--------+------------------------------------------+
    | get | |
    +--------+------------------------------------------+
    | remote | |
    +--------+------------------------------------------+
    | total | n=0, sum=0.0000, mean=0.0000, max=0.0000 |
    +--------+------------------------------------------+

    [DEBUG] [2021-07-26 08:20:44,838] [90:139687957583680] - profile.py[line:250]:
    Detailed Computing:
    +-------+------------------------------------------+
    | stack | |
    +-------+------------------------------------------+
    | total | n=0, sum=0.0000, mean=0.0000, max=0.0000 |
    +-------+------------------------------------------+
  14. task_executor.py: after the run finishes, save_data and save_out_model are executed, then report_task_update_to_driver is called (same as steps 4 and 5), and the summary statistics are printed.
    ${job_log_dir}/${role}/${party}/DEBUG.log:

    [INFO] [2021-07-26 08:20:44,838] [90:139687957583680] - task_executor.py[line:318]: report task 202107260820309976351_upload_0 0 local 0 to driver
    [INFO] [2021-07-26 08:20:44,838] [90:139687957583680] - control_client.py[line:42]: request update job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0
    [INFO] [2021-07-26 08:20:44,938] [90:139687957583680] - task_executor.py[line:207]: task 202107260820309976351_upload_0 local 0 start time: 2021-07-26 08:20:33
    [INFO] [2021-07-26 08:20:44,939] [90:139687957583680] - task_executor.py[line:209]: task 202107260820309976351_upload_0 local 0 end time: 2021-07-26 08:20:44
    [INFO] [2021-07-26 08:20:44,939] [90:139687957583680] - task_executor.py[line:211]: task 202107260820309976351_upload_0 local 0 takes 11.447s
    [INFO] [2021-07-26 08:20:44,939] [90:139687957583680] - task_executor.py[line:214]: Finish 202107260820309976351 upload_0 202107260820309976351_upload_0 0 local 0 task success

The corresponding logs in ${job_log_dir}/fate_flow_schedule.log:

[INFO] [2021-07-26 08:20:44,844] [1:140259119585024] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
[INFO] [2021-07-26 08:20:44,864] [1:140259119585024] - job_saver.py[line:84]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update successfully
[INFO] [2021-07-26 08:20:44,873] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:44,908] [1:140259119585024] - job_saver.py[line:74]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status successfully: {'job_id': '202107260820309976351', 'component_name': 'upload_0', 'task_id': '202107260820309976351_upload_0', 'task_version': '0', 'role': 'local', 'party_id': '0', 'run_ip': '10.200.96.235', 'run_pid': 90, 'party_status': 'success', 'end_time': 1627287644838, 'elapsed': 11447}

  15. task_executor.py: TaskExecutor.run_task() has finished, and TaskExecutor.report_task_update_to_driver(task_info=task_info) runs once more.
    ${job_log_dir}/${role}/${party}/DEBUG.log:
    [INFO] [2021-07-26 08:20:44,940] [90:139687957583680] - task_executor.py[line:318]: report task 202107260820309976351_upload_0 0 local 0 to driver
    [INFO] [2021-07-26 08:20:44,940] [90:139687957583680] - control_client.py[line:42]: request update job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0

The corresponding logs in ${job_log_dir}/fate_flow_schedule.log:

[INFO] [2021-07-26 08:20:44,947] [1:140259119585024] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
[WARNING] [2021-07-26 08:20:44,972] [1:140259119585024] - job_saver.py[line:86]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update does not take effect
[INFO] [2021-07-26 08:20:44,996] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:45,015] [1:140259119585024] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'job_id': '202107260820309976351', 'component_name': 'upload_0', 'task_id': '202107260820309976351_upload_0', 'task_version': '0', 'role': 'local', 'party_id': '0', 'run_ip': '10.200.96.235', 'run_pid': 90, 'party_status': 'success', 'end_time': 1627287644838, 'elapsed': 11447}

Note: these entries are interleaved with the dag_scheduler polling logs; the way to tell them apart is that this part carries run_pid.

  16. The execution result is returned, matching the command launched in step 1.
    At this point, the whole task has finished.