Commit 1935e0e

Author: arpechenin
Add details
Signed-off-by: arpechenin <[email protected]>
1 parent 18deec1 commit 1935e0e

6 files changed, +91 -9 lines changed

proposals/separate-standalone-driver/README.md

Lines changed: 3 additions & 9 deletions
@@ -125,10 +125,9 @@ Below is a scheme where, instead of creating a pod for the driver's task, we reu
 
 To move from the container template to the Executor Plugin template:
 - patch the [Argo compiler](https://github.com/kubeflow/pipelines/tree/master/backend/src/v2/compiler/argocompiler) to generate a plugin template instead of a container template. Sample: hello-world [adapted](kfp-plugin-flow.png) (see name: system-container-driver)
-- Implement the [driver](https://github.com/kubeflow/pipelines/tree/master/backend/src/v2/driver) as a plugin - that is, as a HTTP Server which implements the expected [contract](https://argo-workflows.readthedocs.io/en/latest/executor_plugins/#example-a-simple-python-plugin).
-This server will run as a sidecar container alongside the agent pod.
-- Enable plugin extensions in the workflow-controller. See the [Configuration](https://argo-workflows.readthedocs.io/en/latest/executor_plugins/#configuration) guide.
-Then, [install]([Configuration](https://argo-workflows.readthedocs.io/en/latest/executor_plugins/#configuration)) the plugin built in the previous step into the workflow-controller.
+- Namely, replace the templates used in the [container-driver](https://github.com/kubeflow/pipelines/blob/master/backend/src/v2/compiler/argocompiler/container.go#L148) and [dag-driver](https://github.com/kubeflow/pipelines/blob/master/backend/src/v2/compiler/argocompiler/dag.go#L156) section of the compiler
+- Extract the [driver](https://github.com/kubeflow/pipelines/tree/master/backend/src/v2/driver) component into a standalone server.
+- Implement the [plugin](plugin.md)
 
 The sample of the Argo Workflow system-container-driver template based on plugin.
 ```yaml
@@ -175,11 +174,6 @@ The sample of the Argo Workflow system-container-driver template based on plugin
 default: "true"
 jsonPath: $.condition
 ```
-The `driver-plugin` section refers to the name under which the standalone driver was installed, as described in the [plugin](https://argo-workflows.readthedocs.io/en/latest/executor_plugins/#example-a-simple-python-plugin)
-```yaml
-plugin:
-  driver-plugin:
-```
 
 ## Conclusion
 This proposal introduces an optimization for Kubeflow Pipelines (KFP) that replaces per-task driver pods with a lightweight standalone service based on Argo Workflows’ Executor Plugin mechanism. It significantly reduces pipeline task startup time by eliminating the overhead of scheduling a separate driver pod for each task — particularly beneficial for large pipelines with multiple steps and caching enabled.
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
## Mock Implementation of Argo Workflow Executor Plugin

According to the [documentation](https://argo-workflows.readthedocs.io/en/latest/executor_plugins/), an Argo Workflow executor plugin runs as a sidecar container in the agent pod. The agent pod is created once per workflow.

### Terminology
- kfp-driver-server: the KFP [driver](https://github.com/kubeflow/pipelines/tree/master/backend/src/v2/driver) component extracted from the workflow pod and deployed as a global HTTP/gRPC server.
- driver-plugin: our implementation of the Executor Plugin.

*One limitation is that the service account token is not mounted into the sidecar container, so the plugin cannot interact with the Kubernetes API, which the driver requires.*
As a result, the driver-plugin implementation should act only as a proxy between the global kfp-driver-server and the Argo Workflow controller.

### Prerequisites
- Set `ARGO_EXECUTOR_PLUGINS` to `true` on the workflow-controller, as described in the [configuration](https://argo-workflows.readthedocs.io/en/latest/executor_plugins/#configuration) guide.
- Add the additional [workflow RBAC](https://argo-workflows.readthedocs.io/en/latest/http-template/#argo-agent-rbac) required by the agent.

1. Implement the driver plugin, which simply proxies requests from the workflow controller to the kfp-driver-server and back. See the mock [implementation](src/driver-plugin).
2. Build the image for the driver plugin.
3. Create the [YAML description](src/driver-plugin/plugin.yaml) of the plugin.
4. [Create](https://argo-workflows.readthedocs.io/en/latest/cli/argo_executor-plugin_build/) the ConfigMap by running `argo executor-plugin build .` in the folder containing the YAML description from step 3.
5. Apply the created ConfigMap to the workflow-controller's Kubernetes namespace.

After that, you can reference the driver plugin in your Argo Workflow using:
```yaml
plugin:
  driver-plugin:
    ...
```
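
To make the proxy role more concrete, here is a minimal sketch of the two data shapes involved: pulling the `driver-plugin` arguments out of the body the workflow controller posts to `/api/v1/template.execute`, and building a reply in the `{"node": {...}}` form shown in the Argo executor plugin documentation. The helper names `extract_plugin_args` and `build_node_result` are hypothetical, and whether the kfp-driver-server already returns this reply shape is an assumption; the mock implementation in this commit simply forwards whatever the driver returns.

```python
from typing import Any, Dict, Optional

# Name used under `plugin:` in the workflow template (taken from this proposal).
PLUGIN_NAME = "driver-plugin"


def extract_plugin_args(body: Dict[str, Any], plugin_name: str = PLUGIN_NAME) -> Dict[str, Any]:
    """Return the `args` mapping sent for this plugin, or {} if it is absent."""
    plugin_section = (body.get("template") or {}).get("plugin") or {}
    return (plugin_section.get(plugin_name) or {}).get("args") or {}


def build_node_result(
    phase: str,
    message: str = "",
    parameters: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Build a reply in the {"node": {...}} shape (hypothetical helper)."""
    node: Dict[str, Any] = {"phase": phase, "message": message}
    if parameters:
        # Output parameters are expressed as a list of name/value pairs.
        node["outputs"] = {
            "parameters": [{"name": k, "value": v} for k, v in parameters.items()]
        }
    return {"node": node}
```

A plugin that needed to translate the driver's reply could call `build_node_result("Succeeded", parameters={...})` before returning; the parameter names would depend on what the driver actually emits.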
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
FROM python:3.12
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
CMD ["python", "main.py"]
Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
from fastapi import FastAPI, Request
import uvicorn
import aiohttp

app = FastAPI()


async def call_driver(url, payload):
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=payload) as response:
            if response.status != 200:  # any non-200 reply from the driver is an error
                text = await response.text()
                raise Exception(f"driver call failed with status: {response.status} error: {text}")
            content_type = response.headers.get("Content-Type", "")
            if "application/json" not in content_type:
                text = await response.text()
                raise Exception(f"driver returned unexpected Content-Type: {content_type}, response: {text}")
            return await response.json()


@app.post("/api/v1/template.execute")
async def execute_plugin(request: Request):
    body = await request.json()  # request body posted by the Argo agent
    payload = body.get("template", {}).get("plugin", {}).get("driver-plugin", {}).get("args", {})
    print("request payload:" + str(payload))
    response = await call_driver("http://ml-pipeline-kfp-driver.kubeflow.svc.cluster.local:2948/api/v1/execute", payload)
    print("response:", response)
    return response  # the driver's JSON reply is passed back unchanged


if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=2948, reload=True)  # reload=True is a development convenience
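
The mock hard-codes the address of the kfp-driver-server. As a rough sketch of one way to make it configurable, the helper below reads the endpoint from an environment variable and falls back to the address used in the mock. The variable name `KFP_DRIVER_URL` and the function `resolve_driver_url` are assumptions made for illustration; they are not part of this commit or of KFP.

```python
import os
from urllib.parse import urlparse

# Default endpoint copied from the mock implementation.
DEFAULT_DRIVER_URL = "http://ml-pipeline-kfp-driver.kubeflow.svc.cluster.local:2948/api/v1/execute"


def resolve_driver_url(env=None) -> str:
    """Return the driver endpoint, preferring the hypothetical KFP_DRIVER_URL variable."""
    env = os.environ if env is None else env
    url = env.get("KFP_DRIVER_URL", DEFAULT_DRIVER_URL)
    parsed = urlparse(url)
    # Reject values that are not absolute http(s) URLs so misconfiguration fails early.
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"invalid driver URL: {url!r}")
    return url
```

With such a helper, the handler could call `call_driver(resolve_driver_url(), payload)`, and the variable could be set in the `container` section of the plugin description.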
Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
apiVersion: argoproj.io/v1alpha1
kind: ExecutorPlugin
metadata:
  name: driver-plugin
spec:
  sidecar:
    container:
      image: <YOUR_IMAGE>
      name: driver-plugin
      ports:
        - containerPort: 2948
      securityContext:
        runAsNonRoot: true
        runAsUser: 65534
      resources:
        requests:
          memory: "512Mi"
          cpu: "250m"
        limits:
          memory: "1Gi"
          cpu: "1"
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
fastapi
aiohttp
uvicorn[standard]
