0%
3.7k 字 21 分钟

FATE学习:跟着日志读源码(十二)dataio task excute阶段

综述

参考FATE 的flowchart,常规任务开始的第一个组件,就是dataio。
同前文,在${job_log_dir}/fate_flow_schedule.log 中输出如下命令后,开始通过task_executor 执行dataio。

1
2
3
4
5
[INFO] [2021-06-04 14:42:02,995] [1:140456603948800] - job_utils.py[line:310]: start process command: /opt/app-root/bin/python /data/projects/fate/python/fate_flow/operation/task_executor.py -j 202106041441554669404 -n dataio_0 -t 202106041441554669404_dataio_0 -v 0 -r guest -p 9989 -c /data/projects/fate/jobs/202106041441554669404/guest/9989/dataio_0/202106041441554669404_dataio_0/0/task_parameters.json --run_ip 10.200.224.59 --job_server 10.200.224.59:9380
[INFO] [2021-06-04 14:42:03,005] [1:140456603948800] - job_utils.py[line:333]: start process command: /opt/app-root/bin/python /data/projects/fate/python/fate_flow/operation/task_executor.py -j 202106041441554669404 -n dataio_0 -t 202106041441554669404_dataio_0 -v 0 -r guest -p 9989 -c /data/projects/fate/jobs/202106041441554669404/guest/9989/dataio_0/202106041441554669404_dataio_0/0/task_parameters.json --run_ip 10.200.224.59 --job_server 10.200.224.59:9380 successfully, pid is 1657

[INFO] [2021-06-04 14:42:03,972] [1657:139893886592832] - task_executor.py[line:56]: enter task process
[INFO] [2021-06-04 14:42:03,972] [1657:139893886592832] - task_executor.py[line:57]: Namespace(component_name='dataio_0', config='/data/projects/fate/jobs/202106041441554669404/guest/9989/dataio_0/202106041441554669404_dataio_0/0/task_parameters.json', job_id='202106041441554669404', job_server='10.200.224.59:9380', party_id=9989, role='guest', run_ip='10.200.224.59', task_id='202106041441554669404_dataio_0', task_version=0)

具体的执行日志,会输出在dataio_0/DEBUG.log中。

