
Overview

Per FATE's flowchart, the first component a regular job runs is dataio.
As in the previous post, once the following command is written to ${job_log_dir}/fate_flow_schedule.log, dataio starts executing via task_executor.

[INFO] [2021-06-04 14:42:02,995] [1:140456603948800] - job_utils.py[line:310]: start process command: /opt/app-root/bin/python /data/projects/fate/python/fate_flow/operation/task_executor.py -j 202106041441554669404 -n dataio_0 -t 202106041441554669404_dataio_0 -v 0 -r guest -p 9989 -c /data/projects/fate/jobs/202106041441554669404/guest/9989/dataio_0/202106041441554669404_dataio_0/0/task_parameters.json --run_ip 10.200.224.59 --job_server 10.200.224.59:9380
[INFO] [2021-06-04 14:42:03,005] [1:140456603948800] - job_utils.py[line:333]: start process command: /opt/app-root/bin/python /data/projects/fate/python/fate_flow/operation/task_executor.py -j 202106041441554669404 -n dataio_0 -t 202106041441554669404_dataio_0 -v 0 -r guest -p 9989 -c /data/projects/fate/jobs/202106041441554669404/guest/9989/dataio_0/202106041441554669404_dataio_0/0/task_parameters.json --run_ip 10.200.224.59 --job_server 10.200.224.59:9380 successfully, pid is 1657

[INFO] [2021-06-04 14:42:03,972] [1657:139893886592832] - task_executor.py[line:56]: enter task process
[INFO] [2021-06-04 14:42:03,972] [1657:139893886592832] - task_executor.py[line:57]: Namespace(component_name='dataio_0', config='/data/projects/fate/jobs/202106041441554669404/guest/9989/dataio_0/202106041441554669404_dataio_0/0/task_parameters.json', job_id='202106041441554669404', job_server='10.200.224.59:9380', party_id=9989, role='guest', run_ip='10.200.224.59', task_id='202106041441554669404_dataio_0', task_version=0)

The detailed execution logs are written to dataio_0/DEBUG.log.
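As a side note, those "start process command" lines can be pulled out of the schedule log programmatically. A minimal sketch, assuming the job_utils.py log format shown above (the parser itself is illustrative, not part of FATE):

import re

# Matches the "start process command" lines that job_utils.py writes to
# ${job_log_dir}/fate_flow_schedule.log; the trailing "successfully, pid is N"
# part only appears on the post-launch line.
CMD_PATTERN = re.compile(
    r"start process command: (?P<cmd>.+?)(?: successfully, pid is (?P<pid>\d+))?$"
)

def extract_task_commands(log_path):
    """Yield (command, pid) pairs; pid is None for the pre-launch line."""
    with open(log_path) as f:
        for line in f:
            m = CMD_PATTERN.search(line.rstrip("\n"))
            if m:
                yield m.group("cmd"), m.group("pid")

# Example: print every task_executor launch recorded for a job.
for cmd, pid in extract_task_commands("fate_flow_schedule.log"):
    print(f"pid={pid or '-'}  {cmd}")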


Overview

The HTTP requests of an upload job can be split into two parts.
One part is produced along the submit -> job finish flow.
The other part is produced during polling (these requests merely collect status information after each poll; they do not execute anything).

Execution details

The log below is the complete container log from one upload run; a small classification sketch follows it.

10.200.96.235 - - [26/Jul/2021 08:20:31] "POST /v1/party/202107260820309976351/local/0/create HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:31] "POST /v1/data/upload?%7B%22file%22:%20%22/data/projects/fate/examples/data/breast_hetero_guest.csv%22,%20%22head%22:%201,%20%22partition%22:%201,%20%22work_mode%22:%201,%20%22table_name%22:%20%22hetero_guest%22,%20%22namespace%22:%20%22cl%22,%20%22config%22:%20%22/data/projects/fate/cl/upload_guest.json%22,%20%22function%22:%20%22upload%22%7D HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/local/0/resource/apply HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/local/0/start HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/status/running HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/start HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:33] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:34] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:36] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:38] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:40] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:41] "POST /v1/party/202107260820309976351/local/0/update HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:42] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:43] "POST /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/output_data_info/save HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:43] "POST /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_data/save HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:43] "POST /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_meta/save HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:44] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:45] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:45] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report HTTP/1.1" 200 -
static conf path: /data/projects/fate/eggroll/conf/eggroll.properties
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/status/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/stop/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/local/0/model HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/local/0/status/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/local/0/stop/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:48] "POST /v1/party/202107260820309976351/local/0/clean HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:48] "POST /v1/party/202107260820309976351/local/0/stop/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:48] "POST /v1/party/202107260820309976351/local/0/clean HTTP/1.1" 200 -
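To separate the two kinds of requests mentioned in the overview, the access-log lines can be bucketed by endpoint. A minimal sketch, assuming the Werkzeug-style access-log format above (the bucket names are illustrative, not FATE terminology):

import re
from collections import Counter

# Werkzeug-style access-log line: ... "POST /v1/... HTTP/1.1" 200 -
REQ = re.compile(r'"POST (?P<path>\S+) HTTP/1\.1"')

def classify(path):
    # polling only collects status; everything else belongs to the
    # submit -> job finish flow
    return "polling" if path.endswith("/collect") else "lifecycle"

def summarize(log_lines):
    counts = Counter()
    for line in log_lines:
        m = REQ.search(line)
        if m:
            counts[classify(m.group("path"))] += 1
    return counts

# Against the log above this yields 6 polling /collect requests;
# the remaining requests belong to the submit -> job finish flow.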

Overview

As introduced in FATE Learning: Reading the Source Code Through Logs (II) — fate_flow server startup, DAGScheduler(interval=2 * 1000).start() in fate_flow_server.py kicks off the dag_scheduler every 2s by default, covering five kinds of scheduling.
Among them:

The task status stays running

If a task takes longer than the 2s polling interval, its status remains running throughout the intervening polls.
The corresponding entries appear in ${job_log_dir}/fate_flow_schedule.log.
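The interval-driven behaviour can be pictured with a small timer loop. A minimal sketch of the repeat-every-interval pattern, assuming threading.Timer (an illustration of the behaviour, not fate_flow's actual Cron implementation):

import threading

class Cron:
    # re-arms a timer every `interval` milliseconds and calls run_do()
    def __init__(self, interval):
        self.interval = interval  # milliseconds, e.g. 2 * 1000

    def start(self):
        self.run_do()
        timer = threading.Timer(self.interval / 1000, self.start)
        timer.daemon = True
        timer.start()

class DAGScheduler(Cron):
    def run_do(self):
        # one scheduling round: this is where the five kinds of
        # scheduling (waiting jobs, running jobs, ...) would be dispatched
        print("one scheduling round")

# DAGScheduler(interval=2 * 1000).start()  # fires roughly every 2s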


Overview

As with tasks, no job-finished message is pushed back when a job ends; the job status is likewise obtained via the calculate_job_status() method during DAGScheduler.schedule_running_job() polling.
Once the job is known to have finished, resources are returned and the environment is cleaned up.

Execution steps


  1. dag_scheduler.py: after the tasks have been scheduled, calculate_job_status is run to compute the job's current status, and calculate_job_progress to compute its completion progress (both are sketched after this list).
    The following is logged to ${job_log_dir}/fate_flow_schedule.log:

    [INFO] [2021-07-26 08:20:47,782] [1:140259369826048] - dag_scheduler.py[line:310]: Job 202107260820309976351 status is success, calculate by task status list: ['success']
  2. dag_scheduler.py: based on changes in the job's status and progress, sync the information, save the model, and so on.
    The code below is never executed; its logic is slightly flawed:

    if int(new_progress) - job.f_progress > 0:
        job.f_progress = new_progress
        FederatedScheduler.sync_job(job=job, update_fields=["progress"])
        cls.update_job_on_initiator(initiator_job=job, update_fields=["progress"])

    Open question
    When a task's status changes, the task status inside the initiator_tasks_group returned by TaskScheduler.schedule is not updated.
    Consequently, total, finished_count = cls.calculate_job_progress(tasks_status=tasks_status) in DAGScheduler.schedule_running_job
    sees no change and does not trigger the follow-up flow, so an extra scheduling round is run.
    A suggested fix: after task scheduling finishes, query the task information once more and refresh the statuses.
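For step 1, the derivation can be pictured as follows. A minimal sketch of computing a job status and progress from the task status list, matching the log line above (the precedence rules are illustrative, not copied from calculate_job_status):

def calculate_job_status(task_status_list):
    # derive a job status from its tasks' statuses (illustrative rules)
    statuses = set(task_status_list)
    if statuses == {"success"}:  # e.g. ['success'] -> success, as logged above
        return "success"
    if "failed" in statuses:
        return "failed"
    if "canceled" in statuses:
        return "canceled"
    if "running" in statuses:
        return "running"
    return "waiting"

def calculate_job_progress(task_status_list):
    # total tasks vs. tasks already in an end state (illustrative)
    total = len(task_status_list)
    finished = sum(s in ("success", "failed", "canceled") for s in task_status_list)
    return total, finished

assert calculate_job_status(["success"]) == "success"
assert calculate_job_progress(["success", "running"]) == (2, 1)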


Overview

When a task ends, TaskExecutor.report_task_update_to_driver updates the task's status in the local DB. Since this is an asynchronous request, however, the initiator receives no task-finished message; it only learns the task's status by querying the DB during polling.
Once the task is known to have finished, resources are returned and the environment is cleaned up.

Execution details


  1. dag_scheduler.py: schedule_running_job polls, much like the previous post, so the details are not repeated here. The call chain is:
    DAGScheduler.schedule_running_job() -> TaskScheduler.schedule -> JobSaver.get_tasks_asc -> JobSaver.collect_task_of_all_party -> JobSaver.federated_task_status
    The corresponding ${job_log_dir}/fate_flow_schedule.log entries are:

    [INFO] [2021-07-26 08:20:44,944] [1:140259369826048] - dag_scheduler.py[line:298]: scheduling job 202107260820309976351
    [INFO] [2021-07-26 08:20:44,945] [1:140259369826048] - task_scheduler.py[line:28]: scheduling job 202107260820309976351 tasks
    [INFO] [2021-07-26 08:20:45,010] [1:140259369826048] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
    [INFO] [2021-07-26 08:20:45,022] [1:140259369826048] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'elapsed': 11447, 'end_time': 1627287644838, 'job_id': '202107260820309976351', 'party_id': '0', 'party_status': 'success', 'role': 'local', 'start_time': 1627287632259, 'status': 'running', 'task_id': '202107260820309976351_upload_0', 'task_version': 0, 'update_time': 1627287631078}
    [INFO] [2021-07-26 08:20:45,022] [1:140259369826048] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
    [WARNING] [2021-07-26 08:20:45,036] [1:140259369826048] - job_saver.py[line:86]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update does not take effect
    [INFO] [2021-07-26 08:20:45,041] [1:140259369826048] - task_scheduler.py[line:143]: job 202107260820309976351 task 202107260820309976351_upload_0 0 status is success, calculate by task party status list: ['success']
  2. task_scheduler.py: sync the status, same as above, by calling FederatedScheduler.sync_task_status (sketched below).
    The resulting call chain is:
    FederatedScheduler.sync_task_status() -> api_utils.federated_coordination_on_http() -> fate_flow_server via Flask -> party_app.task_status() -> TaskController.update_task_status
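The sync in step 2 travels as an HTTP POST to each party's fate_flow_server. A minimal sketch of issuing such a request, assuming the /v1/party/.../status/{status} endpoint layout visible in the access log earlier (the helper and its use of requests are illustrative):

import requests  # illustrative HTTP client

def sync_task_status(host, job_id, component_name, task_id,
                     task_version, role, party_id, status):
    # mirrors the endpoint layout seen in the access log, e.g.
    # /v1/party/<job_id>/upload_0/<task_id>/0/local/0/status/success
    endpoint = (f"http://{host}/v1/party/{job_id}/{component_name}/"
                f"{task_id}/{task_version}/{role}/{party_id}/status/{status}")
    return requests.post(endpoint, json={})

# sync_task_status("10.200.96.235:9380", "202107260820309976351", "upload_0",
#                  "202107260820309976351_upload_0", 0, "local", 0, "success")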


Overview

After the upload task was scheduled in the previous post, task_executor is ultimately invoked to run it, which brings us to the actual execution phase.
Each task's detailed logs are written under ${job_log_dir}/${role}/${party} (abbreviated below as ${task_log_dir}).
Following along in ${task_log_dir}/DEBUG.log makes this clearest.

Execution details


  1. The command executed:
    /opt/app-root/bin/python /data/projects/fate/python/fate_flow/operation/task_executor.py -j 202107260820309976351 -n upload_0 -t 202107260820309976351_upload_0 -v 0 -r local -p 0 -c /data/projects/fate/jobs/202107260820309976351/local/0/upload_0/202107260820309976351_upload_0/0/task_parameters.json --run_ip 10.200.96.235 --job_server 10.200.96.235:9380
  2. task_executor.py: run run_task.
    The arguments are parsed first.
    The following is logged to ${job_log_dir}/fate_flow_schedule.log:

    [INFO] [2021-07-26 08:20:33,391] [90:139687957583680] - task_executor.py[line:56]: enter task process
    [INFO] [2021-07-26 08:20:33,391] [90:139687957583680] - task_executor.py[line:57]: Namespace(component_name='upload_0', config='/data/projects/fate/jobs/202107260820309976351/local/0/upload_0/202107260820309976351_upload_0/0/task_parameters.json', job_id='202107260820309976351', job_server='10.200.96.235:9380', party_id=0, role='local', run_ip='10.200.96.235', task_id='202107260820309976351_upload_0', task_version=0)

    Based on the parsed arguments, schedule_utils.get_job_dsl_parser() is called to build the dsl_parser, and the various parameters are configured.
    job_log_dir and task_log_dir are set.
    Note: once these directories are set, all logs produced by the task go under task_log_dir, separate from the outer fate_flow_schedule.log.

  3. task_executor.py: initialize Tracker and TrackerClient, obtain run_class_paths, run_class_package, and run_class_name (see the import sketch after this list), then call report_task_update_to_driver(task_info=task_info).

  4. task_executor.py: run report_task_update_to_driver().
    First, the following is logged to ${task_log_dir}/fate_flow_schedule.log:

    [INFO] [2021-07-26 08:20:33,395] [90:139687957583680] - task_executor.py[line:318]: report task 202107260820309976351_upload_0 0 local 0 to driver

    Then ControllerClient.report_task is called.

  5. control_client.py: run report_task.
    A log entry is first written to ${job_log_dir}/${role}/${party}/DEBUG.log:
    [INFO] [2021-07-26 08:20:33,396] [90:139687957583680] - control_client.py[line:42]: request update job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0
    Then an HTTP request is issued, with report as the endpoint.
    The flow is the same as before; the call chain is ControllerClient.report_task -> fate_flow_server via Flask -> party_app.report_task() -> TaskController.update_task(task_info=task_info) -> TaskController.update_task_status(task_info=task_info)
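The run_class_package / run_class_name pair from step 3 is what lets task_executor load the component dynamically. A minimal sketch of that pattern with importlib (the helper and the example names in the comment are hypothetical, not FATE's actual module layout):

import importlib

def get_run_class(run_class_package, run_class_name):
    # import the package, then look the component class up by name
    module = importlib.import_module(run_class_package)
    return getattr(module, run_class_name)

# hypothetical resolution for an upload task:
# run_object = get_run_class("fate_flow.components.upload", "Upload")()
# run_object.run(component_parameters, task_run_args)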

Overview

Building on the previous post: since the job is now in the running state, the tasks under it are scheduled one after another according to their dependencies.

The operations after scheduling have two parts (a minimal scheduling sketch follows this list):

  • Part 1: apply for resources
  • Part 2: start job: move the status from waiting -> running
    Main method involved: dag_scheduler.schedule_running_jobs()
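A minimal sketch of one such scheduling pass, assuming a simple task table with upstream dependencies (the data layout, the apply_resource stub, and the pipeline names are illustrative, not FATE's task model):

def apply_resource(name):
    # stand-in for the real resource application; assume it always succeeds
    return True

def schedule_ready_tasks(tasks):
    # tasks maps component name -> {"status": ..., "upstream": [...]};
    # a waiting task is ready once all of its upstream tasks succeeded
    for name, task in tasks.items():
        if task["status"] != "waiting":
            continue
        if all(tasks[up]["status"] == "success" for up in task["upstream"]):
            if apply_resource(name):        # Part 1: apply for resources
                task["status"] = "running"  # Part 2: waiting -> running

tasks = {
    "dataio_0": {"status": "success", "upstream": []},
    "intersection_0": {"status": "waiting", "upstream": ["dataio_0"]},
    "hetero_lr_0": {"status": "waiting", "upstream": ["intersection_0"]},
}
schedule_ready_tasks(tasks)
print(tasks["intersection_0"]["status"])  # running; hetero_lr_0 stays waiting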

Execution steps


Overview

Viewed from the angle of federated modeling, a job's entire life cycle is a DAG composed of functional modules; see the DAG figure in FATE's official documentation.
In concrete terms, each functional module is one of FATE's algorithm or data components.

Reading the source code from the logs' perspective, however, the job life cycle can be divided up by the operations performed at each stage, as in the figure below:
job life cycle

  1. submit: submit the job
  2. create: create the job and the tasks under it (essentially metadata); once created, everything is in the waiting state
  3. job schedule: jobs in the waiting state are polled in FIFO order; resources are applied for and the status is set to running
  4. task schedule: jobs in the running state are polled, and their tasks are scheduled in dependency order
  5. execute: run the tasks set to running in step 4
  6. task finish: the task has finished; resources are reclaimed and the environment is cleaned up
  7. job finish: the job has finished; resources are reclaimed and the environment is cleaned up
  8. cancel: abort a job in progress. cancel is not really a regular life-cycle operation; it can occur at any stage after create, and takes effect at polling's next round after it is received

Beyond these there is polling, a stage outside the life cycle: the polling mechanism on fate_flow_server detects jobs and tasks in their various states and applies the corresponding operations to each. Strictly speaking it is not part of the job life cycle; it only plays a polling role. The stage transitions above can be summed up as a small state machine, sketched below.
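A minimal sketch of the job-status transitions implied by these stages (the transition table is a reading of the list above, not code from fate_flow):

# job-status transitions implied by the stages above
TRANSITIONS = {
    "waiting": {"running", "canceled"},            # job schedule, or cancel
    "running": {"success", "failed", "canceled"},  # job finish, or cancel
}

def move(status, new_status):
    if new_status not in TRANSITIONS.get(status, set()):
        raise ValueError(f"illegal transition {status} -> {new_status}")
    return new_status

status = "waiting"                 # after submit/create
status = move(status, "running")   # job schedule: FIFO, resources applied
status = move(status, "success")   # job finish: reclaim resources, clean up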
