本文档主要介绍Paddleflow Pipeline Python DSL的相关接口, 开发者可以参考本说明结合自身需求进行使用。关于Paddleflow Pipeline的介绍以及使用请参考这里
@Pipeline(name="base_pipeline", docker_env="nginx:1.7.9", parallelism=1)
def base_pipeline(data_path, epoch, model_path):
preprocess_step = preprocess(data_path)
train_step = train(epoch, model_path, preprocess_step.parameters["data_path"])
train_step.after(preprocess_step)
validate_step = validate(train_step.parameters["model_path"])
if __name__ == "__main__":
ppl = base_pipeline(data_path=f"./base_pipeline/data/{PF_RUN_ID}", epoch=5, model_path=f"./output/{PF_RUN_ID}")Pipeline是一个类装饰器,开发者需要用其对编排pipeline的函数进行装饰
| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| name | string | pipeline的名字 | 是 | 需要满足如下正则表达式: "^[A-Za-z_][A-Za-z0-9_]{0,49}$" |
| parallelism | int | pipeline 任务的并发数,即最大可以同时运行的节点任务数量 | 否 | |
| docker_env | string | 各节点默认的docker镜像地址 | 否 | |
| env | dict[str, str] | 各节点运行任务时的环境变量 | 否 | |
| cache_options | CacheOptions | Pipeline 级别的Cache配置 | 否 | 关于Cache机制的相关介绍,请点击这里 |
| failure_options | FailureOptions | failure_options配置 | 否 | 关于failure_options的相关介绍,请点击这里 |
| fs_options | FSOptions | 关于fs_options的详细介绍,请点击这里 |
注意: 在Pipeline和ContainerStep中都可以进行设置 env 参数,在运行时,将采用合并机制:
- 在运行时,ContainerStep的环境变量即包含了ContainerStep.env属性中指定环境变量,也包含了Pipeline.env中包含的环境变量,如果有同名的环境变量,则使用ContainerStep.env定义的参数值
Pipeline的一个实例
# 这里的send_mail_step()函数需要返回一个ContainerStep实例
ppl.set_post_process(send_mail_step("paddleflow@pipeline.com"))| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| step | ContainerStep | post_process阶段运行的Step | 是 |
无返回值
post_process = ppl.get_post_process()无参数
一个在post_process阶段运行的ContainerStep实例
ppl.run(fs_name="your_fs_name", disabled=["a", "b.c"])| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| config | string | 配置文件路径 | 否 | 配置文件的内容请参考这里 |
| fs_name | string | 共享存储名称 | 否 | 如果有使用共享存储,则必须填写该参数 |
| username | string | 指定用户,用于root账号运行特定用户的fs的工作流 | 否 | |
| run_name | string | 工作流名称 | 否 | |
| desc | string 工作流描述 | 否 | ||
| disabled | List[string] | 本次运行需要disabled的节点的全路径名称 | 否 | 全路径名称: 由其所有祖先节点的名字以及当前节点的名字组合而成,以"."分割,祖先节点名在前 在上面的实例中,节点a, 以及节点b的子节点c将会被disable |
| 字段名称 | 字段类型 | 字段含义 |
|---|---|---|
| ret | bool | 操作成功返回True,失败返回False |
| response | - | 失败返回失败message,成功返回run_id |
ppl.compile("run.yaml")| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| save_path | string | 保存编译产出的文件路径,文件内容可以参考这里 | 否 | 文件后缀需要是 [".yaml", ".json", ".yml"]之一 |
一个包含编译产出的所有信息的Dict。其内容可以参考这里
ppl.entry_points无参数
一个Dict,其key为节点的名字,value为节点实例
ppl.name无参数
一个string, 表示pipeline的名字
ppl.name = "exp_ppl"无参数
无返回值
ppl.env无参数
一个dict,包含了所有pipeline级别的环境变量信息
ppl.add_env({"env1": "env1"})| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| env | dict[str, str] | 需要新增的环境变量 | 是 |
无返回值
ppl.create()| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| config | string | 配置文件路径 | 否 | 配置文件的内容请参考这里 |
| desc | string | 工作流模板的描述 | 否 | |
| username | string | 模板所属用户名称 | 否 |
| 字段名称 | 字段类型 | 字段含义 |
|---|---|---|
| ret | bool | 操作成功返回True,失败返回False |
| response | - | 失败返回失败message,成功返回dict:{'name': 名称, 'pplID': 工作流模板id, 'pplVerID': 工作流模板版本id} |
ppl.update("123")| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| pipeline_id | string | 工作流模板id | 是 | |
| config | string | 配置文件路径 | 否 | 配置文件的内容请参考这里 |
| desc | string | 工作流模板的描述 | 否 | |
| username | string | 模板所属用户名称 | 否 |
| 字段名称 | 字段类型 | 字段含义 |
|---|---|---|
| ret | bool | 操作成功返回True,失败返回False |
| response | - | 失败返回失败message,成功返回dict: {'pipelineID': pplID, 'pipelineVersionID': pplVerID} |
step = ContainerStep(
name="preprocess",
parameters={"data_path": data_path},
command="bash base_pipeline/shells/data.sh {{data_path}}",
docker_env="nginx:1.7.9",
env={
"USER_ABC": f"123_{PF_USER_NAME}",
"PF_JOB_TYPE": "vcjob",
"PF_JOB_QUEUE_NAME": "ppl-queue",
"PF_JOB_MODE": "Pod",
"PF_JOB_FLAVOUR": "flavour1",
},
)| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| name | string | step 的名字 | 是 | 需要满足如下正则表达式: "^[a-zA-Z][a-zA-Z0-9-]{0,29}$" |
| command | string | 需要执行的命令 | 否 | |
| docker_env | string | docker 镜像地址 | 否 | |
| inputs | dict[string, Aritfact] | 输入artifact信息 | 否 | key将会作为artifact的名字,value需要是其余的节点输出artifact |
| outputs | dict[string, Artifact] | 输出artifact信息 | 否 | key将会作为artifact的名字, value必须是Artifact() |
| parameters | dict[string, Union[string, int, float, Parameter]] | parameter信息 | 否 | key将作为parameter的名字,value即为parameter的默认值 |
| env | dict[str, str] | 节点运行任务时的环境变量 | 否 | |
| cache_options | CacheOptions | Cache 配置 | 否 | 关于Cache机制的相关介绍,请点击这里 |
| contidion | string | 用于在Pipeline任务运行时决定是否运行当前步骤的条件判断式 | 否 | 关于condition的详细信息,请参考这里 |
| loop_argument | Union[List, Parameter, Artifact, string, _LoopItem] | 循环参数,如果有值,在运行时,会对该字段进行遍历,对于其中的每一项,都会调度执行一次当前节点 | 否 | 更多信息,请参考loop_argument |
| extra_fs | List[ExtraFS] | 节点运行时需要挂载的共享存储的相关信息 | 更多信息,请参考这里 |
注意1:inputs, outputs, parameters 中的key不可以同名
注意2: 在Pipeline和ContainerStep中都可以进行设置 env 参数,在运行时,将采用合并机制:
- 在运行时,Step的环境变量即包含了ContainerStep.env属性中指定环境变量,也包含了Pipeline.env中包含的环境变量,如果有同名的环境变量,则使用ContainerStep.env定义的参数值
一个ContainerStep的实例
step.env无参数
一个dict,包含了所有环境变量信息
step.add_env({"env1": "env1"})| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| env | dict[str, str] | 需要新增的环境变量 | 是 |
无返回值
step.name无参数
一个string, 表示step的名字
step.name = "step1"无参数
无返回值
step.inputs无参数
一个dict: 其中key为artifact的名字,value为value为Artifact实例
step.outputs无参数
一个dict: 其中key为artifact的名字,value为Artifact实例
params = ppl.parameters无参数
key 为parameter的名字,value为对应的Parameter实例
step.after(step1, step2)| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| *upstream | List[Union[ContainerStep, DAG]] | 可变参数,每一项均需要是一个ContainerStep或者DAG实例 | 是 | 当前节点的上游节点,在运行时,会等所有上游节点运行成功才会运行当前节点 |
ContainerStep实例本身
step.loop_argument无参数
用于表征当前节点的循环参数参数的_LoopArgument的实例
step.condition无参数
一个string,表示当前节点的condition
全路径名称: 由其所有祖先节点的名字以及当前节点拼凑而成,以"."分割,祖先节点名在前
step.full_name无参数
一个string,表示当前节点的全路径名
注意:
- Pipeline中所有节点都会有一个名为 PF-ENTRY-POINT的祖先节点
dag = DAG(name="dag")| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| name | string | dag 的名字 | 是 | 需要满足如下正则表达式: "^[a-zA-Z][a-zA-Z0-9-]{0,29}$" |
| contidion | string | 用于在Pipeline任务运行时决定是否运行当前步骤的条件判断式 | 否 | 关于condition的详细信息,请参考这里 |
| loop_argument | Union[List, Parameter, Artifact, string, _LoopItem | 循环参数,如果有值,在运行时,会对该字段进行遍历,对于其中的每一项,都会调度执行一次当前节点 | 否 | 更多信息,请参考loop_argument |
一个DAG实例
dag.name无参数
一个string, 表示dag的名字
dag.name = "dag"无参数
无返回值
dag.after(step1, dag2)| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| *upstream | List[Union[ContainerStep, DAG]] | 可变参数,每一项均需要是一个ContainerStep或者DAG实例 | 是 | 当前节点的上游节点,在运行时,会等所有上游节点运行成功才会运行当前节点 |
DAG实例本身
dag.loop_argument无参数
用于表征当前节点的循环参数参数的_LoopArgument的实例
dag.condition无参数
当前节点的condition字段的值
全路径名称: 由其所有祖先节点的名字以及当前节点拼凑而成,以"."分割,祖先节点名在前
dag.full_name无参数
一个string,表示当前节点的全路径名
注意:
- Pipeline中所有节点都会有一个名为PF-ENTRY-POINT的祖先节点
art = Artifact()无参数
一个Artifact实例
art.component无参数
一个DAG或者ContainerStep实例
art.name无参数
一个string, 表明该Artifact的名字
pa = Parameter(10)| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| default | Union[string, int, float, list, _LoopItem] | parameter的默认值 | 否 | |
| type | Enum["int", "float", "string". "list"] | parameter的类型 | 否 |
如果设置了type字段,则要求default的值与type指代的类型相同
一个Parameter实例
step = pa.component无参数
一个DAG或者ContainerStep实例
pa_name = pa.name无参数
一个string, 表名该Parameter的名字
default_value = pa.default无参数
该Parameter的默认值
pa.default = "10"无参数
无返回值
pa_type = pa.type无参数
如果Parameter有设置type字段,则返回一个字符串,用于指代该Parameter的类型,否则为None
用户不应该显示的初始化_LoopArgument实例,而是应该在定义节点时,通过给其loop_argument属性赋值来间接的实例化_LoopArgument对象。
如下所示
step = ContainerStep(
name="step",
loop_argument=[1,2,3],
command="echo {{PF_LOOP_ARGUMENT}}"
)此时,step.loop_argument 即为一个_LoopArgument的实例
step.loop_argument.item无参数
一个_LoopItem实例,用于指代在节点运行时,当次运行所对应的循环参数的值
更多信息可以参考这里
用户不应该显示初始化_LoopItem实例, 实例化_LoopArgument时,会自动实例化一个_LoopItem实例, 如下所示:
step = ContainerStep(
name="step",
loop_argument=[1,2,3],
command="echo {{PF_LOOP_ARGUMENT}}"
)此时step.loop_argument.item即为一个_LoopItem类型的实例
fs_scope = FSScope(name="ppl", path="/143")| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| name | string | 共享存储的名字 | 是 | - |
| path | string | 共享存储上需要参与[cache fingerprint][cache fingerprint]计算的路径 | 否 | 多个路径以","隔开 默认值为"/" |
一个FSScope实例
fs_scope = FSScope(name="ppl", path="/143")
cache = CacheOptions(
enable=True,
max_expired_time=300,
fs_scope=[fs_scope]
)| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| enable | bool | 是否启用cache功能 | 否 | |
| max_expired_time | int | cache缓存的有效期 | 否 | 为-1表示无限期 |
| fs_scope | list[FSScope] | 参与计算fingerprint的共享存储的信息 | 否 | 详情请参考这里 |
一个CacheOptions实例
failure_options = FailureOptions("continue")| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| strategy | string | failure_options 策略 | 是 | 详情请参考这里 |
当前只支持两种策略:continue和fail_fast
一个FailureOptions实例
extra_fs = ExtraFS("ppl", sub_path="condition_example")| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| name | string | 共享存储的名字 | 是 | |
| mount_path | string | 容器内的挂载路径 | 否 | |
| sub_path | string | 共享存储中需要被挂载的子路径 | 否 | 如果没有配置,将会把整个共享存储挂载至容器中 不能以"/"开头 |
| read_only | bool | 容器对mount_path的访问权限 | 否 |
一个ExtraFS实例
main_fs = MainFS("ppl", sub_path="output", mount_path="./test_dsl")| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| name | string | 共享存储的名字 | 是 | |
| mount_path | string | 容器内的挂载路径 | 否 | |
| sub_path | string | 共享存储中需要被挂载的子路径 | 否 | 如果没有配置,将会把整个共享存储挂载至容器中 不能以"/"开头 如果该路径存在,则必须为目录 |
一个MainFS实例
extra_fs = ExtraFS("ppl", sub_path="output", mount_path="/home/output")
main_fs = MainFS("ppl", sub_path="output", mount_path="./test_dsl")
fs_options = FSOptions(main_fs=main_fs, extra_fs=[extra_fs])| 字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
|---|---|---|---|---|
| main_fs | MainFS | 用于存储节点输出artifact的共享存储信息 | 否 | |
| extra_fs | list[ExtraFS] | 节点运行时需要挂载的共享存储信息 | 否 |
DSL也提供一些可以节点运行时获取的系统变量,见下表:
| 字段名称 | 字段含义 |
|---|---|
| PF_RUN_ID | pipeline 任务的唯一标识符 |
| PF_STEP_NAME | step的名字 |
| PF_USER_NAME | 发起本次pipeline任务的用户名 |
| PF_LOOP_ARGUMENT | 指代循环参数在当次运行的值 |