执行细节

  1. task_executor.py: 执行python命令,前期参数准备等类似upload,但在init federation部分,两者不同,upload是loacl的,data_io 是多方协同的
    1
    2
    3
    4
    5
    6
    upload的
    [DEBUG] [2021-07-26 08:20:39,837] [90:139687957583680] - _federation.py[line:35]: [federation.eggroll]init federation: rp_session_id=202107260820309976351_upload_0_0_local_0, rs_session_id=202107260820309976351_upload_0_0, party=Party(role=local, party_id=0), proxy_endpoint=rollsite:9370


    data_io的
    [DEBUG] [2021-08-20 10:01:15,929] [2673:140340420417344] - _federation.py[line:35]: [federation.eggroll]init federation: rp_session_id=202108201000594960226_dataio_0_0_guest_9989, rs_session_id=202108201000594960226_dataio_0_0, party=Party(role=guest, party_id=9989), proxy_endpoint=rollsite:9370
  2. task_executor.py:准备好相关参数之后,通过run,开始具体的任务执行

    1
    run_object.run(component_parameters_on_party, task_run_args)

    这里需要注意的是,DataIO 类(源码:python/federatedml/util/data_io.py)没有重写run方法,故这里调用的是基类ModelBase的run方法(源码:python/federatedml/model_base.py)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    def run(self, component_parameters=None, args=None):
    self._init_runtime_parameters(component_parameters)
    self.component_properties.parse_dsl_args(args)

    running_funcs = self.component_properties.extract_running_rules(args, self)
    LOGGER.debug(f"running_funcs: {running_funcs.todo_func_list}")
    saved_result = []
    for func, params, save_result, use_previews in running_funcs:
    # for func, params in zip(todo_func_list, todo_func_params):
    if use_previews:
    if params:
    real_param = [saved_result, params]
    else:
    real_param = saved_result
    LOGGER.debug("func: {}".format(func))
    this_data_output = func(*real_param)
    saved_result = []
    else:
    this_data_output = func(*params)

    if save_result:
    saved_result.append(this_data_output)

    if len(saved_result) == 1:
    self.data_output = saved_result[0]
    # LOGGER.debug("One data: {}".format(self.data_output.first()[1].features))
    LOGGER.debug("saved_result is : {}, data_output: {}".format(saved_result, self.data_output))
    # self.check_consistency()
    self.save_summary()
  3. model_base.py:执行_init_runtime_parameters 初始化runtime_parameters,实际是调用component_properties.py的parse_component_param,输出如下日志

    1
    2
    [DEBUG] [2021-08-20 10:01:16,473] [2673:140340420417344] - component_properties.py[line:73]: {'DataIOParam': {'input_format': 'dense', 'delimitor': ',', 'data_type': 'float64', 'exclusive_data_type': None, 'tag_with_value': False, 'tag_value_delimitor': ':', 'missing_fill': True, 'default_value': 0, 'missing_fill_method': None, 'missing_impute': None, 'outlier_replace': True, 'outlier_replace_method': None, 'outlier_impute': None, 'outlier_replace_value': 0, 'with_label': True, 'label_name': 'y', 'label_type': 'int', 'output_format': 'dense', 'need_run': True}, 'initiator': {'role': 'guest', 'party_id': 9989}, 'job_parameters': {'job_type': 'train', 'work_mode': 1, 'backend': 0, 'computing_engine': 'EGGROLL', 'federation_engine': 'EGGROLL', 'storage_engine': 'EGGROLL', 'engines_address': {'computing': {'cores_per_node': 20, 'nodes': 1}, 'federation': {'host': 'rollsite', 'port': 9370}, 'storage': {'cores_per_node': 20, 'nodes': 1}}, 'federated_mode': 'MULTIPLE', 'task_parallelism': 1, 'computing_partitions': 4, 'federated_status_collect_type': 'PULL', 'model_id': 'arbiter-10000#guest-9989#host-10000#model', 'model_version': '202108201000594960226', 'eggroll_run': {'eggroll.session.processors.per.node': 4}, 'spark_run': {}, 'rabbitmq_run': {}, 'adaptation_parameters': {'task_nodes': 1, 'task_cores_per_node': 4, 'task_memory_per_node': 0, 'request_task_cores': 4, 'if_initiator_baseline': False}}, 'role': {'guest': [9989], 'host': [10000], 'arbiter': [10000]}, 'config': '/data/projects/fate/python/cl/fc/config/20210819/train_20210819_cgame2122_part.json', 'dsl': '/data/projects/fate/python/cl/fc/env/pattern/train_job_dsl.json', 'function': 'submit_job', 'local': {'role': 'guest', 'party_id': 9989}, 'CodePath': 'federatedml/util/data_io.py/DataIO', 'module': 'DataIO', 'output_data_name': ['train']}
    [DEBUG] [2021-08-20 10:01:16,474] [2673:140340420417344] - component_properties.py[line:80]: need_run: True, need_cv: False
  4. model_base.py:component_properties.parse_dsl_args,调用component_properties.py的 parse_dsl_args解析dsl 参数。
    4.1 先是判断是否有model或isometric_model参数,两者的说明,详见FATE学习:配置文件解析及V1/V2版本对比
    4.2 解析有哪些类型的datatype。(可选项有”train_data”, “eval_data”, “validate_data”, “test_data”四个)

