Overview
In the previous article the upload task was scheduled; scheduling ultimately invokes task_executor to run it, which brings us to the actual execution.
Each task writes its detailed logs under ${job_log_dir}/${role}/${party} (abbreviated below as ${task_log_dir}).
Following along with ${task_log_dir}/DEBUG.log makes the flow easiest to trace.
Execution Details
- Execution command
```
/opt/app-root/bin/python /data/projects/fate/python/fate_flow/operation/task_executor.py -j 202107260820309976351 -n upload_0 -t 202107260820309976351_upload_0 -v 0 -r local -p 0 -c /data/projects/fate/jobs/202107260820309976351/local/0/upload_0/202107260820309976351_upload_0/0/task_parameters.json --run_ip 10.200.96.235 --job_server 10.200.96.235:9380
```
- task_executor.py: execute run_task
The arguments are parsed first.
Logs go to ${job_log_dir}/fate_flow_schedule.log:
```
[INFO] [2021-07-26 08:20:33,391] [90:139687957583680] - task_executor.py[line:56]: enter task process
[INFO] [2021-07-26 08:20:33,391] [90:139687957583680] - task_executor.py[line:57]: Namespace(component_name='upload_0', config='/data/projects/fate/jobs/202107260820309976351/local/0/upload_0/202107260820309976351_upload_0/0/task_parameters.json', job_id='202107260820309976351', job_server='10.200.96.235:9380', party_id=0, role='local', run_ip='10.200.96.235', task_id='202107260820309976351_upload_0', task_version=0)
```
Based on the parsed arguments, schedule_utils.get_job_dsl_parser() is called to build the dsl_parser, and the various parameters are configured.
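As a minimal sketch, the command-line flags map onto the Namespace fields above roughly like this (an illustration assuming standard argparse usage; the flag-to-dest pairings are inferred from the command and the log, not copied from FATE's source):
```python
import argparse

# Reconstruction of task_executor.py's CLI, inferred from the command line
# and the Namespace(...) log above; treat the exact signatures as assumptions.
parser = argparse.ArgumentParser()
parser.add_argument('-j', '--job_id', required=True, help='job id')
parser.add_argument('-n', '--component_name', required=True, help='component name')
parser.add_argument('-t', '--task_id', required=True, help='task id')
parser.add_argument('-v', '--task_version', required=True, type=int, help='task version')
parser.add_argument('-r', '--role', required=True, help='role, e.g. local/guest/host')
parser.add_argument('-p', '--party_id', required=True, type=int, help='party id')
parser.add_argument('-c', '--config', required=True, help='path to task_parameters.json')
parser.add_argument('--run_ip', help='ip of the machine running the task')
parser.add_argument('--job_server', help='fate_flow server, host:port')
args = parser.parse_args()
```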
job_log_dir and task_log_dir are then set.
Note: once these directories are set, everything the task logs goes under task_log_dir, separate from the outer fate_flow_schedule.log.
- task_executor.py: initialize Tracker and TrackerClient, resolve run_class_paths, run_class_package and run_class_name, then call report_task_update_to_driver(task_info=task_info)
- task_executor.py: execute report_task_update_to_driver()
A log line is first written to ${task_log_dir}/fate_flow_schedule.log:
```
[INFO] [2021-07-26 08:20:33,395] [90:139687957583680] - task_executor.py[line:318]: report task 202107260820309976351_upload_0 0 local 0 to driver
```
Then ControllerClient.report_task is called.
- control_client.py: execute report_task
A log line is written to ${job_log_dir}/${role}/${party}/DEBUG.log, then an HTTP request is issued to the report endpoint:
```
[INFO] [2021-07-26 08:20:33,396] [90:139687957583680] - control_client.py[line:42]: request update job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0
```
The flow is the same as described earlier; the call chain is ControllerClient.report_task -> fate_flow_server (via flask) -> party_app.report_task() -> TaskController.update_task(task_info=task_info) -> TaskController.update_task_status(task_info=task_info)
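A hedged sketch of what the first hop boils down to, based on the endpoint visible in fate_flow_audit.log below (the helper and its signature are illustrative, not FATE's actual API):
```python
import requests

# Hypothetical reconstruction of ControllerClient.report_task: POST the
# task_info dict to fate_flow_server's /report endpoint for this task.
def report_task(job_server: str, task_info: dict) -> dict:
    endpoint = "/v1/party/{job_id}/{component_name}/{task_id}/{task_version}/{role}/{party_id}/report".format(**task_info)
    response = requests.post("http://{}{}".format(job_server, endpoint), json=task_info)
    return response.json()  # e.g. {'retcode': 103, 'retmsg': 'update task status failed'}
```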
The corresponding entries in ${job_log_dir}/fate_flow_audit.log:
```
[INFO] [2021-07-26 08:20:33,396] [90:139687957583680] - api_utils.py[line:122]: remote http api request: http://10.200.96.235:9380/v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report
[INFO] [2021-07-26 08:20:33,452] [90:139687957583680] - api_utils.py[line:129]: {"retcode":103,"retmsg":"update task status failed"}
[INFO] [2021-07-26 08:20:33,452] [90:139687957583680] - api_utils.py[line:131]: remote http api response: /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report {'retcode': 103, 'retmsg': 'update task status failed'}
```
The reported status matches what is already stored, so the status update does not take effect and the call is reported as failed (as described in part 5).
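A toy illustration of that idempotence check (assumed, simplified logic; the real check lives in JobSaver/TaskController and works by counting affected rows):
```python
# Updating a task to the status it already has changes nothing, which the
# server surfaces as retcode 103 "update task status failed".
def update_task_status(current_status: str, new_status: str) -> bool:
    if current_status == new_status:
        return False  # "update does not take effect"
    # ... persist new_status here ...
    return True

assert update_task_status("running", "running") is False
```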
Logs in ${job_log_dir}/fate_flow_schedule.log:
```
[INFO] [2021-07-26 08:20:33,404] [1:140259119585024] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
[INFO] [2021-07-26 08:20:33,422] [1:140259119585024] - job_saver.py[line:84]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update successfully
[INFO] [2021-07-26 08:20:33,429] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:33,438] [1:140259119585024] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'job_id': '202107260820309976351', 'component_name': 'upload_0', 'task_id': '202107260820309976351_upload_0', 'task_version': '0', 'role': 'local', 'party_id': '0', 'run_ip': '10.200.96.235', 'run_pid': 90, 'party_status': 'running'}
```
Logs in ${job_log_dir}/peewee.log:
```
[DEBUG] [2021-07-26 08:20:33,409] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE (((((`t1`.`f_job_id` = %s) AND (`t1`.`f_task_id` = %s)) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', '202107260820309976351_upload_0', 0, 'local', '0'])
[DEBUG] [2021-07-26 08:20:33,416] [1:140259119585024] - peewee.py[line:2863]: ('UPDATE `t_task` SET `f_component_name` = %s, `f_run_ip` = %s, `f_run_pid` = %s WHERE (((((`t_task`.`f_job_id` = %s) AND (`t_task`.`f_task_id` = %s)) AND (`t_task`.`f_task_version` = %s)) AND (`t_task`.`f_role` = %s)) AND (`t_task`.`f_party_id` = %s))', ['upload_0', '10.200.96.235', 90, '202107260820309976351', '202107260820309976351_upload_0', 0, 'local', '0'])
[DEBUG] [2021-07-26 08:20:33,425] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE ((((`t1`.`f_task_id` = %s) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351_upload_0', 0, 'local', '0'])
[DEBUG] [2021-07-26 08:20:33,433] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE (((((`t1`.`f_job_id` = %s) AND (`t1`.`f_task_id` = %s)) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', '202107260820309976351_upload_0', 0, 'local', '0'])
[DEBUG] [2021-07-26 08:20:33,441] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_job_id`, `t1`.`f_component_name`, `t1`.`f_task_id`, `t1`.`f_task_version`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_federated_mode`, `t1`.`f_federated_status_collect_type`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_run_on_this_party`, `t1`.`f_run_ip`, `t1`.`f_run_pid`, `t1`.`f_party_status`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_task` AS `t1` WHERE ((((`t1`.`f_task_id` = %s) AND (`t1`.`f_task_version` = %s)) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351_upload_0', 0, 'local', '0'])
```
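The first SELECT above corresponds roughly to a peewee query like the following (a sketch: the model declares only the columns needed here, and field/table names are read off the SQL, not FATE's model definitions):
```python
from peewee import CharField, IntegerField, Model, MySQLDatabase

db = MySQLDatabase(None)  # deferred placeholder; fate_flow binds the real database

class Task(Model):
    # subset of the t_task columns visible in the SQL above
    f_job_id = CharField()
    f_task_id = CharField()
    f_task_version = IntegerField()
    f_role = CharField()
    f_party_id = CharField()

    class Meta:
        database = db
        table_name = "t_task"

def query_task(job_id, task_id, task_version, role, party_id):
    # peewee renders this as the parameterized SELECT ... WHERE ... seen in peewee.log
    return Task.select().where(
        (Task.f_job_id == job_id)
        & (Task.f_task_id == task_id)
        & (Task.f_task_version == task_version)
        & (Task.f_role == role)
        & (Task.f_party_id == party_id))
```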
- task_executor.py: initialize environment variables and set up the session
sess.init_federation calls __init__ in federation/eggroll/_federation.py:
```
[DEBUG] [2021-07-26 08:20:39,837] [90:139687957583680] - _federation.py[line:35]: [federation.eggroll]init federation: rp_session_id=202107260820309976351_upload_0_0_local_0, rs_session_id=202107260820309976351_upload_0_0, party=Party(role=local, party_id=0), proxy_endpoint=rollsite:9370
[DEBUG] [2021-07-26 08:20:39,837] [90:139687957583680] - _federation.py[line:45]: [federation.eggroll]init federation context done
```
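The two session ids in that log line are simple compositions of the task fields (inferred from the log itself; shown here just to make the naming scheme explicit):
```python
task_id, task_version, role, party_id = "202107260820309976351_upload_0", 0, "local", 0

# The rollpair session id appends role/party; the rollsite session id stops at the version.
rp_session_id = f"{task_id}_{task_version}_{role}_{party_id}"
rs_session_id = f"{task_id}_{task_version}"

assert rp_session_id == "202107260820309976351_upload_0_0_local_0"
assert rs_session_id == "202107260820309976351_upload_0_0"
```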
- task_executor.py: start running the task
Logs are first written to ${job_log_dir}/${role}/${party}/DEBUG.log:
```
[INFO] [2021-07-26 08:20:39,837] [90:139687957583680] - task_executor.py[line:156]: Run 202107260820309976351 upload_0 202107260820309976351_upload_0 local 0 task
[INFO] [2021-07-26 08:20:39,838] [90:139687957583680] - task_executor.py[line:157]: Component parameters on party {'UploadParam': {'file': '/data/projects/fate/jobs/202107260820309976351/fate_upload_tmp/breast_hetero_guest.csv', 'head': 1, 'id_delimiter': ',', 'partition': 1, 'namespace': 'cl', 'name': 'hetero_guest', 'storage_engine': '', 'storage_address': None, 'destroy': False}, 'initiator': {'role': 'local', 'party_id': 0}, 'job_parameters': {'job_type': 'train', 'work_mode': 1, 'backend': 0, 'computing_engine': 'EGGROLL', 'federation_engine': 'EGGROLL', 'storage_engine': 'EGGROLL', 'engines_address': {'computing': {'cores_per_node': 20, 'nodes': 1}, 'federation': {'host': 'rollsite', 'port': 9370}, 'storage': {'cores_per_node': 20, 'nodes': 1}}, 'federated_mode': 'MULTIPLE', 'task_parallelism': 1, 'computing_partitions': 4, 'federated_status_collect_type': 'PULL', 'model_id': 'local-0#model', 'model_version': '202107260820309976351', 'eggroll_run': {'eggroll.session.processors.per.node': 4}, 'spark_run': {}, 'rabbitmq_run': {}, 'adaptation_parameters': {'task_nodes': 1, 'task_cores_per_node': 4, 'task_memory_per_node': 0, 'request_task_cores': 4, 'if_initiator_baseline': False}}, 'role': {'local': [0]}, 'component_parameters': {'role': {'local': {'0': {'upload_0': {'name': 'hetero_guest', 'head': 1, 'file': '/data/projects/fate/jobs/202107260820309976351/fate_upload_tmp/breast_hetero_guest.csv', 'partition': 1, 'namespace': 'cl', 'destroy': False}}}}}, 'dsl_version': 2, 'local': {'role': 'local', 'party_id': 0}, 'CodePath': 'fate_flow/components/upload.py/Upload', 'module': 'Upload', 'output_data_name': None}
[INFO] [2021-07-26 08:20:39,838] [90:139687957583680] - task_executor.py[line:158]: Task input dsl {}
```
Then task_run_args is assembled, run_object is configured, and execution starts:
```python
profile.profile_start()
run_object.run(component_parameters_on_party, task_run_args)
```
Here run_object is Upload, so run_object.run invokes Upload.run().
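How Upload gets instantiated is visible in the parameters above: 'CodePath': 'fate_flow/components/upload.py/Upload'. A hypothetical loader sketch (the path-splitting details are assumptions; only the CodePath value comes from the log):
```python
import importlib

# Resolve a CodePath like 'fate_flow/components/upload.py/Upload' to a class.
code_path = "fate_flow/components/upload.py/Upload"
*package_parts, file_name, class_name = code_path.split("/")
module_name = ".".join(package_parts + [file_name[:-len(".py")]])  # fate_flow.components.upload
run_object = getattr(importlib.import_module(module_name), class_name)()  # an Upload instance
```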
- upload.py: execute run
Parameters are pulled from component_parameters.
They are logged to ${job_log_dir}/${role}/${party}/DEBUG.log, and the corresponding variables are set:
```
[INFO] [2021-07-26 08:20:39,883] [90:139687957583680] - upload.py[line:41]: {'file': '/data/projects/fate/jobs/202107260820309976351/fate_upload_tmp/breast_hetero_guest.csv', 'head': 1, 'id_delimiter': ',', 'partition': 1, 'namespace': 'cl', 'name': 'hetero_guest', 'storage_engine': '', 'storage_address': None, 'destroy': False}
[INFO] [2021-07-26 08:20:39,883] [90:139687957583680] - upload.py[line:42]: {'job_parameters': <fate_flow.entity.types.RunParameters object at 0x7f0b81a98b38>}
```
Build the session:
```
[DEBUG] [2021-07-26 08:20:39,883] [90:139687957583680] - pool.py[line:129]: No connection available in pool.
[DEBUG] [2021-07-26 08:20:39,889] [90:139687957583680] - pool.py[line:158]: Created new connection 139687331472832.
[DEBUG] [2021-07-26 08:20:39,891] [90:139687957583680] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_date`, `t1`.`f_update_date`, `t1`.`f_name`, `t1`.`f_namespace`, `t1`.`f_address`, `t1`.`f_engine`, `t1`.`f_type`, `t1`.`f_options`, `t1`.`f_partitions`, `t1`.`f_id_delimiter`, `t1`.`f_in_serialized`, `t1`.`f_have_head`, `t1`.`f_schema`, `t1`.`f_count`, `t1`.`f_part_of_data`, `t1`.`f_description`, `t1`.`f_create_time`, `t1`.`f_update_time` FROM `t_storage_table_meta` AS `t1` WHERE ((`t1`.`f_name` = %s) AND (`t1`.`f_namespace` = %s))', ['hetero_guest', 'cl'])
[DEBUG] [2021-07-26 08:20:39,894] [90:139687957583680] - pool.py[line:185]: Returning 139687331472832 to pool.
[DEBUG] [2021-07-26 08:20:39,966] [90:139687957583680] - peewee.py[line:2863]: ('INSERT INTO `t_session_record` (`f_session_id`, `f_create_date`, `f_update_time`, `f_update_date`, `f_engine_name`, `f_engine_type`, `f_engine_address`, `f_create_time`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)', ['202107260820309976351_upload_0_0_local_0_storage_5cbb3a88edea11ebbccccaf5cc2d708a', datetime.datetime(2021, 7, 26, 8, 20, 39), 1627287639962, datetime.datetime(2021, 7, 26, 8, 20, 39), 'STANDALONE', 'storage', '{}', 1627287639962])
[DEBUG] [2021-07-26 08:20:39,973] [90:139687957583680] - _session.py[line:144]: save session 202107260820309976351_upload_0_0_local_0_storage_5cbb3a88edea11ebbccccaf5cc2d708a record
[DEBUG] [2021-07-26 08:20:39,974] [90:139687957583680] - pool.py[line:185]: Returning 139687331472832 to pool.
[DEBUG] [2021-07-26 08:20:39,975] [90:139687957583680] - peewee.py[line:2863]: ('DELETE FROM `t_session_record` WHERE (`t_session_record`.`f_session_id` = %s)', ['202107260820309976351_upload_0_0_local_0_storage_5cbb3a88edea11ebbccccaf5cc2d708a'])
```
Note: if the destroy parameter is set, table.destroy is called on the existing table (guarded by if table:).
The upload address is then updated; ${job_log_dir}/${role}/${party}/DEBUG.log shows:
```
[INFO] [2021-07-26 08:20:41,836] [90:139687957583680] - upload.py[line:95]: upload to EGGROLL storage, address: {'cores_per_node': 20, 'nodes': 1, 'name': 'hetero_guest', 'namespace': 'cl', 'storage_type': 'LMDB'}
```
This address is the LMDB storage location:
```python
address = storage.StorageTableMeta.create_address(storage_engine=storage_engine, address_dict=address_dict)
self.parameters["partitions"] = partitions
self.parameters["name"] = name
self.table = storage_session.create_table(address=address, **self.parameters)
data_table_count = self.save_data_table(job_id, name, namespace, head)
self.table.get_meta().update_metas(in_serialized=True)
```
In order, these lines create the address/metadata, create the table, save the data, and update the metadata.
When debugging in local (standalone) mode, a data directory is generated under the project root; in a KubeFATE deployment, the data lives under /data/projects/fate/eggroll/data/LMDB on the nodemanager.
- upload.py: execute save_data_table
The file schema is obtained first.
The file is then read in chunks of at most MAX_BYTES (lines = fin.readlines(self.MAX_BYTES)) and written to LMDB.
Progress is computed per chunk, so its granularity depends on MAX_BYTES (see the sketch after this block):
```python
save_progress = lines_count/input_feature_count*100//1
```
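A minimal sketch of that chunked loop, assuming a hypothetical put_all-style table interface and a simplified line parser (only readlines(MAX_BYTES) and the progress arithmetic come from the text above):
```python
MAX_BYTES = 8 * 1024 * 1024  # assumption: batch size cap; FATE defines its own value

def parse_line(line: str, id_delimiter: str = ","):
    """Toy parser: first field is the key, the rest is the value."""
    key, _, value = line.rstrip("\n").partition(id_delimiter)
    return key, value

def save_data_table(path: str, table, input_feature_count: int, head: bool = True):
    lines_count = 0
    with open(path) as fin:
        if head:
            fin.readline()  # consume the header row
        while True:
            lines = fin.readlines(MAX_BYTES)  # read roughly MAX_BYTES worth of lines
            if not lines:
                break
            table.put_all(parse_line(line) for line in lines)  # hypothetical storage call
            lines_count += len(lines)
            # progress only moves once per chunk, hence the MAX_BYTES dependence
            save_progress = lines_count / input_feature_count * 100 // 1
            print(f"progress: {save_progress}%")
    return lines_count
```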
- upload.py: call ControllerClient.update_job(job_info=job_info) to update the task state (mainly the progress), i.e. control_client.update_job
The flow is the same as before; the call chain is ControllerClient.update_job -> fate_flow_server (via flask) -> party_app.update_task() -> TaskController.update_task(task_info=task_info)
Log output in ${job_log_dir}/${role}/${party}/DEBUG.log:
```
[INFO] [2021-07-26 08:20:41,942] [90:139687957583680] - control_client.py[line:26]: request update job 202107260820309976351 on local 0
```
The corresponding entries in ${job_log_dir}/fate_flow_audit.log:
```
[INFO] [2021-07-26 08:20:41,942] [90:139687957583680] - api_utils.py[line:122]: remote http api request: http://10.200.96.235:9380/v1/party/202107260820309976351/local/0/update
[INFO] [2021-07-26 08:20:41,966] [90:139687957583680] - api_utils.py[line:129]: {"retcode":0,"retmsg":"success"}
[INFO] [2021-07-26 08:20:41,966] [90:139687957583680] - api_utils.py[line:131]: remote http api response: /v1/party/202107260820309976351/local/0/update {'retcode': 0, 'retmsg': 'success'}
```
Logs in ${job_log_dir}/fate_flow_schedule.log:
```
[INFO] [2021-07-26 08:20:41,950] [1:140259119585024] - job_saver.py[line:61]: try to update job 202107260820309976351
[INFO] [2021-07-26 08:20:41,963] [1:140259119585024] - job_saver.py[line:64]: job 202107260820309976351 update successfully: {'progress': 100.0, 'job_id': '202107260820309976351', 'role': 'local', 'party_id': '0'}
```
Logs in ${job_log_dir}/peewee.log:
```
[DEBUG] [2021-07-26 08:20:41,954] [1:140259119585024] - peewee.py[line:2863]: ('SELECT `t1`.`f_create_time`, `t1`.`f_create_date`, `t1`.`f_update_time`, `t1`.`f_update_date`, `t1`.`f_user_id`, `t1`.`f_job_id`, `t1`.`f_name`, `t1`.`f_description`, `t1`.`f_tag`, `t1`.`f_dsl`, `t1`.`f_runtime_conf`, `t1`.`f_runtime_conf_on_party`, `t1`.`f_train_runtime_conf`, `t1`.`f_roles`, `t1`.`f_work_mode`, `t1`.`f_initiator_role`, `t1`.`f_initiator_party_id`, `t1`.`f_status`, `t1`.`f_status_code`, `t1`.`f_role`, `t1`.`f_party_id`, `t1`.`f_is_initiator`, `t1`.`f_progress`, `t1`.`f_ready_signal`, `t1`.`f_ready_time`, `t1`.`f_cancel_signal`, `t1`.`f_cancel_time`, `t1`.`f_rerun_signal`, `t1`.`f_end_scheduling_updates`, `t1`.`f_engine_name`, `t1`.`f_engine_type`, `t1`.`f_cores`, `t1`.`f_memory`, `t1`.`f_remaining_cores`, `t1`.`f_remaining_memory`, `t1`.`f_resource_in_use`, `t1`.`f_apply_resource_time`, `t1`.`f_return_resource_time`, `t1`.`f_start_time`, `t1`.`f_start_date`, `t1`.`f_end_time`, `t1`.`f_end_date`, `t1`.`f_elapsed` FROM `t_job` AS `t1` WHERE (((`t1`.`f_job_id` = %s) AND (`t1`.`f_role` = %s)) AND (`t1`.`f_party_id` = %s))', ['202107260820309976351', 'local', '0'])
[DEBUG] [2021-07-26 08:20:41,960] [1:140259119585024] - peewee.py[line:2863]: ('UPDATE `t_job` SET `f_progress` = %s WHERE ((((`t_job`.`f_job_id` = %s) AND (`t_job`.`f_role` = %s)) AND (`t_job`.`f_party_id` = %s)) AND (`t_job`.`f_progress` <= %s))', [100, '202107260820309976351', 'local', '0', 100])
```
Note the extra f_progress <= %s condition on the UPDATE: the row is only touched when the stored progress does not exceed the new value, so progress never moves backwards.
- upload.py: back to step 9, keep saving the file, writing the data into LMDB. Once everything has been stored, update the metadata and update the tracker.
In task_executor.py the tracker was wired up via:
```python
run_object.set_tracker(tracker=tracker_client)
```
so the self.tracker used here is a TrackerClient.
The three methods involved are log_output_data_info, log_metric_data, and set_metric_meta:
```python
self.tracker.log_output_data_info(data_name='upload',
                                  table_namespace=dst_table_namespace,
                                  table_name=dst_table_name)
self.tracker.log_metric_data(metric_namespace="upload",
                             metric_name="data_access",
                             metrics=[Metric("count", table_count)])
self.tracker.set_metric_meta(metric_namespace="upload",
                             metric_name="data_access",
                             metric_meta=MetricMeta(name='upload', metric_type='UPLOAD'))
```
Logs in ${job_log_dir}/${role}/${party}/DEBUG.log:
```
[INFO] [2021-07-26 08:20:42,704] [90:139687957583680] - tracker_client.py[line:127]: Request save job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0 data upload info
[INFO] [2021-07-26 08:20:43,044] [1:140259119585024] - job_tracker.py[line:97]: save job 202107260820309976351 component upload_0 on local 0 upload data_access metric meta
[INFO] [2021-07-26 08:20:44,838] [90:139687957583680] - job_tracker.py[line:159]: task id 202107260820309976351_upload_0 output data table is none
```
These issue HTTP requests; the corresponding entries in ${job_log_dir}/fate_flow_audit.log are:
```
[INFO] [2021-07-26 08:20:42,704] [90:139687957583680] - api_utils.py[line:122]: remote http api request: http://10.200.96.235:9380/v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/output_data_info/save
[INFO] [2021-07-26 08:20:43,009] [90:139687957583680] - api_utils.py[line:129]: {"retcode":0,"retmsg":"success"}
[INFO] [2021-07-26 08:20:43,010] [90:139687957583680] - api_utils.py[line:131]: remote http api response: /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/output_data_info/save {'retcode': 0, 'retmsg': 'success'}
[INFO] [2021-07-26 08:20:43,011] [90:139687957583680] - api_utils.py[line:122]: remote http api request: http://10.200.96.235:9380/v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_data/save
[INFO] [2021-07-26 08:20:43,036] [90:139687957583680] - api_utils.py[line:129]: {"retcode":0,"retmsg":"success"}
[INFO] [2021-07-26 08:20:43,036] [90:139687957583680] - api_utils.py[line:131]: remote http api response: /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_data/save {'retcode': 0, 'retmsg': 'success'}
[INFO] [2021-07-26 08:20:43,037] [90:139687957583680] - api_utils.py[line:122]: remote http api request: http://10.200.96.235:9380/v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_meta/save
[INFO] [2021-07-26 08:20:43,056] [90:139687957583680] - api_utils.py[line:129]: {"retcode":0,"retmsg":"success"}
[INFO] [2021-07-26 08:20:43,056] [90:139687957583680] - api_utils.py[line:131]: remote http api response: /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_meta/save {'retcode': 0, 'retmsg': 'success'}
```
The call chain is TrackerClient.log_output_data_info() -> fate_flow_server -> tracker_app -> job_tracker.
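All three saves hit the same /v1/tracker prefix; a hypothetical sketch of the shared shape (the helper is illustrative, and the endpoint paths are read off the audit log above):
```python
import requests

# One generic POST covers the three saves seen in fate_flow_audit.log;
# resource is "output_data_info", "metric_data" or "metric_meta".
def tracker_save(job_server, job_id, component_name, task_id, task_version,
                 role, party_id, resource, payload):
    url = "http://{}/v1/tracker/{}/{}/{}/{}/{}/{}/{}/save".format(
        job_server, job_id, component_name, task_id, task_version,
        role, party_id, resource)
    return requests.post(url, json=payload).json()  # {'retcode': 0, 'retmsg': 'success'}
```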
Logs in ${job_log_dir}/fate_flow_schedule.log:
```
[INFO] [2021-07-26 08:20:43,018] [1:140259119585024] - job_tracker.py[line:74]: save job 202107260820309976351 component upload_0 on local 0 upload data_access metric data
[INFO] [2021-07-26 08:20:43,044] [1:140259119585024] - job_tracker.py[line:97]: save job 202107260820309976351 component upload_0 on local 0 upload data_access metric meta
[INFO] [2021-07-26 08:20:44,838] [90:139687957583680] - job_tracker.py[line:159]: task id 202107260820309976351_upload_0 output data table is none
```
Once the save finishes, table_count is returned.
- upload.py: print the completion log, clean up the temporary files, and output statistics
${job_log_dir}/${role}/${party}/DEBUG.log:
```
[INFO] [2021-07-26 08:20:44,822] [90:139687957583680] - upload.py[line:102]: ------------load data finish!-----------------
[INFO] [2021-07-26 08:20:44,822] [90:139687957583680] - upload.py[line:106]: remove tmp upload file
[INFO] [2021-07-26 08:20:44,823] [90:139687957583680] - upload.py[line:107]: /data/projects/fate/jobs/202107260820309976351/fate_upload_tmp
[INFO] [2021-07-26 08:20:44,823] [90:139687957583680] - upload.py[line:111]: file: /data/projects/fate/jobs/202107260820309976351/fate_upload_tmp/breast_hetero_guest.csv
[INFO] [2021-07-26 08:20:44,824] [90:139687957583680] - upload.py[line:112]: total data_count: 569
[INFO] [2021-07-26 08:20:44,824] [90:139687957583680] - upload.py[line:113]: table name: hetero_guest, table namespace: cl
```
- task_executor.py: back to step 8, profile.profile_ends() is executed
profile.profile_ends() writes the profiling report shown in ${task_log_dir}/PROFILING.log (the summary is collected at INFO, the detailed breakdown at DEBUG).
${job_log_dir}/${role}/${party}/DEBUG.log:
```
[INFO] [2021-07-26 08:20:44,837] [90:139687957583680] - profile.py[line:249]:
Computing:
+----------+------------------------------------------+
| function |                                          |
+----------+------------------------------------------+
| total    | n=0, sum=0.0000, mean=0.0000, max=0.0000 |
+----------+------------------------------------------+
Federation:
+--------+------------------------------------------+
| get    |                                          |
+--------+------------------------------------------+
| remote |                                          |
+--------+------------------------------------------+
| total  | n=0, sum=0.0000, mean=0.0000, max=0.0000 |
+--------+------------------------------------------+
[DEBUG] [2021-07-26 08:20:44,838] [90:139687957583680] - profile.py[line:250]:
Detailed Computing:
+-------+------------------------------------------+
| stack |                                          |
+-------+------------------------------------------+
| total | n=0, sum=0.0000, mean=0.0000, max=0.0000 |
+-------+------------------------------------------+
```
- task_executor.py: after the run completes, save_data and save_out_model are called, followed by report_task_update_to_driver (same as steps 4 and 5). Statistics are then printed.
${job_log_dir}/${role}/${party}/DEBUG.log:
```
[INFO] [2021-07-26 08:20:44,838] [90:139687957583680] - task_executor.py[line:318]: report task 202107260820309976351_upload_0 0 local 0 to driver
[INFO] [2021-07-26 08:20:44,838] [90:139687957583680] - control_client.py[line:42]: request update job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0
[INFO] [2021-07-26 08:20:44,938] [90:139687957583680] - task_executor.py[line:207]: task 202107260820309976351_upload_0 local 0 start time: 2021-07-26 08:20:33
[INFO] [2021-07-26 08:20:44,939] [90:139687957583680] - task_executor.py[line:209]: task 202107260820309976351_upload_0 local 0 end time: 2021-07-26 08:20:44
[INFO] [2021-07-26 08:20:44,939] [90:139687957583680] - task_executor.py[line:211]: task 202107260820309976351_upload_0 local 0 takes 11.447s
[INFO] [2021-07-26 08:20:44,939] [90:139687957583680] - task_executor.py[line:214]: Finish 202107260820309976351 upload_0 202107260820309976351_upload_0 0 local 0 task success
```
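The reported elapsed time is simply end_time - start_time in milliseconds, and it checks out against the timestamps in the logs:
```python
# Values taken from the logs above: the task entered at 08:20:33,391, and
# t_task later records end_time=1627287644838 and elapsed=11447.
start_time = 1627287644838 - 11447
print(start_time)                           # 1627287633391 -> 2021-07-26 08:20:33,391 UTC
print((1627287644838 - start_time) / 1000)  # 11.447 s, matching "takes 11.447s"
```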
The corresponding output in ${job_log_dir}/fate_flow_schedule.log:
```
[INFO] [2021-07-26 08:20:44,844] [1:140259119585024] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
[INFO] [2021-07-26 08:20:44,864] [1:140259119585024] - job_saver.py[line:84]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update successfully
[INFO] [2021-07-26 08:20:44,873] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:44,908] [1:140259119585024] - job_saver.py[line:74]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status successfully: {'job_id': '202107260820309976351', 'component_name': 'upload_0', 'task_id': '202107260820309976351_upload_0', 'task_version': '0', 'role': 'local', 'party_id': '0', 'run_ip': '10.200.96.235', 'run_pid': 90, 'party_status': 'success', 'end_time': 1627287644838, 'elapsed': 11447}
```
- task_executor.py: TaskExecutor.run_task() has finished; TaskExecutor.report_task_update_to_driver(task_info=task_info) is executed
${job_log_dir}/${role}/${party}/DEBUG.log:
```
[INFO] [2021-07-26 08:20:44,940] [90:139687957583680] - task_executor.py[line:318]: report task 202107260820309976351_upload_0 0 local 0 to driver
[INFO] [2021-07-26 08:20:44,940] [90:139687957583680] - control_client.py[line:42]: request update job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0
```
The corresponding output in ${job_log_dir}/fate_flow_schedule.log:
```
[INFO] [2021-07-26 08:20:44,947] [1:140259119585024] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
[WARNING] [2021-07-26 08:20:44,972] [1:140259119585024] - job_saver.py[line:86]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update does not take effect
[INFO] [2021-07-26 08:20:44,996] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:45,015] [1:140259119585024] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'job_id': '202107260820309976351', 'component_name': 'upload_0', 'task_id': '202107260820309976351_upload_0', 'task_version': '0', 'role': 'local', 'party_id': '0', 'run_ip': '10.200.96.235', 'run_pid': 90, 'party_status': 'success', 'end_time': 1627287644838, 'elapsed': 11447}
```
Note: these entries are interleaved with the dag_scheduler polling logs; what distinguishes them is that these carry a run_pid field.
- The execution result is returned, corresponding to step 1.
With that, the entire task has finished executing.