# Kubeflow Pipelines & TFX

Status         | Implemented
:------------- | :--------------------------------------------
**Author(s)**  | Ajay Gopinathan ([email protected])
**Sponsor**    | Konstantinos Katsiapis ([email protected])
**Created**    | 2019-06-30

## Objective

This RFC documents the design and engineering effort proposed by the
[Kubeflow Pipelines](https://github.com/kubeflow/pipelines) team to support
[TFX](https://www.tensorflow.org/tfx) with Kubeflow Pipelines (KFP).

TFX is an open-source effort by the TensorFlow team aimed at providing users
with tools for building production-grade machine-learning (ML) workflows. TFX
provides an ML pipeline authoring framework in Python which encodes
Google’s best practices for ML pipelines, including:

* scalable and battle-tested components
* ML-focused pipeline design patterns
* strongly typed artifacts
* artifact provenance tracking through [ML Metadata](https://github.com/google/ml-metadata)

An important value proposition of the TFX framework is that it is agnostic to
the choice of orchestration engine. At launch, TFX supported two orchestration
engines natively:

* [Apache Airflow](https://airflow.apache.org/) for running locally
* [Kubeflow Pipelines](https://www.kubeflow.org/docs/pipelines/)
  for running in the cloud

This document describes how TFX pipelines are run with Kubeflow Pipelines as
the orchestration engine. It can be viewed as an extension of the main
[TFX orchestration and configuration](https://github.com/tensorflow/community/tree/master/rfcs/20190718-tfx-orchestration.md)
design document.

## Motivation

### TFX on Kubeflow Requirements

The main focus areas for running TFX on Kubeflow were:

* **Portability:** The user-facing code in a TFX pipeline should be portable.
  An early stated goal was that the same pipeline should be runnable on both
  Airflow and KFP with a _single-line change_ in the pipeline construction
  code.
* **Scalability:** TFX on KFP should solve the use case of large-scale
  workloads, thereby showcasing the advantages of running on Google Cloud
  Platform (GCP). This meant enabling the use of strongly differentiating GCP
  services such as BigQuery, Dataflow, and Cloud ML Engine for training and
  serving in the pipeline code.

At launch, both of these requirements were achieved. Using KFP required a
single-line change in the pipeline code, and the sample pipeline for KFP
showcased the use of GCP services for running workloads at scale.

### Overview of TFX pipelines

A TFX pipeline is a **logical pipeline** consisting of a series of components.
Each component is defined in terms of inputs, outputs, and execution properties.
Inputs and outputs are represented as channels of ML Metadata Artifacts.
Logically, each component consists of three parts (sketched in code below):

* `Driver`: Responsible for resolving input artifacts from the ML Metadata
  store. Determines whether the execution has been previously cached and, if
  so, whether the call to the `Executor` can be skipped.
* `Executor`: Executes the main logic of the component, and provides a uniform
  interface around TFX libraries, as well as custom logic.
* `Publisher`: Records output artifacts produced by the `Executor`, and passes
  this output artifact metadata to downstream steps.
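
The following is a minimal, illustrative sketch of how these three parts
cooperate at runtime. None of the names below are real TFX APIs; they are
hypothetical stand-ins for the `Driver`, `Executor`, and `Publisher` logic.

```python
# Illustrative sketch of the Driver -> Executor -> Publisher contract.
# All names are hypothetical stand-ins, not actual TFX classes or methods.
from typing import Any, Dict, List, Optional


def run_component(metadata_store: Any,
                  component: Any,
                  exec_properties: Dict[str, Any]) -> Dict[str, List[Any]]:
    # Driver: resolve input artifacts from ML Metadata, and check whether a
    # previous execution with the same inputs and properties can be reused.
    input_dict = component.driver.resolve_inputs(metadata_store)
    cached: Optional[Dict[str, List[Any]]] = component.driver.get_cached_outputs(
        metadata_store, input_dict, exec_properties)
    if cached is not None:
        return cached  # Cache hit: the Executor call is skipped entirely.

    # Executor: run the component's main logic (typically a thin, uniform
    # wrapper around a TFX library or custom user code).
    output_dict = component.executor.run(input_dict, exec_properties)

    # Publisher: record the output artifacts in ML Metadata so downstream
    # components (and future cache lookups) can consume their metadata.
    component.publisher.publish(metadata_store, output_dict)
    return output_dict
```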

When running a pipeline under Airflow, the logical pipeline is converted into a
series of _Airflow operators_. Each component comprises three operators
representing the `Driver`, `Executor`, and `Publisher`:



At runtime, each `Driver` is responsible for resolving the metadata of input
artifacts for a given component from MLMD, and for determining whether any
previously cached result of the component run can be used instead. If no cached
result is found, the `Driver` invokes the `Executor`, which performs the main
application logic of the component. Upon completion, the `Publisher` writes the
output metadata to MLMD. In the case of Airflow, the `Publisher` operator also
publishes the same metadata for consumption by downstream components using
Apache Airflow’s
[XCom](https://airflow.apache.org/concepts.html?highlight=xcom#xcoms) mechanism.

## Design proposal

### Kubeflow Pipelines Orchestration

KFP uses [Argo](https://argoproj.github.io/argo/) as its orchestration engine.
Argo is a Kubernetes-specific engine for orchestrating the execution of
workflows where each individual workflow step is the execution of a
containerized application. Argo employs a YAML-based specification to construct
the workflow graph, which also specifies how each container’s application should
be invoked.

Passing data from upstream components to downstream ones is accomplished via
[Argo output parameters](https://argoproj.github.io/docs/argo/examples/readme.html#output-parameters).
The outputs of a component are written to named, container-local files as each
step completes. The contents of these files can then be passed as input
parameters to subsequent steps. In particular, the contents are passed as raw
strings, which can be used as command-line arguments when invoking the
downstream step via the templating mechanism in the Argo specification.
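
As a concrete, if simplified, illustration of this mechanism, the following KFP
(v1-era SDK) pipeline compiles into an Argo workflow in which the contents of
the producer step's local file are passed as a raw string into the consumer
step's command line. The step names, image, and file path are arbitrary
examples.

```python
# Minimal sketch of Argo output-parameter passing via the KFP v1 SDK.
# Step names, image, and the /tmp path are illustrative only.
import kfp
from kfp import dsl


@dsl.pipeline(name='output-parameter-example')
def example_pipeline():
    # Declaring the file under `file_outputs` tells Argo to surface its
    # contents as an output parameter of this step.
    producer = dsl.ContainerOp(
        name='producer',
        image='alpine:3.9',
        command=['sh', '-c', 'echo "hello" > /tmp/out.txt'],
        file_outputs={'message': '/tmp/out.txt'},
    )

    # The downstream step consumes the raw string through Argo's templating
    # mechanism; the SDK renders this reference in the compiled YAML.
    dsl.ContainerOp(
        name='consumer',
        image='alpine:3.9',
        command=['echo'],
        arguments=[producer.outputs['message']],
    )


if __name__ == '__main__':
    # Produces the Argo workflow specification in YAML.
    kfp.compiler.Compiler().compile(example_pipeline, 'pipeline.yaml')
```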

To run a TFX pipeline on KFP, the user specifies `KubeflowRunner` instead of
`AirflowDAGRunner` in the pipeline definition file. The logical pipeline
definition itself remains unchanged, thus ensuring portability of pipelines
across orchestration engines.

In contrast to Apache Airflow, running the pipeline file with `KubeflowRunner`
does not actually launch the pipeline. Instead, the logical pipeline is
**compiled**, resulting in a pipeline definition file in YAML, which contains
the Argo specification for a workflow that can be run on Kubernetes. The user
must then manually upload this pipeline definition file to a cluster running
Kubeflow Pipelines before it can be run.
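
A sketch of the single-line change is shown below. The runner class names are
those used in this document; the exact module paths depend on the TFX release,
and `create_pipeline()` is a hypothetical helper standing in for the usual
component-construction code.

```python
# Sketch of switching orchestrators for the same logical pipeline.
# Module paths are indicative only and may differ across TFX versions;
# `create_pipeline()` is a hypothetical helper.
from tfx.orchestration import pipeline as tfx_pipeline


def create_pipeline() -> tfx_pipeline.Pipeline:
    """Builds the logical pipeline (ExampleGen, SchemaGen, Transform, ...)."""
    ...


# Airflow: constructs Airflow operators and registers a DAG in-process.
#   from tfx.orchestration.airflow.airflow_runner import AirflowDAGRunner
#   AirflowDAGRunner(airflow_config).run(create_pipeline())

# Kubeflow Pipelines: compiles the same logical pipeline into an Argo YAML
# workflow specification, which is then uploaded to a KFP cluster and run
# from the KFP UI.
#   from tfx.orchestration.kubeflow.runner import KubeflowRunner
#   KubeflowRunner().run(create_pipeline())
```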



In the Kubeflow cluster, users select and launch their pipeline through an
interactive UI. The KFP APIServer then submits the uploaded pipeline definition
to the **Argo controller**, which orchestrates the actual workflow. The Argo
specification determines which container to execute, and which command-line
invocation to use, for each step.

KFP provides a [Python SDK](https://www.kubeflow.org/docs/pipelines/sdk/) for
constructing ML workflows on top of Argo. The main abstraction used is the
[ContainerOp](https://www.kubeflow.org/docs/pipelines/sdk/build-component/)
class, which can be viewed as a Python representation of a containerized
workflow step in Argo. During compilation, each TFX component in the pipeline is
transformed into a `ContainerOp`. There are three key elements of `ContainerOp`
that are used when constructing the individual steps in TFX pipelines:

* **Image:** All TFX components are executed using the same pre-built
  [Docker image](https://hub.docker.com/r/tensorflow/tfx), which contains the
  TFX library and its dependencies.
* **Command-line arguments:** The command-line arguments specify how the image
  should be invoked. In particular, they specify the exact TFX component and
  executor that needs to run for a given step. Metadata representing input
  artifacts is passed as arguments to a container step using Argo’s built-in
  templating mechanism.
* **File outputs:** Argo can use the contents of container-local files
  produced within each step as input data to be passed to downstream steps.
  When the TFX container successfully completes the execution of an
  `Executor`, it writes the ML Metadata representation (that is, Artifact and
  ArtifactType protos) of output artifacts into named local files, which are
  then passed along to downstream components by Argo. This can be viewed as
  the equivalent of the **_publish_** step performed via Airflow’s XCom
  mechanism.

Consider a snippet of a TFX pipeline consisting of the components `Transform`,
`SchemaGen`, and `Trainer`. `Transform` produces transformed examples as well
as the transform graph itself, which are consumed by the `Trainer` component.
`Trainer` also consumes the schema produced by the `SchemaGen` component. A
schematic `ContainerOp` for the `Trainer` step is sketched below.
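
Putting the three `ContainerOp` elements together, a compiled `Trainer` step
looks roughly like the sketch below. The command-line flags are hypothetical
placeholders (the real invocation is generated by TFX at compile time); the
image and the `/output/ml_metadata/...` path follow the conventions described
above.

```python
# Schematic only: a single TFX step expressed as a KFP ContainerOp.
# The command-line flags are hypothetical placeholders; TFX generates the
# actual container invocation during compilation.
from kfp import dsl


def trainer_step(transformed_examples: dsl.PipelineParam,
                 transform_graph: dsl.PipelineParam,
                 schema: dsl.PipelineParam) -> dsl.ContainerOp:
    return dsl.ContainerOp(
        name='Trainer',
        # Image: the shared, pre-built TFX image containing all executors.
        image='tensorflow/tfx:0.14.0',
        # Command-line arguments: select the executor to run and pass the
        # serialized metadata of input artifacts; Argo substitutes the
        # upstream steps' output-parameter values here.
        arguments=[
            '--executor_class', 'tfx.components.trainer.executor.Executor',
            '--transformed_examples', transformed_examples,
            '--transform_graph', transform_graph,
            '--schema', schema,
        ],
        # File outputs: serialized ML Metadata artifacts written by the step,
        # surfaced as Argo output parameters for downstream components.
        file_outputs={
            'model': '/output/ml_metadata/model',
        },
    )
```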



In KFP, each component is now represented as the execution of the TFX container
image. Individual components have customized command-line invocations, which
depend on their input arguments and on which TFX executor to run.
The execution of each step is controlled by instances of the
[`ExecutorRunner`](https://github.com/tensorflow/tfx/blob/master/tfx/orchestration/kubeflow/executor_wrappers.py)
base class. This class is responsible for constructing the arguments required by
all TFX executors, namely:

* `input_dict`: A dictionary of input artifacts. These are constructed at
  runtime using the values of the Argo output parameters that were passed in
  as inputs.
* `output_dict`: A dictionary of output artifacts. These are pre-determined
  for each derived class of `ExecutorRunner` and specialized per component.
* `exec_properties`: A dictionary of runtime parameters, whose values may be
  either primitive Python types or serialized JSON representations of
  protocol buffers.

The arguments are constructed and used to call into the specified TFX `Executor`
(for example, `tfx.components.trainer.executor.Executor`). If execution is
successful, `ExecutorRunner` writes each output artifact (as specified in
`output_dict`), together with its type, in JSON-serialized format into a
container-local file. The contents of these files are then passed as ML Metadata
artifacts for consumption by downstream steps. The KFP UI visualizes both input
and output parameters for each step.
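
The runtime flow just described can be summarized with a short sketch. This is
not the actual `ExecutorRunner` implementation; the output paths and the
`to_json_dict()` call are simplified stand-ins, and only the
`Do(input_dict, output_dict, exec_properties)` interface reflects the real TFX
executor contract.

```python
# Simplified sketch of what an ExecutorRunner-style wrapper does at runtime.
# Not the actual TFX implementation; serialization details are illustrative.
import json
import os
from typing import Any, Dict, List


def run_executor(executor_cls: Any,
                 input_dict: Dict[str, List[Any]],
                 output_dict: Dict[str, List[Any]],
                 exec_properties: Dict[str, Any]) -> None:
    # 1. Invoke the TFX executor. All TFX executors share the
    #    Do(input_dict, output_dict, exec_properties) interface.
    executor = executor_cls()
    executor.Do(input_dict, output_dict, exec_properties)

    # 2. On success, write each output artifact (with its type) as JSON into a
    #    named container-local file. Argo surfaces these files as output
    #    parameters, so downstream steps receive the serialized metadata.
    for name, artifacts in output_dict.items():
        out_path = os.path.join('/output/ml_metadata', name)
        os.makedirs(os.path.dirname(out_path), exist_ok=True)
        with open(out_path, 'w') as f:
            # `to_json_dict()` stands in for serializing the Artifact and
            # ArtifactType protos of each output artifact.
            json.dump([artifact.to_json_dict() for artifact in artifacts], f)
```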


### ML Metadata Tracking

In contrast to Airflow, TFX on KFP does not have drivers or publishers.
Instead, metadata is recorded passively in KFP’s APIServer, by periodically
parsing the status of the Argo workflow custom resource definition (CRD). Each
Argo workflow CRD status contains the recorded values of Argo output parameters
(that is, the contents of the named local files) upon successful completion of
the workflow step. KFP employs a custom Kubernetes controller called
PersistenceAgent, which periodically polls for the latest status of all Argo
workflow resources and updates the state in the APIServer.


The APIServer parses Argo workflows and looks for Argo output parameters that
look like serialized MLMD artifacts in specially named files (by convention, the
files are named `/output/ml_metadata/{output_name}`). These artifacts and their
types are then recorded into an MLMD instance powered by the same MySQL server
that backs KFP’s persistent data.
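
Conceptually, recording one parsed output parameter into MLMD resembles the
following use of the ML Metadata Python client. The connection settings, type
name, and URI are illustrative; the KFP APIServer performs the equivalent
recording internally against its own MySQL-backed MLMD instance.

```python
# Conceptual sketch of recording a parsed artifact into MLMD using the
# ML Metadata Python client. Connection settings, type name, and URI are
# illustrative only.
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

# Connect to the same MySQL instance that backs KFP's persistent data.
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.mysql.host = 'mysql'          # hypothetical service name
connection_config.mysql.port = 3306
connection_config.mysql.database = 'metadb'     # hypothetical database
connection_config.mysql.user = 'root'
store = metadata_store.MetadataStore(connection_config)

# Register (or reuse) the ArtifactType parsed from the output parameter.
model_type = metadata_store_pb2.ArtifactType()
model_type.name = 'ModelExportPath'             # illustrative type name
model_type.properties['name'] = metadata_store_pb2.STRING
type_id = store.put_artifact_type(model_type)

# Record the Artifact itself, pointing at the payload's storage location.
model = metadata_store_pb2.Artifact()
model.type_id = type_id
model.uri = 'gs://my-bucket/pipeline_root/trainer/model/'  # illustrative URI
model.properties['name'].string_value = 'trainer-model'
[model_id] = store.put_artifacts([model])
```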

## Future Work

While TFX on KFP works, it does not yet have feature parity with the Apache
Airflow version. We are exploring the following directions concurrently to
close the gap between the two orchestrators:

* **Metadata-driven orchestration:** The current version of TFX on KFP records
  artifacts in MLMD, but does so passively. This is due to the lack of drivers
  and publishers in the initial implementation. Hence, lineage tracking and
  caching are not currently possible.
* **Enabling arbitrary user containers with MLMD artifacts as the interface
  between pipeline steps:** Currently, incorporating a custom step in a TFX
  OSS pipeline requires users to implement a custom executor. Cloud users
  frequently have an existing application, written in a non-Python language
  (such as R or Java), which they would like to plug into their TFX-based
  pipeline.
* **Unified pipeline authoring experience:** TFX and KFP both present users
  with a Python-based DSL for constructing their pipelines. The DSL constructs
  look very similar from the user’s point of view, but are fundamentally very
  different underneath. This has led to customer confusion. Unifying the DSLs
  and presenting a single user-facing experience for constructing ML pipelines
  is a goal that we’re actively exploring.
* **Pipeline-level runtime parameters:** KFP lets users specify pipeline-level
  parameters so that the same pipeline can be run with different combinations
  of control parameters. Since the pipeline definition is a YAML-based file
  equipped with a templating mechanism, all pipeline runtime parameters are
  restricted to string types. This presents a challenge for specifying runtime
  parameters that are not simple strings (for example, the number of training
  steps in `Trainer` is specified in a protocol buffer, which must be
  serialized at runtime to be consumed by the component; see the sketch
  below). Contrast this with the Airflow scenario, where arbitrary code can
  execute to yield runtime parameters, since the pipeline definition and
  runtime environment exist in the same execution scope.
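
For example, a proto-valued setting such as the `Trainer` step count has to
cross the string-only boundary in serialized form, roughly as follows
(`trainer_pb2.TrainArgs` is a real TFX proto; the surrounding wiring is
illustrative).

```python
# Illustration of the string-only parameter constraint: a proto-valued setting
# must be serialized to a string before it can flow through the Argo/KFP
# templating mechanism, and deserialized again inside the component.
from google.protobuf import json_format
from tfx.proto import trainer_pb2

# Pipeline-construction side: serialize the proto to a JSON string so it can
# be used as a (string-typed) pipeline runtime parameter.
train_args = trainer_pb2.TrainArgs(num_steps=10000)
train_args_json = json_format.MessageToJson(train_args)

# Component side: recover the proto from the string value it received.
recovered = json_format.Parse(train_args_json, trainer_pb2.TrainArgs())
assert recovered.num_steps == 10000
```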