1
2
3
4
5
[DEBUG] [2021-08-20 10:01:16,474] [2673:140340420417344] - component_properties.py[line:102]: parse_dsl_args data_sets: {'args': {'data': [<fate_arch.computing.eggroll._table.Table object at 0x7fa373fdb080>]}}
[DEBUG] [2021-08-20 10:01:16,474] [2673:140340420417344] - component_properties.py[line:112]: [Data Parser], has_train_data: False
[DEBUG] [2021-08-20 10:01:16,474] [2673:140340420417344] - component_properties.py[line:112]: [Data Parser], has_eval_data: False
[DEBUG] [2021-08-20 10:01:16,475] [2673:140340420417344] - component_properties.py[line:112]: [Data Parser], has_validate_data: False
[DEBUG] [2021-08-20 10:01:16,475] [2673:140340420417344] - component_properties.py[line:112]: [Data Parser], has_test_data: False

只要 四种类型的data,有一种没有,has_normal_input_data 就为ture,打印如下日志

1
[DEBUG] [2021-08-20 10:01:16,475] [2673:140340420417344] - component_properties.py[line:118]: [Data Parser], has_normal_data: True

这里需要注意,如过有eval_data,则必须有validate_data 或 test_data,否则抛出异常

1
2
3
4
if self.has_eval_data:
if self.has_validate_data or self.has_test_data:
raise DSLConfigError("eval_data input should not be configured simultaneously"
" with validate_data or test_data")

  1. model_base.py:调用 ComponentProperties.extract_running_rules ,提取运行规则。
  2. component_properties.py:先调用extract_input_data,提取输入数据,输出如下日志
    1
    2
    3
    4
    [DEBUG] [2021-08-20 10:01:16,475] [2673:140340420417344] - component_properties.py[line:179]: Input data_sets: {'args': {'data': [<fate_arch.computing.eggroll._table.Table object at 0x7fa373fdb080>]}}

    [DEBUG] [2021-08-20 10:01:16,475] [2673:140340420417344] - component_properties.py[line:189]: data_dict: {'data': [<fate_arch.computing.eggroll._table.Table object at 0x7fa373fdb080>]}
    [DEBUG] [2021-08-20 10:01:16,475] [2673:140340420417344] - component_properties.py[line:192]: data_list: <fate_arch.computing.eggroll._table.Table object at 0x7fa373fdb080>

这里紧接着,是打出了如下日志

