FATE Learning: Reading the Source Code by Following the Logs (XI): Requests Generated During an Upload Task

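Overview

The HTTP requests of an upload task fall into two groups.
One group is generated along the submit -> job finish flow.
The other is generated by polling: these requests only collect information about the task and do not execute anything themselves.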

Execution details

The log below is the complete container log after one upload run.

10.200.96.235 - - [26/Jul/2021 08:20:31] "POST /v1/party/202107260820309976351/local/0/create HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:31] "POST /v1/data/upload?%7B%22file%22:%20%22/data/projects/fate/examples/data/breast_hetero_guest.csv%22,%20%22head%22:%201,%20%22partition%22:%201,%20%22work_mode%22:%201,%20%22table_name%22:%20%22hetero_guest%22,%20%22namespace%22:%20%22cl%22,%20%22config%22:%20%22/data/projects/fate/cl/upload_guest.json%22,%20%22function%22:%20%22upload%22%7D HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/local/0/resource/apply HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/local/0/start HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/status/running HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/start HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:33] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:34] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:36] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:38] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:40] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:41] "POST /v1/party/202107260820309976351/local/0/update HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:42] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:43] "POST /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/output_data_info/save HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:43] "POST /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_data/save HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:43] "POST /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_meta/save HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:44] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:45] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:45] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report HTTP/1.1" 200 -
static conf path: /data/projects/fate/eggroll/conf/eggroll.properties
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/status/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/stop/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/local/0/model HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/local/0/status/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/local/0/stop/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:48] "POST /v1/party/202107260820309976351/local/0/clean HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:48] "POST /v1/party/202107260820309976351/local/0/stop/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:48] "POST /v1/party/202107260820309976351/local/0/clean HTTP/1.1" 200 -

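Among these, the requests to the collect endpoint belong to the second group (polling). Except for the last one, they are spaced exactly at the polling interval (2 seconds in this log; the last gap is 3 seconds).

All other requests belong to the first group.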
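The split into two groups, and the polling interval, can be checked with a short script over the log. This is a minimal sketch that assumes the access-log format shown above (client IP, `[day/Mon/year time]`, quoted request line); the helper names `parse`, `split_parts` and `intervals` are hypothetical and not part of FATE. Only a subset of the log lines is included, with the long task-level prefix factored out.

```python
import re
from datetime import datetime

# Matches '[26/Jul/2021 08:20:34] "POST /path HTTP/1.1"' inside an access-log line.
LOG_RE = re.compile(r'\[(?P<ts>[^\]]+)\] "(?P<method>[A-Z]+) (?P<path>\S+) HTTP/[\d.]+"')

TASK = "/v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0"
SAMPLE = [
    f'10.200.96.235 - - [26/Jul/2021 08:20:33] "POST {TASK}/report HTTP/1.1" 200 -',
    f'10.200.96.235 - - [26/Jul/2021 08:20:34] "POST {TASK}/collect HTTP/1.1" 200 -',
    f'10.200.96.235 - - [26/Jul/2021 08:20:36] "POST {TASK}/collect HTTP/1.1" 200 -',
    f'10.200.96.235 - - [26/Jul/2021 08:20:38] "POST {TASK}/collect HTTP/1.1" 200 -',
    f'10.200.96.235 - - [26/Jul/2021 08:20:40] "POST {TASK}/collect HTTP/1.1" 200 -',
    '10.200.96.235 - - [26/Jul/2021 08:20:41] "POST /v1/party/202107260820309976351/local/0/update HTTP/1.1" 200 -',
    f'10.200.96.235 - - [26/Jul/2021 08:20:42] "POST {TASK}/collect HTTP/1.1" 200 -',
    f'10.200.96.235 - - [26/Jul/2021 08:20:45] "POST {TASK}/collect HTTP/1.1" 200 -',
    "static conf path: /data/projects/fate/eggroll/conf/eggroll.properties",
]


def parse(line):
    """Return (timestamp, path) for an access-log line, or None for other output."""
    m = LOG_RE.search(line)
    if m is None:
        return None  # e.g. the "static conf path: ..." line
    ts = datetime.strptime(m.group("ts"), "%d/%b/%Y %H:%M:%S")
    return ts, m.group("path")


def split_parts(lines):
    """Split requests into (polling, flow): collect requests vs. everything else."""
    polling, flow = [], []
    for line in lines:
        parsed = parse(line)
        if parsed is None:
            continue
        path = parsed[1].split("?", 1)[0]               # drop any query string
        endpoint = path.rstrip("/").rsplit("/", 1)[-1]  # last path segment
        (polling if endpoint == "collect" else flow).append(parsed)
    return polling, flow


def intervals(entries):
    """Whole seconds between consecutive requests."""
    return [int((b[0] - a[0]).total_seconds()) for a, b in zip(entries, entries[1:])]


polling, flow = split_parts(SAMPLE)
print(len(polling), len(flow))  # 6 2
print(intervals(polling))       # [2, 2, 2, 2, 3]
```

Classifying by the last path segment is enough here because collect is the only endpoint used by polling.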

HTTP requests of the submitted job

The requests are described below in order, each with its purpose and call chain.

  1. data/upload:
    Submit the job.
    fate_flow_client.py -> fate_flow_server.py -> data_access_app_manager.py -> DAGScheduler.submit
  2. party/<job_id>/<role>/<party_id>/create:
    Create the job (continuing the chain above).
    DAGScheduler.submit -> FederatedScheduler.create_job -> fate_flow_server.py -> party_app.py -> JobController.create_job
  3. party/<job_id>/<role>/<party_id>/resource/apply:
    Apply for resources for the job.
    DAGScheduler.schedule_waiting_jobs -> FederatedScheduler.resource_for_job -> fate_flow_server.py -> party_app.py -> ResourceManager.apply_for_job_resource
  4. party/<job_id>/<role>/<party_id>/start:
    Start the job.
    DAGScheduler.schedule_waiting_jobs -> DAGScheduler.start_job -> FederatedScheduler.start_job(job=job) -> fate_flow_server.py -> party_app.py -> JobController.start_job
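In this log, the data/upload request carries the whole upload configuration in its query string as URL-encoded JSON. A minimal sketch that reads it back with the standard library; `decode_upload_query` is a hypothetical helper, and nothing here is claimed about how fate_flow_client.py actually builds the request.

```python
import json
from urllib.parse import unquote

# Request path of the data/upload call, copied from the log above.
REQUEST_PATH = (
    "/v1/data/upload?%7B%22file%22:%20%22/data/projects/fate/examples/data/"
    "breast_hetero_guest.csv%22,%20%22head%22:%201,%20%22partition%22:%201,%20"
    "%22work_mode%22:%201,%20%22table_name%22:%20%22hetero_guest%22,%20"
    "%22namespace%22:%20%22cl%22,%20%22config%22:%20%22/data/projects/fate/cl/"
    "upload_guest.json%22,%20%22function%22:%20%22upload%22%7D"
)


def decode_upload_query(path):
    """Split off the query string and decode it from URL-encoded JSON."""
    endpoint, _, query = path.partition("?")
    return endpoint, json.loads(unquote(query))


endpoint, conf = decode_upload_query(REQUEST_PATH)
print(endpoint)                               # /v1/data/upload
print(conf["table_name"], conf["namespace"])  # hetero_guest cl
```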

Items 5-7 below start the task and update its status, one after another.

  5. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/status/running:
    Update the task status.
    DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> TaskScheduler.start_task -> FederatedScheduler.sync_task_status -> fate_flow_server.py -> party_app.py -> TaskController.update_task_status

  6. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/start:
    Start the task, right after the FederatedScheduler.sync_task_status call above.
    DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> TaskScheduler.start_task -> FederatedScheduler.start_task -> fate_flow_server.py -> party_app.py -> TaskController.start_task

  7. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/report:
    Report the task status back.
    TaskExecutor.run_task -> TaskExecutor.report_task_update_to_driver -> ControllerClient.report_task -> api_utils.local_api -> fate_flow_server.py -> party_app.py -> TaskController.update_task
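The logged URLs come in two shapes: job-level (job id, role, party id) and task-level, which adds the component name, task id and task version after the job id. A small sketch that splits a logged path into those segments; the regexes and group names are illustrative labels read off the log lines, not FATE's own route definitions.

```python
import re

# Job-level paths: /v1/party/<job_id>/<role>/<party_id>/<action>
JOB_ROUTE = re.compile(
    r"^/v1/party/(?P<job_id>[^/]+)/(?P<role>[^/]+)/(?P<party_id>\d+)/(?P<action>.+)$"
)
# Task-level paths add <component_name>/<task_id>/<task_version> after the job id.
TASK_ROUTE = re.compile(
    r"^/v1/party/(?P<job_id>[^/]+)/(?P<component_name>[^/]+)/(?P<task_id>[^/]+)"
    r"/(?P<task_version>\d+)/(?P<role>[^/]+)/(?P<party_id>\d+)/(?P<action>.+)$"
)


def parse_party_route(path):
    """Map a logged /v1/party/... path to named segments; the task shape is tried first."""
    for level, pattern in (("task", TASK_ROUTE), ("job", JOB_ROUTE)):
        m = pattern.match(path)
        if m:
            return {"level": level, **m.groupdict()}
    return None


task = parse_party_route(
    "/v1/party/202107260820309976351/upload_0/"
    "202107260820309976351_upload_0/0/local/0/status/running"
)
job = parse_party_route("/v1/party/202107260820309976351/local/0/resource/apply")
print(task["level"], task["component_name"], task["action"])  # task upload_0 status/running
print(job["level"], job["role"], job["action"])               # job local resource/apply
```

Trying the task shape first matters: a task-level path would also satisfy the looser job-level pattern if the action were allowed to absorb the extra segments.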

Items 8-11 below are all generated while the upload task is executing.

  8. party/<job_id>/<role>/<party_id>/update:
    Update the job status.
    Upload.save_data_table -> ControllerClient.update_job -> api_utils.local_api -> fate_flow_server.py -> party_app.py -> JobController.update_job

  9. tracker/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/output_data_info/save:
    Save output_data_info.
    Upload.save_data_table -> TrackerClient.log_output_data_info -> api_utils.local_api -> fate_flow_server.py -> tracker_app.py -> Tracker.insert_output_data_info_into_db

  10. tracker/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/metric_data/save:
    Save metric data.
    Upload.save_data_table -> TrackerClient.log_metric_data -> api_utils.local_api -> fate_flow_server.py -> tracker_app.py -> Tracker.save_metric_data

  11. tracker/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/metric_meta/save:
    Save metric metadata.
    Upload.save_data_table -> TrackerClient.log_metric_meta -> api_utils.local_api -> fate_flow_server.py -> tracker_app.py -> Tracker.save_metric_meta
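All three tracker requests share the same task-level prefix and differ only in the trailing endpoint. A minimal sketch of how such a URL and JSON body could be assembled with the standard library; in FATE the call goes through api_utils.local_api, which is not reproduced here. The helper `tracker_request`, the host and port `127.0.0.1:9380`, and the payload are assumptions for illustration; only the path shape comes from the log.

```python
import json
from urllib.request import Request


def tracker_request(host, job_id, component_name, task_id, task_version,
                    role, party_id, endpoint, payload):
    """Build (but do not send) a POST shaped like the tracker calls in the log."""
    segments = ["v1", "tracker", job_id, component_name, task_id,
                str(task_version), role, str(party_id), endpoint]
    url = f"http://{host}/" + "/".join(segments)
    body = json.dumps(payload).encode("utf-8")
    return Request(url, data=body, method="POST",
                   headers={"Content-Type": "application/json"})


# Hypothetical host/port and payload.
req = tracker_request(
    "127.0.0.1:9380", "202107260820309976351", "upload_0",
    "202107260820309976351_upload_0", 0, "local", 0,
    "metric_data/save", {"example": "hypothetical payload"},
)
print(req.get_method())  # POST
print(req.full_url)
```

Swapping the `endpoint` argument for `output_data_info/save` or `metric_meta/save` yields the other two requests.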

Back in task_executor, the finally block runs and updates the task status:

  12. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/report:
    TaskExecutor.run_task -> TaskExecutor.report_task_update_to_driver -> ControllerClient.report_task -> api_utils.local_api -> fate_flow_server.py -> party_app.py -> TaskController.update_task

After TaskExecutor.run_task() returns, TaskExecutor.report_task_update_to_driver() is called once more:

  13. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/report:
    TaskExecutor.report_task_update_to_driver -> ControllerClient.report_task -> api_utils.local_api -> fate_flow_server.py -> party_app.py -> TaskController.update_task
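The three report requests in the log follow this order: once when the task starts, once in the finally block, and once more after run_task returns. A simplified sketch of that order, where `reports` records each call instead of sending HTTP and `task_info` carries only an assumed `party_status` field; the function bodies are hypothetical and are not FATE's TaskExecutor.

```python
reports = []  # records each report instead of sending an HTTP request


def report_task_update_to_driver(task_info):
    """Hypothetical stand-in for the chain that ends in TaskController.update_task."""
    reports.append(task_info["party_status"])


def run_task(work):
    """Simplified sketch of the reporting order; `work` is the task body."""
    task_info = {"party_status": "running"}
    try:
        report_task_update_to_driver(task_info)  # report when the task starts
        work()
        task_info["party_status"] = "success"
    except Exception:
        task_info["party_status"] = "failed"
    finally:
        report_task_update_to_driver(task_info)  # always runs, even after a failure
    return task_info


def main(work):
    task_info = run_task(work)
    report_task_update_to_driver(task_info)      # one more report after run_task returns
    return task_info


main(lambda: None)
print(reports)  # ['running', 'success', 'success']
```

Because the finally block always runs, the final status reaches the driver even when the task body raises.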

Tip: in this round of schedule_running_job, the task is already in success status.

  14. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/status/success:
    DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> FederatedScheduler.sync_task_status -> fate_flow_server.py -> party_app.py -> TaskController.update_task_status

Tip: since the task has succeeded, scheduling moves on to the next stage.

  15. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/stop/success:
    DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> FederatedScheduler.stop_task (right after sync_task_status in the previous step) -> fate_flow_server.py -> party_app.py -> TaskController.stop_task

  16. party/<job_id>/<role>/<party_id>/model:
    DAGScheduler.schedule_running_job -> FederatedScheduler.save_pipelined_model -> fate_flow_server.py -> party_app.py -> JobController.save_pipelined_model

  17. party/<job_id>/<role>/<party_id>/status/success:
    DAGScheduler.schedule_running_job -> FederatedScheduler.sync_job_status -> fate_flow_server.py -> party_app.py -> JobController.update_job_status

  18. party/<job_id>/<role>/<party_id>/stop/success:
    DAGScheduler.schedule_running_job -> DAGScheduler.finish -> DAGScheduler.stop_job -> FederatedScheduler.stop_job -> fate_flow_server.py -> party_app.py -> JobController.stop_jobs

  19. party/<job_id>/<role>/<party_id>/clean:
    DAGScheduler.schedule_running_job -> DAGScheduler.finish -> FederatedScheduler.clean_job -> fate_flow_server.py -> party_app.py -> JobController.clean_job
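The finish sequence can be condensed into a small function. This is a simplified sketch for a single-task job, assuming a scheduling round either only polls (while the recorded task status is not yet success) or, once it is, sends the finish chain in the order the log shows; `schedule_round` is hypothetical and ignores DAGScheduler's real branching for failures, multiple tasks and multiple parties.

```python
def schedule_round(job_id, role, party_id, task_path, task_status):
    """Simplified sketch: requests one scheduling round sends for a one-task job.

    task_path is "<component_name>/<task_id>/<task_version>/<role>/<party_id>";
    task_status is the status already recorded for that task.
    """
    job = f"/v1/party/{job_id}/{role}/{party_id}"
    task = f"/v1/party/{job_id}/{task_path}"
    if task_status != "success":
        return [f"{task}/collect"]      # task still running: just poll
    return [
        f"{task}/status/success",       # sync_task_status
        f"{task}/stop/success",         # stop_task
        f"{job}/model",                 # save_pipelined_model
        f"{job}/status/success",        # sync_job_status
        f"{job}/stop/success",          # finish -> stop_job
        f"{job}/clean",                 # finish -> clean_job
    ]


JOB_ID = "202107260820309976351"
TASK_PATH = f"upload_0/{JOB_ID}_upload_0/0/local/0"
print(schedule_round(JOB_ID, "local", 0, TASK_PATH, "running"))
print(len(schedule_round(JOB_ID, "local", 0, TASK_PATH, "success")))  # 6
```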

The finish step runs once more so that everything is cleaned up; see FATE Learning: Reading the Source Code by Following the Logs (IX): The Job Finish Stage of the Upload Task.

  20. party/<job_id>/<role>/<party_id>/stop/success:
    DAGScheduler.schedule_running_job -> DAGScheduler.finish -> DAGScheduler.stop_job -> FederatedScheduler.stop_job -> fate_flow_server.py -> party_app.py -> JobController.stop_jobs

  21. party/<job_id>/<role>/<party_id>/clean:
    DAGScheduler.schedule_running_job -> DAGScheduler.finish -> FederatedScheduler.clean_job -> fate_flow_server.py -> party_app.py -> JobController.clean_job
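The reason for the second pass is discussed in part IX of this series. The sketch below only illustrates why repeating stop and clean can be harmless, under the assumption that both steps are idempotent: the second pass re-asserts the same end state and finds nothing left to remove. `PartyJobRecord` and its fields are hypothetical, not FATE's JobController.

```python
class PartyJobRecord:
    """Hypothetical per-party job state, used only to illustrate idempotent finish steps."""

    def __init__(self):
        self.status = "running"
        self.resources_held = True
        self.temp_tables = {"hypothetical_tmp_table"}

    def stop(self, stop_status):
        # Re-applying the same end status leaves the record unchanged.
        self.status = stop_status
        self.resources_held = False
        return self.status

    def clean(self):
        # Returns how many items were actually removed on this call.
        removed = len(self.temp_tables)
        self.temp_tables.clear()
        return removed


record = PartyJobRecord()
first = (record.stop("success"), record.clean())
second = (record.stop("success"), record.clean())
print(first, second)  # ('success', 1) ('success', 0)
```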

The polling part

party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/collect:
DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> TaskScheduler.collect_task_of_all_party -> FederatedScheduler.collect_task -> party_app.py -> TaskController.collect_task
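The polling pattern can be sketched as a loop that calls collect at a fixed interval until an end status appears. This is a minimal sketch, assuming a 2-second interval (the spacing observed in the log) and an assumed set of end statuses; `poll_until_done` is hypothetical, and in FATE the collect call is one step of each scheduling round rather than a dedicated loop. The `sleep` parameter is injectable so the loop can be exercised without actually waiting.

```python
import time

# Assumed set of end statuses; FATE's own status names may differ.
END_STATUSES = {"success", "failed", "canceled", "timeout"}


def poll_until_done(collect, interval=2.0, max_rounds=100, sleep=time.sleep):
    """Call collect() every `interval` seconds until it returns an end status.

    `collect` stands in for the .../collect request and returns the task's
    current status. Returns every status seen, in order.
    """
    seen = []
    for _ in range(max_rounds):
        status = collect()
        seen.append(status)
        if status in END_STATUSES:
            break
        sleep(interval)
    return seen


# Usage with a fake collect and a sleep that only records the requested delay.
statuses = iter(["running", "running", "running", "success"])
naps = []
result = poll_until_done(lambda: next(statuses), sleep=naps.append)
print(result)  # ['running', 'running', 'running', 'success']
print(naps)    # [2.0, 2.0, 2.0]
```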