diff --git a/proposals/separate-standalone-driver/README.md b/proposals/separate-standalone-driver/README.md new file mode 100644 index 00000000000..0e64114bdcd --- /dev/null +++ b/proposals/separate-standalone-driver/README.md @@ -0,0 +1,233 @@ +# Standalone driver + +## Summary +In [Kubeflow Pipelines](https://www.kubeflow.org/docs/components/pipelines/interfaces/), each node in the pipeline graph is non-atomic and, at the Kubernetes level, consists of two components: a driver and an executor. Each of these runs in a separate Kubernetes pod. Additionally, every pipeline run spawns a root DAG driver pod. + +Here’s a simple diagram of the pods created during a KFP (Kubeflow Pipelines) run: + +*(User-defined pipeline with 2 tasks: Task 1 and Task 2)* + +![img.png](kfp-pods.png) + +This proposal explores approaches to replacing both the root DAG driver and all container drivers with a single standalone service, using Argo Workflows as the orchestration framework. + + +## Motivation +While using a separate pod for the executor makes sense - since it often handles heavy workloads and benefits from isolation and flexibility - the driver is a lightweight component. It typically performs just a few API calls: checking for cached results and creating an MLMD Execution. + +However, running the driver in a separate pod causes several issues: + +High overhead: Launching a Kubernetes pod merely to execute a few API calls introduces significant latency. Often, the pod scheduling and startup time outweighs the driver's actual processing time. + +Resource availability problems: There's no guarantee the Kubernetes cluster has sufficient resources to schedule the driver pod. If scheduling fails, the pipeline gets stuck. The UI currently doesn't show driver pod scheduling failures, which makes it hard to debug and understand what's going on. + +## Current state details + +Let's take a look at the copy of [hello_world.yaml](hello_world.yaml) generated by the argo compiler tests. + +**Execution Order:** + +1. **entrypoint** + *Type:* DAG + The root DAG represents the entire pipeline run. + +2. **root-driver** *(template: system-dag-driver)* + *Type:* Container + *Purpose:* Initializes the root DAG. Creates an MLMD execution for the DAG. + **Outputs:** + - `execution-id` – ID of the DAG execution (created during root-driver execution) + + **Tasks inside:** + + - **hello-world-driver** *(template: system-container-driver)* + *Purpose:* Check for the existence of an execution in the cache. If it does not exist, prepare the MLMD execution of the hello-world container task, and generate the appropriate pod-spec-patch. + **Outputs:** + - `pod-spec-patch` – patch for the system-container-executor pod; inserts the correct image and command for the main container + - `cached-decision` – if true, the next step will be skipped + + - **hello-world** *(template: system-container-executor)* + *Depends on:* `hello-world-driver.Succeeded` + *Purpose:* Executes the hello-world component + **Inputs:** + - `pod-spec-patch` — patch for the pod generated in the previous step + - `cached-decision` — used as a skip condition + + The `system-container-executor` template defines the main container that runs the user-defined code. + + +Overview of the Argo workflow node structure for the container-driver +```yaml + templates: + - name: system-container-driver + container: + args: + ... + command: + - driver + image: ghcr.io/kubeflow/kfp-driver + outputs: + parameters: + - name: pod-spec-patch + valueFrom: + path: /tmp/outputs/pod-spec-patch + - name: cached-decision + valueFrom: + path: /tmp/outputs/cached-decision +``` +It creates a pod that launches the driver container using the kfp-driver image. + +## Proposal + +Instead of launching a new driver's pod using a container template, configure the system to send requests to an already running server. +Something like this (showing both types of drivers): +```yaml + templates: + - name: system-container-driver + request: + args: + ... + outputs: + parameters: + - name: pod-spec-patch + jsonPath: $.pod_spec_patch + - name: cached-decision + jsonPath: $.cached_decision +``` +```yaml + - name: system-dag-driver + request: + args: + ... + outputs: + parameters: + - name: execution-id + valueFrom: + jsonPath: $.execution-id + - name: iteration-count + valueFrom: + default: "0" + jsonPath: $.iteration-count + - name: condition + valueFrom: + default: "true" + jsonPath: $.condition +``` + + +### Requirements: +- Execute a remote call with parameters +- Read the response +- Extract parameters from the response +- Use the response in the next steps + +*Extract parameters from the response — this is important to use in the Argo workflow itself, specifically cached-decision and pod-spec-patch. These parameters are used in when conditions and to patch the pod specification.* + +### Argo workflow Features +Two similar features in Argo Workflow can be considered to meet these requirements: +- [Http Template](https://argo-workflows.readthedocs.io/en/latest/http-template) +- [Executor Plugin](https://argo-workflows.readthedocs.io/en/latest/executor_plugins/) + +Comparison: + +| Feature | Supports Remote Call | Read the Response | Can Extract Parameters | Notes | +|------------------|----------------------|-------------------|------------------------|------------------------------| +| HTTP Template | ✅ | ✅ | ❌ | | +| Executor Plugin | ✅ | ✅ | ✅ | Requires plugin installation | + +The HTTP template [is not able](https://github.com/argoproj/argo-workflows/issues/13955) to extract parameters from the response and can only use the full response as-is. As a result, it cannot be used in podSpecPatch: '{{inputs.parameters.pod-spec-patch}}' or when: '{{inputs.parameters.cached-decision}} != true' + +There’s a trade-off between running a standalone driver service pod globally or single per workflow. This is a balance between better performance and avoiding a single point of failure. +Currently, Argo [supports](https://github.com/argoproj/argo-workflows/issues/7891) only one driver pod per workflow option. Both options are based on the Agent pod, which is currently started per workflow — this is a limitation of the current [implementation](https://github.com/argoproj/argo-workflows/issues/7891). + +### Implementation Based on the Executor Plugin + +Instead of creating a driver pod for each task, we can reuse a single agent pod via a plugin template: +[Agent pod](https://github.com/argoproj/argo-workflows/issues/5544) is a unit designed for extension. +It can be extended by any server that implements the protocol. +This server(plugin in Executor plugin terminology) runs as a sidecar alongside the agent pod. + +Below is a scheme where, instead of creating a pod for the driver's task, we reuse the Argo Workflow Agent via a plugin +![img.png](executor-plugin-flow.png) + + +To move from the container template to the Executor Plugin template: +- patch the [Argo compiler](https://github.com/kubeflow/pipelines/tree/a870b1a325dae0c82c8b6f57941468ee1aea960b/backend/src/v2/compiler/argocompiler) to generate a plugin template instead of a container template. Sample: hello-world [adapted](hello_world_plugin.yaml) (see name: system-container-driver) +- Namely, replace the templates used in the [container-driver](https://github.com/kubeflow/pipelines/blob/a870b1a325dae0c82c8b6f57941468ee1aea960b/backend/src/v2/compiler/argocompiler/container.go#L148) and [dag-driver](https://github.com/kubeflow/pipelines/blob/a870b1a325dae0c82c8b6f57941468ee1aea960b/backend/src/v2/compiler/argocompiler/dag.go#L156) section of the compiler +- Extract the [driver](https://github.com/kubeflow/pipelines/tree/a870b1a325dae0c82c8b6f57941468ee1aea960b/backend/src/v2/driver) component into a standalone server. +- Implement the [plugin](plugin.md) + +The sample of the Argo Workflow system-container-driver template based on plugin. +```yaml + - name: system-container-driver + inputs: + parameters: + - name: component + - name: task + - name: container + - name: parent-dag-id + - default: "-1" + name: iteration-index + - default: "" + name: kubernetes-config + metadata: {} + # this is the key change that specifies use of the executor plugin + plugin: + driver-plugin: + args: + cached_decision_path: '{{outputs.parameters.cached-decision.path}}' + component: '{{inputs.parameters.component}}' + condition_path: '{{outputs.parameters.condition.path}}' + container: '{{inputs.parameters.container}}' + dag_execution_id: '{{inputs.parameters.parent-dag-id}}' + iteration_index: '{{inputs.parameters.iteration-index}}' + kubernetes_config: '{{inputs.parameters.kubernetes-config}}' + pipeline_name: namespace/n1/pipeline/hello-world + pod_spec_patch_path: '{{outputs.parameters.pod-spec-patch.path}}' + run_id: '{{workflow.uid}}' + task: '{{inputs.parameters.task}}' + type: CONTAINER + outputs: + parameters: + - name: pod-spec-patch + valueFrom: + default: "" + jsonPath: $.pod-spec-patch + - default: "false" + name: cached-decision + valueFrom: + default: "false" + jsonPath: $.cached-decision + - name: condition + valueFrom: + default: "true" + jsonPath: $.condition +``` + +## Test Plan +[x] I/we understand the owners of the involved components may require updates to existing tests to make this code solid enough prior to committing the changes necessary to implement this enhancement. + +Unit Tests +Unit tests will primarily validate the compilation from KFP pipelines to Argo Workflow specs, while most other logic will be covered by integration tests. + +Integration tests +Add an additional E2E test to verify the behavior of the global driver server. + +Additionally, it is nice to have end-to-end (E2E) tests to verify basic functionality. Existing tests should be reused if available. The E2E tests should cover at least the following scenarios: +- A simple pipeline with a single component, waiting for successful completion of the run. +- A pipeline with a chain of components passing inputs and outputs between them, waiting for successful completion of the run. +- A pipeline designed to fail, waiting for the run to end with an error. +- A pipeline which fails but has retries enabled(pipeline/ and component level), waiting for the run to complete successfully. + +## Conclusion +This proposal introduces an optimization for Kubeflow Pipelines (KFP) that replaces per-task driver pods with a lightweight standalone service based on Argo Workflows’ Executor Plugin mechanism. It significantly reduces pipeline task startup time by eliminating the overhead of scheduling a separate driver pod for each task — particularly beneficial for large pipelines with multiple steps and caching enabled. +Instead of launching a new driver pod per task, the driver logic is offloaded to a shared agent pod that is scheduled per workflow, and completes once the workflow ends. This reduces latency in cache lookups and metadata initialization. +However, this approach does not fully eliminate pod scheduling issues: the standalone driver is not a global service, but is instantiated per workflow. Thus, a pod still needs to be scheduled for each workflow run. + +## Disadvantages: +A key limitation of this implementation is that it currently supports only the Argo Workflows backend. The Executor plugin also adds some extra complexity to maintenance and deployment. + +## Open Questions: +- Do we need a fallback mechanism to the per-task driver pods in case the Executor Plugin is not available in some installations? Should KFP continue supporting both execution flows (plugin-based and pod-based drivers) for compatibility? + +## Follow-ups +- Implement a global agent pod. The community is [open](https://github.com/argoproj/argo-workflows/issues/7891) to it. \ No newline at end of file diff --git a/proposals/separate-standalone-driver/executor-plugin-flow.png b/proposals/separate-standalone-driver/executor-plugin-flow.png new file mode 100644 index 00000000000..2791d6a6f80 Binary files /dev/null and b/proposals/separate-standalone-driver/executor-plugin-flow.png differ diff --git a/proposals/separate-standalone-driver/hello_world.yaml b/proposals/separate-standalone-driver/hello_world.yaml new file mode 100644 index 00000000000..0145ff56168 --- /dev/null +++ b/proposals/separate-standalone-driver/hello_world.yaml @@ -0,0 +1,322 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + creationTimestamp: null + generateName: hello-world- +spec: + arguments: + parameters: + - name: components-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7 + value: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' + - name: implementations-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7 + value: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf + \"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def + hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser + = argparse.ArgumentParser(prog=''Hello world'', description='''')\n_parser.add_argument(\"--text\", + dest=\"text\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = hello_world(**_parsed_args)\n"],"image":"python:3.9"}' + - name: components-root + value: '{"dag":{"tasks":{"hello-world":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}}},"inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' + entrypoint: entrypoint + podMetadata: + annotations: + pipelines.kubeflow.org/v2_component: "true" + labels: + pipelines.kubeflow.org/v2_component: "true" + serviceAccountName: pipeline-runner + templates: + - container: + args: + - --type + - CONTAINER + - --pipeline_name + - namespace/n1/pipeline/hello-world + - --run_id + - '{{workflow.uid}}' + - --run_name + - '{{workflow.name}}' + - --run_display_name + - '' + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --container + - '{{inputs.parameters.container}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --cached_decision_path + - '{{outputs.parameters.cached-decision.path}}' + - --pod_spec_patch_path + - '{{outputs.parameters.pod-spec-patch.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --kubernetes_config + - '{{inputs.parameters.kubernetes-config}}' + - --http_proxy + - '' + - --https_proxy + - '' + - --no_proxy + - '' + command: + - driver + image: ghcr.io/kubeflow/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - name: task + - name: container + - name: parent-dag-id + - default: "-1" + name: iteration-index + - default: "" + name: kubernetes-config + metadata: {} + name: system-container-driver + outputs: + parameters: + - name: pod-spec-patch + valueFrom: + default: "" + path: /tmp/outputs/pod-spec-patch + - default: "false" + name: cached-decision + valueFrom: + default: "false" + path: /tmp/outputs/cached-decision + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: pod-spec-patch + value: '{{inputs.parameters.pod-spec-patch}}' + name: executor + template: system-container-impl + when: '{{inputs.parameters.cached-decision}} != true' + inputs: + parameters: + - name: pod-spec-patch + - default: "false" + name: cached-decision + metadata: {} + name: system-container-executor + outputs: {} + - container: + command: + - should-be-overridden-during-runtime + env: + - name: KFP_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: KFP_POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + envFrom: + - configMapRef: + name: metadata-grpc-configmap + optional: true + image: gcr.io/ml-pipeline/should-be-overridden-during-runtime + name: "" + resources: {} + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + - mountPath: /gcs + name: gcs-scratch + - mountPath: /s3 + name: s3-scratch + - mountPath: /minio + name: minio-scratch + - mountPath: /.local + name: dot-local-scratch + - mountPath: /.cache + name: dot-cache-scratch + - mountPath: /.config + name: dot-config-scratch + initContainers: + - args: + - --copy + - /kfp-launcher/launch + command: + - launcher-v2 + image: ghcr.io/kubeflow/kfp-launcher + name: kfp-launcher + resources: + limits: + cpu: 500m + memory: 128Mi + requests: + cpu: 100m + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + inputs: + parameters: + - name: pod-spec-patch + metadata: {} + name: system-container-impl + outputs: {} + podSpecPatch: '{{inputs.parameters.pod-spec-patch}}' + volumes: + - emptyDir: {} + name: kfp-launcher + - emptyDir: {} + name: gcs-scratch + - emptyDir: {} + name: s3-scratch + - emptyDir: {} + name: minio-scratch + - emptyDir: {} + name: dot-local-scratch + - emptyDir: {} + name: dot-cache-scratch + - emptyDir: {} + name: dot-config-scratch + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}' + - name: container + value: '{{workflow.parameters.implementations-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: hello-world-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.hello-world-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.hello-world-driver.outputs.parameters.cached-decision}}' + depends: hello-world-driver.Succeeded + name: hello-world + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: root + outputs: {} + - container: + args: + - --type + - '{{inputs.parameters.driver-type}}' + - --pipeline_name + - namespace/n1/pipeline/hello-world + - --run_id + - '{{workflow.uid}}' + - --run_name + - '{{workflow.name}}' + - --run_display_name + - '' + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --runtime_config + - '{{inputs.parameters.runtime-config}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --execution_id_path + - '{{outputs.parameters.execution-id.path}}' + - --iteration_count_path + - '{{outputs.parameters.iteration-count.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --http_proxy + - '' + - --https_proxy + - '' + - --no_proxy + - '' + command: + - driver + image: ghcr.io/kubeflow/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - default: "" + name: runtime-config + - default: "" + name: task + - default: "0" + name: parent-dag-id + - default: "-1" + name: iteration-index + - default: DAG + name: driver-type + metadata: {} + name: system-dag-driver + outputs: + parameters: + - name: execution-id + valueFrom: + path: /tmp/outputs/execution-id + - name: iteration-count + valueFrom: + default: "0" + path: /tmp/outputs/iteration-count + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-root}}' + - name: runtime-config + value: '{"parameters":{"text":{"stringValue":"hi there"}}}' + - name: driver-type + value: ROOT_DAG + name: root-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.root-driver.outputs.parameters.execution-id}}' + - name: condition + value: "" + depends: root-driver.Succeeded + name: root + template: root + inputs: {} + metadata: {} + name: entrypoint + outputs: {} +status: + finishedAt: null + startedAt: null diff --git a/proposals/separate-standalone-driver/hello_world_plugin.yaml b/proposals/separate-standalone-driver/hello_world_plugin.yaml new file mode 100644 index 00000000000..3585b3d3c23 --- /dev/null +++ b/proposals/separate-standalone-driver/hello_world_plugin.yaml @@ -0,0 +1,264 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + creationTimestamp: null + generateName: hello-world- +spec: + arguments: + parameters: + - name: components-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7 + value: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' + - name: implementations-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7 + value: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf + \"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def + hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser + = argparse.ArgumentParser(prog=''Hello world'', description='''')\n_parser.add_argument(\"--text\", + dest=\"text\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = hello_world(**_parsed_args)\n"],"image":"python:3.9"}' + - name: components-root + value: '{"dag":{"tasks":{"hello-world":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}}},"inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' + entrypoint: entrypoint + podMetadata: + annotations: + pipelines.kubeflow.org/v2_component: "true" + labels: + pipelines.kubeflow.org/v2_component: "true" + serviceAccountName: pipeline-runner + templates: + - inputs: + parameters: + - name: component + - name: task + - name: container + - name: parent-dag-id + - default: "-1" + name: iteration-index + - default: "" + name: kubernetes-config + metadata: {} + name: system-container-driver + outputs: + parameters: + - name: pod-spec-patch + valueFrom: + default: "" + jsonPath: $.pod-spec-patch + - default: "false" + name: cached-decision + valueFrom: + default: "false" + jsonPath: $.cached-decision + - name: condition + valueFrom: + default: "true" + jsonPath: $.condition + plugin: + driver-plugin: + args: + cached_decision_path: '{{outputs.parameters.cached-decision.path}}' + component: '{{inputs.parameters.component}}' + condition_path: '{{outputs.parameters.condition.path}}' + container: '{{inputs.parameters.container}}' + dag_execution_id: '{{inputs.parameters.parent-dag-id}}' + iteration_index: '{{inputs.parameters.iteration-index}}' + kubernetes_config: '{{inputs.parameters.kubernetes-config}}' + pipeline_name: namespace/n1/pipeline/hello-world + pod_spec_patch_path: '{{outputs.parameters.pod-spec-patch.path}}' + run_id: '{{workflow.uid}}' + task: '{{inputs.parameters.task}}' + type: CONTAINER + namespace: '{{workflow.namespace}}' + - dag: + tasks: + - arguments: + parameters: + - name: pod-spec-patch + value: '{{inputs.parameters.pod-spec-patch}}' + name: executor + template: system-container-impl + when: '{{inputs.parameters.cached-decision}} != true' + inputs: + parameters: + - name: pod-spec-patch + - default: "false" + name: cached-decision + metadata: {} + name: system-container-executor + outputs: {} + - containerSet: + containers: + - command: + - should-be-overridden-during-runtime + env: + - name: KFP_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: KFP_POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + envFrom: + - configMapRef: + name: metadata-grpc-configmap + optional: true + image: gcr.io/ml-pipeline/should-be-overridden-during-runtime + name: main + resources: {} + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + - mountPath: /gcs + name: gcs-scratch + - mountPath: /s3 + name: s3-scratch + - mountPath: /minio + name: minio-scratch + - mountPath: /.local + name: dot-local-scratch + - mountPath: /.cache + name: dot-cache-scratch + - mountPath: /.config + name: dot-config-scratch + initContainers: + - command: + - launcher-v2 + - --copy + - /kfp-launcher/launch + image: gcr.io/ml-pipeline/kfp-launcher + name: kfp-launcher + resources: + limits: + cpu: 500m + memory: 128Mi + requests: + cpu: 100m + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + inputs: + parameters: + - name: pod-spec-patch + metadata: {} + name: system-container-impl + outputs: {} + podSpecPatch: '{{inputs.parameters.pod-spec-patch}}' + volumes: + - emptyDir: {} + name: kfp-launcher + - emptyDir: {} + name: gcs-scratch + - emptyDir: {} + name: s3-scratch + - emptyDir: {} + name: minio-scratch + - emptyDir: {} + name: dot-local-scratch + - emptyDir: {} + name: dot-cache-scratch + - emptyDir: {} + name: dot-config-scratch + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}' + - name: container + value: '{{workflow.parameters.implementations-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: hello-world-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.hello-world-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.hello-world-driver.outputs.parameters.cached-decision}}' + depends: hello-world-driver.Succeeded + name: hello-world + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: root + outputs: {} + - inputs: + parameters: + - name: component + - default: "" + name: runtime-config + - default: "" + name: task + - default: "0" + name: parent-dag-id + - default: "-1" + name: iteration-index + - default: DAG + name: driver-type + - default: "" + name: run-metadata + metadata: {} + name: system-dag-driver + outputs: + parameters: + - name: execution-id + valueFrom: + jsonPath: $.execution-id + - name: iteration-count + valueFrom: + default: "0" + jsonPath: $.iteration-count + - name: condition + valueFrom: + default: "true" + jsonPath: $.condition + plugin: + driver-plugin: + args: + component: '{{inputs.parameters.component}}' + condition_path: '{{outputs.parameters.condition.path}}' + dag_execution_id: '{{inputs.parameters.parent-dag-id}}' + execution_id_path: '{{outputs.parameters.execution-id.path}}' + iteration_count_path: '{{outputs.parameters.iteration-count.path}}' + iteration_index: '{{inputs.parameters.iteration-index}}' + pipeline_name: namespace/n1/pipeline/hello-world + run_id: '{{workflow.uid}}' + run_metadata: '{{inputs.parameters.run-metadata}}' + runtime_config: '{{inputs.parameters.runtime-config}}' + task: '{{inputs.parameters.task}}' + type: '{{inputs.parameters.driver-type}}' + namespace: '{{workflow.namespace}}' + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-root}}' + - name: runtime-config + value: '{"parameters":{"text":{"stringValue":"hi there"}}}' + - name: driver-type + value: ROOT_DAG + name: root-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.root-driver.outputs.parameters.execution-id}}' + - name: condition + value: "" + depends: root-driver.Succeeded + name: root + template: root + inputs: {} + metadata: {} + name: entrypoint + outputs: {} +status: + finishedAt: null + startedAt: null \ No newline at end of file diff --git a/proposals/separate-standalone-driver/kfp-pods.png b/proposals/separate-standalone-driver/kfp-pods.png new file mode 100644 index 00000000000..0fa046536d5 Binary files /dev/null and b/proposals/separate-standalone-driver/kfp-pods.png differ diff --git a/proposals/separate-standalone-driver/plugin.md b/proposals/separate-standalone-driver/plugin.md new file mode 100644 index 00000000000..c373df26fb1 --- /dev/null +++ b/proposals/separate-standalone-driver/plugin.md @@ -0,0 +1,49 @@ +## Mock Implementation of Argo Workflow Executor Plugin + +According to the [documentation](https://argo-workflows.readthedocs.io/en/latest/executor_plugins/), an Argo Workflow plugin is a sidecar container that runs in the agent pod. The agent pod is created once for each workflow. + +### terminology +- kfp-driver-server — the KFP [driver](https://github.com/kubeflow/pipelines/tree/master/backend/src/v2/driver) component extracted from the workflow pod and deployed as a global HTTP/gRPC server. +- driver-plugin - our implementation of the Executor plugin + +*One limitation is that the service account token is not mounted into the sidecar container, which means it cannot interact with the Kubernetes API. This is required for drivers.* +As a result, driver-plugin implementations should merely act as a proxy between the global kfp-driver-server and the Argo Workflow controller. + +### Prerequisites +- According to the [configuration](https://argo-workflows.readthedocs.io/en/latest/executor_plugins/#configuration) ARGO_EXECUTOR_PLUGINS should be set to true +- Add additional [workflow RBAC](https://argo-workflows.readthedocs.io/en/latest/http-template/#argo-agent-rbac) for the agent + +1. Implement the driver plugin that simply proxies requests from the workflow controller to the kfp-driver-server and back. +2. Build the image for the driver plugin. +3. Create the [yaml description](plugin.yaml) of the plugin +4. [Create](https://argo-workflows.readthedocs.io/en/latest/cli/argo_executor-plugin_build/) the configmap by executing ```argo executor-plugin build .``` in the yaml description folder from the step 3 +5. Apply the created ConfigMap to the workflow-controller Kubernetes namespace. +6. Create the service account driver-plugin-executor-plugin and set automountServiceAccountToken: true in the sidecar plugin ConfigMap (required for Kubernetes API access; see details below). + +After that, you will be able to reference the corresponding driver plugin in your Argo Workflow using: +```yaml +plugin: + driver-plugin: + ... +``` + +### Interaction With the Kubernetes API From a Sidecar Container +The driver [requires](https://github.com/kubeflow/pipelines/blob/master/backend/src/v2/driver/k8s.go#L68) access to the k8s API. +However, by default, the Argo Workflow Controller does not mount the service account token in the executor plugin's sidecar container. Moreover, it [disabled](https://github.com/argoproj/argo-workflows/pull/8028) the ability to mount the Workflow's service account to the executor plugin. +As a result, to enable access to the Kubernetes API: +1. Create ServiceAccount in each profile namespace with the name `driver-plugin-executor-plugin`. Argo WF [expects](https://github.com/argoproj/argo-workflows/blob/main/workflow/controller/agent.go#L285) the format -executor-plugin +2. Add a Role with appropriate Kubernetes API access and bind it to the service account. +3. Configure `sidecar.automountServiceAccountToken` see [example](plugin.yaml) + +### Securing the driver sidecar container +The driver's sidecar [exposes](https://argo-workflows.readthedocs.io/en/latest/executor_plugins/#example-a-simple-python-plugin) the HTTP `/api/v1/template.execute` API externally. So hypothetically not only Argo WF Controller able to call it. +To prevent unauthorized access + +by default (without extra customization): +- The Argo Workflow Controller mounts the `/var/run/argo` volume into the executor agent pod (which hosts the plugin sidecar). This volume contains a token. +- The Argo Workflow Controller includes the same token in the authorization Bearer header of each execution request. + +additionally: +The central driver needs to read the token from `/var/run/argo/token` and compare it with the token from the request header. +More details are available [here](https://argo-workflows.readthedocs.io/en/latest/executor_plugins/#example-a-simple-python-plugin) + diff --git a/proposals/separate-standalone-driver/plugin.yaml b/proposals/separate-standalone-driver/plugin.yaml new file mode 100644 index 00000000000..21dc1f0b50f --- /dev/null +++ b/proposals/separate-standalone-driver/plugin.yaml @@ -0,0 +1,23 @@ +# Sample of the config map for mounting the executor plugin +apiVersion: v1 +kind: ConfigMap +metadata: + labels: + workflows.argoproj.io/configmap-type: ExecutorPlugin # Workflow Controller applies the plugin configuration based on this label + name: driver-plugin +data: + sidecar.automountServiceAccountToken: "true" # Enables automatic mounting of the service account token in the sidecar + sidecar.container: | + image: ... + name: driver-plugin + resources: + ... + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + readOnlyRootFilesystem: true + runAsNonRoot: true + runAsUser: 1000 # Must run as a non-nobody user to access the token inside the /var/run/argo volume from the sidecar +