1
2
3
[DEBUG] [2021-08-20 10:01:16,479] [2673:140340420417344] - profile.py[line:84]: [computing#41d092c3c4]function_stack: [component_properties.py:225]extract_input_data <-[component_properties.py:238]extract_running_rules <-[model_base.py:87]run
[DEBUG] [2021-08-20 10:01:16,480] [2673:140340420417344] - profile.py[line:87]: [computing#41d092c3c4]start
[DEBUG] [2021-08-20 10:01:16,493] [2673:140340420417344] - profile.py[line:93]: [computing#41d092c3c4]done, elapse: 0.013397216796875, function: count(self: Table(partition=4)) -> int

是由于
1
self.input_data_count = data_table.count()

的count,经历了如下调用链 data_table.count() -> fate_arch/computing/eggroll/_table.py.count() -> fate_arch/common/profile.py.computing_profile() -> _ComputingTimer()

校验完毕,打出如下日志:

1
[DEBUG] [2021-08-20 10:01:16,493] [2673:140340420417344] - component_properties.py[line:231]: train_data: None, validate_data: None, test_data: None, data: {'args.data': <fate_arch.computing.eggroll._table.Table object at 0x7fa373fdb080>}

  1. component_properties.py:确定需要执行的function,添加到running_funcs

    1
    2
    3
    running_funcs.add_func(model.extract_data, [data], save_result=True)
    running_funcs.add_func(model.set_flowid, ['transform'])
    running_funcs.add_func(model.transform, [], use_previews=True, save_result=True)
  2. model_base.py:ComponentProperties.extract_running_rules执行完毕,打印出7中添加的function

    1
    [DEBUG] [2021-08-20 10:01:16,493] [2673:140340420417344] - model_base.py[line:88]: running_funcs: [<function ModelBase.extract_data at 0x7fa374011488>, <bound method ModelBase.set_flowid of <federatedml.util.data_io.DataIO object at 0x7fa37627c860>>, <bound method DataIO.fit of <federatedml.util.data_io.DataIO object at 0x7fa37627c860>>]

    可以看到,一共有3个,model.extract_data,model.set_flowid,model.transform

  3. model_base.py:遍历执行running_funcs中的function
    9.1 先执行model.extract_data,use_previews=False 执行执行func ,也就是ModelBase.extract_data() 打印如下日志

    1
    [DEBUG] [2021-08-20 10:01:16,493] [2673:140340420417344] - model_base.py[line:340]: In extract_data, data input: {'args.data': <fate_arch.computing.eggroll._table.Table object at 0x7fa373fdb080>}

9.2 执行model.set_flowid ,set flowid
9.3 执行model.transform 也就是 DataIO.fit
先打印日志,

1
[DEBUG] [2021-08-20 10:01:16,493] [2673:140340420417344] - model_base.py[line:97]: func: <bound method DataIO.fit of <federatedml.util.data_io.DataIO object at 0x7fa37627c860>>

然后调用DataIO.fit

  1. data_io.py:执行fit()
    dataio 的成员变量reader 有三个可选类型。默认是DenseFeatureReader
  • DenseFeatureReader:稠密数据。
  • SparseFeatureReader: libsvm 格式数据。
  • SparseTagReader:tag data 格式数据。
    调用 DenseFeatureReader.read_data开始读取数据,
  1. data_io.py:DenseFeatureReader 执行read_data()
    11.1 先打印日志
    1
    [INFO] [2021-08-20 10:01:16,493] [2673:140340420417344] - data_io.py[line:118]: start to read dense data and change data to instance
    11.2 调用abnormal_detection.empty_table_detection,检验data_instance 不为空
    1
    2
    3
    4
    def empty_table_detection(data_instances):
    num_data = data_instances.count()
    if num_data == 0:
    raise ValueError(f"Count of data_instance is 0: {data_instances}")
    对应日志:
    1
    2
    3
    [DEBUG] [2021-08-20 10:01:16,494] [2673:140340420417344] - profile.py[line:84]: [computing#4a2ab9dfb5]function_stack: [abnormal_detection.py:27]empty_table_detection <-[data_io.py:120]read_data <-[data_io.py:885]fit <-[model_base.py:98]run
    [DEBUG] [2021-08-20 10:01:16,494] [2673:140340420417344] - profile.py[line:87]: [computing#4a2ab9dfb5]start
    [DEBUG] [2021-08-20 10:01:16,512] [2673:140340420417344] - profile.py[line:93]: [computing#4a2ab9dfb5]done, elapse: 0.017865896224975586, function: count(self: Table(partition=4)) -> int

如果是transform 模式,直接获取文件头。如果是fit模式,则会进行相关适配。

11.3 调用 generate_header,生成头。
输出如下日志:

1
2
[DEBUG] [2021-08-20 10:01:16,512] [2673:140340420417344] - data_io.py[line:75]: header is x,y
[DEBUG] [2021-08-20 10:01:16,512] [2673:140340420417344] - data_io.py[line:76]: sid_name is imei

11.4 idx不为空的情况下,获取shape,并执行mapValues

1
2
3
data_shape = data_overview.get_data_shape(input_data)
input_data_features = input_data.mapValues
input_data_labels = input_data.mapValues

调用链为get_data_shape -> python/fate_arch/computing/eggroll/_table.py对应方法(first,mapValues),有注解 @computing_profile,产生如下日志
1
2
3
4
5
6
7
8
9
[DEBUG] [2021-08-20 10:01:16,513] [2673:140340420417344] - profile.py[line:84]: [computing#19347ff993]function_stack: [data_overview.py:92]get_data_shape <-[data_io.py:131]read_data <-[data_io.py:885]fit <-[model_base.py:98]run
[DEBUG] [2021-08-20 10:01:16,513] [2673:140340420417344] - profile.py[line:87]: [computing#19347ff993]start
[DEBUG] [2021-08-20 10:01:16,546] [2673:140340420417344] - profile.py[line:93]: [computing#19347ff993]done, elapse: 0.03276538848876953, function: first(self: Table(partition=4)) -> tuple
[DEBUG] [2021-08-20 10:01:16,547] [2673:140340420417344] - profile.py[line:84]: [computing#5ce83a9f4d]function_stack: [data_io.py:136]read_data <-[data_io.py:885]fit <-[model_base.py:98]run
[DEBUG] [2021-08-20 10:01:16,547] [2673:140340420417344] - profile.py[line:87]: [computing#5ce83a9f4d]start
[DEBUG] [2021-08-20 10:01:16,725] [2673:140340420417344] - profile.py[line:93]: [computing#5ce83a9f4d]done, elapse: 0.17815899848937988, function: mapValues(self: Table(partition=4), func: function) -> Table(partition=4)
[DEBUG] [2021-08-20 10:01:16,726] [2673:140340420417344] - profile.py[line:84]: [computing#68bd39b935]function_stack: [data_io.py:139]read_data <-[data_io.py:885]fit <-[model_base.py:98]run
[DEBUG] [2021-08-20 10:01:16,726] [2673:140340420417344] - profile.py[line:87]: [computing#68bd39b935]start
[DEBUG] [2021-08-20 10:01:16,770] [2673:140340420417344] - profile.py[line:93]: [computing#68bd39b935]done, elapse: 0.0438079833984375, function: mapValues(self: Table(partition=4), func: function) -> Table(partition=4)

11.5 调用fit() 转换数据。
先调用get_schema

1
schema = make_schema(self.header, self.sid_name, self.label_name)

输出如下日志
1
2
3
4
5

[DEBUG] [2021-08-20 10:01:16,770] [2673:140340420417344] - abnormal_detection.py[line:81]: schema is {'header': ['x'], 'sid_name': 'imei', 'label_name': 'y'}
[DEBUG] [2021-08-20 10:01:16,770] [2673:140340420417344] - abnormal_detection.py[line:85]: header is ['x']
[DEBUG] [2021-08-20 10:01:16,771] [2673:140340420417344] - abnormal_detection.py[line:95]: sid_name is imei
[DEBUG] [2021-08-20 10:01:16,771] [2673:140340420417344] - abnormal_detection.py[line:100]: label_name is y

11.6 然后调用fill_missing_value 和 replace_outlier_value 进行缺失值和异常值处理。对应日志如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

[INFO] [2021-08-20 10:01:16,842] [2673:140340420417344] - imputer.py[line:159]: finish replace missing value with replace value 0, replace method is:None
[DEBUG] [2021-08-20 10:01:16,844] [2673:140340420417344] - profile.py[line:84]: [computing#89322b8ead]function_stack: [data_overview.py:92]get_data_shape <-[imputer.py:160]__fit_replace <-[imputer.py:242]fit <-[data_io.py:187]fill_missing_value <-[data_io.py:158]fit <-[data_io.py:146]read_data <-[data_io.py:885]fit <-[model_base.py:98]run
[DEBUG] [2021-08-20 10:01:16,844] [2673:140340420417344] - profile.py[line:87]: [computing#89322b8ead]start
[DEBUG] [2021-08-20 10:01:16,887] [2673:140340420417344] - profile.py[line:93]: [computing#89322b8ead]done, elapse: 0.04289054870605469, function: first(self: Table(partition=4)) -> tuple
[DEBUG] [2021-08-20 10:01:16,888] [2673:140340420417344] - profile.py[line:84]: [computing#74840b2b1e]function_stack: [imputer.py:207]__get_impute_rate_from_replace_data <-[imputer.py:244]fit <-[data_io.py:187]fill_missing_value <-[data_io.py:158]fit <-[data_io.py:146]read_data <-[data_io.py:885]fit <-[model_base.py:98]run
[DEBUG] [2021-08-20 10:01:16,889] [2673:140340420417344] - profile.py[line:87]: [computing#74840b2b1e]start
[DEBUG] [2021-08-20 10:01:16,937] [2673:140340420417344] - profile.py[line:93]: [computing#74840b2b1e]done, elapse: 0.04834342002868652, function: applyPartitions(self: Table(partition=4), func: function) -> Table(partition=4)
[DEBUG] [2021-08-20 10:01:16,939] [2673:140340420417344] - profile.py[line:87]: [computing#74840b2b1e]start
[DEBUG] [2021-08-20 10:01:16,956] [2673:140340420417344] - profile.py[line:93]: [computing#74840b2b1e]done, elapse: 0.01778411865234375, function: reduce(self: Table(partition=4), func: function) -> ndarray
[DEBUG] [2021-08-20 10:01:16,958] [2673:140340420417344] - profile.py[line:84]: [computing#8129e5f4a8]function_stack: [imputer.py:245]fit <-[data_io.py:187]fill_missing_value <-[data_io.py:158]fit <-[data_io.py:146]read_data <-[data_io.py:885]fit <-[model_base.py:98]run
[DEBUG] [2021-08-20 10:01:16,958] [2673:140340420417344] - profile.py[line:87]: [computing#8129e5f4a8]start
[DEBUG] [2021-08-20 10:01:17,001] [2673:140340420417344] - profile.py[line:93]: [computing#8129e5f4a8]done, elapse: 0.04316139221191406, function: mapValues(self: Table(partition=4), func: function) -> Table(partition=4)
[DEBUG] [2021-08-20 10:01:17,003] [2673:140340420417344] - profile.py[line:84]: [computing#f496b1cb7d]function_stack: [imputer.py:157]__fit_replace <-[imputer.py:242]fit <-[data_io.py:209]replace_outlier_value <-[data_io.py:159]fit <-[data_io.py:146]read_data <-[data_io.py:885]fit <-[model_base.py:98]run
[DEBUG] [2021-08-20 10:01:17,003] [2673:140340420417344] - profile.py[line:87]: [computing#f496b1cb7d]start
[DEBUG] [2021-08-20 10:01:17,048] [2673:140340420417344] - profile.py[line:93]: [computing#f496b1cb7d]done, elapse: 0.04530787467956543, function: mapValues(self: Table(partition=4), func: partial) -> Table(partition=4)


[INFO] [2021-08-20 10:01:17,049] [2673:140340420417344] - imputer.py[line:159]: finish replace missing value with replace value 0, replace method is:None
[DEBUG] [2021-08-20 10:01:17,051] [2673:140340420417344] - profile.py[line:84]: [computing#61b779c7fc]function_stack: [data_overview.py:92]get_data_shape <-[imputer.py:160]__fit_replace <-[imputer.py:242]fit <-[data_io.py:209]replace_outlier_value <-[data_io.py:159]fit <-[data_io.py:146]read_data <-[data_io.py:885]fit <-[model_base.py:98]run
[DEBUG] [2021-08-20 10:01:17,051] [2673:140340420417344] - profile.py[line:87]: [computing#61b779c7fc]start
[DEBUG] [2021-08-20 10:01:17,093] [2673:140340420417344] - profile.py[line:93]: [computing#61b779c7fc]done, elapse: 0.04207563400268555, function: first(self: Table(partition=4)) -> tuple
[DEBUG] [2021-08-20 10:01:17,094] [2673:140340420417344] - profile.py[line:84]: [computing#cdc7da1b93]function_stack: [imputer.py:207]__get_impute_rate_from_replace_data <-[imputer.py:244]fit <-[data_io.py:209]replace_outlier_value <-[data_io.py:159]fit <-[data_io.py:146]read_data <-[data_io.py:885]fit <-[model_base.py:98]run
[DEBUG] [2021-08-20 10:01:17,095] [2673:140340420417344] - profile.py[line:87]: [computing#cdc7da1b93]start
[DEBUG] [2021-08-20 10:01:17,130] [2673:140340420417344] - profile.py[line:93]: [computing#cdc7da1b93]done, elapse: 0.0355229377746582, function: applyPartitions(self: Table(partition=4), func: function) -> Table(partition=4)
[DEBUG] [2021-08-20 10:01:17,132] [2673:140340420417344] - profile.py[line:87]: [computing#cdc7da1b93]start
[DEBUG] [2021-08-20 10:01:17,149] [2673:140340420417344] - profile.py[line:93]: [computing#cdc7da1b93]done, elapse: 0.017266035079956055, function: reduce(self: Table(partition=4), func: function) -> ndarray
[DEBUG] [2021-08-20 10:01:17,151] [2673:140340420417344] - profile.py[line:84]: [computing#f50f12b817]function_stack: [imputer.py:245]fit <-[data_io.py:209]replace_outlier_value <-[data_io.py:159]fit <-[data_io.py:146]read_data <-[data_io.py:885]fit <-[model_base.py:98]run
[DEBUG] [2021-08-20 10:01:17,151] [2673:140340420417344] - profile.py[line:87]: [computing#f50f12b817]start
[DEBUG] [2021-08-20 10:01:17,192] [2673:140340420417344] - profile.py[line:93]: [computing#f50f12b817]done, elapse: 0.04134106636047363, function: mapValues(self: Table(partition=4), func: function) -> Table(partition=4)
[DEBUG] [2021-08-20 10:01:17,193] [2673:140340420417344] - profile.py[line:84]: [computing#9e38a6d0d2]function_stack: [data_io.py:224]gen_data_instance <-[data_io.py:161]fit <-[data_io.py:146]read_data <-[data_io.py:885]fit <-[model_base.py:98]run
[DEBUG] [2021-08-20 10:01:17,193] [2673:140340420417344] - profile.py[line:87]: [computing#9e38a6d0d2]start
[DEBUG] [2021-08-20 10:01:17,238] [2673:140340420417344] - profile.py[line:93]: [computing#9e38a6d0d2]done, elapse: 0.04482388496398926, function: join(self: Table(partition=4), other: Table(partition=4), func: function) -> Table(partition=4)

11.7 最终是调用gen_data_instance 生成对象实例返回。

  1. data_io.py:11的结果返回后,调用get_summary 获取统计值。
  2. model_base.py:running_funcs中的function遍历执行完毕,对应9,打印如下日志
    1
    [DEBUG] [2021-08-20 10:01:17,239] [2673:140340420417344] - model_base.py[line:109]: saved_result is : [<fate_arch.computing.eggroll._table.Table object at 0x7fa373f3b8d0>], data_output: <fate_arch.computing.eggroll._table.Table object at 0x7fa373f3b8d0>
  3. model_base.py: 调用save_summary() 保存任务摘要

    1
    [INFO] [2021-08-20 10:01:17,239] [2673:140340420417344] - tracker_client.py[line:173]: Request save job 202108201000594960226 task 202108201000594960226_dataio_0 0 on guest 9989 component summary
  4. task_executor.py: profile.profile_ends() 打印任务摘要

  5. task_executor.py:保存模型,输出日志如下
    1
    2
    3
    [INFO] [2021-08-20 10:01:17,597] [2673:140340420417344] - pipelined_model.py[line:73]: Save dataio_0 dataio DataIOMeta buffer
    [INFO] [2021-08-20 10:01:17,598] [2673:140340420417344] - pipelined_model.py[line:73]: Save dataio_0 dataio DataIOParam buffer
    [INFO] [2021-08-20 10:01:17,602] [2673:140340420417344] - pipelined_model.py[line:78]: Save dataio_0 dataio successfully