Skip to content

Commit cc1c435

Browse files
authored
feat(backend): Add support for importing models stored in the Modelcar format (sidecar) (kubeflow#11606)
This allows dsl.import to leverage Modelcar container images in an OCI repository. This works by having an init container prepull the image and then adding a sidecar container when the launcher container is running. The Modelcar container adds a symlink to its /models directory in an emptyDir volume that is accessible by the launcher container. Once the launcher is done running the user code, it stops the Modelcar containers. This approach has the benefit of leveraging image pull secrets configured on the Kubernetes cluster rather than require separate credentials for importing the artifact. Additionally, no data is copied to the emptyDir volume, so the storage cost is just pulling the Modelcar container image on the Kubernetes worker node. Note that once Kubernetes supports OCI images as volume mounts for several releases, consider replacing the init container with that approach. This also adds a new environment variable of PIPELINE_RUN_AS_USER to set the runAsUser on all Pods created by Argo Workflows. Resolves: kubeflow#11584 Signed-off-by: mprahl <[email protected]>
1 parent c0778ba commit cc1c435

File tree

15 files changed

+789
-10
lines changed

15 files changed

+789
-10
lines changed

.github/workflows/kfp-samples.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,18 @@ jobs:
3636
with:
3737
k8s_version: ${{ matrix.k8s_version }}
3838

39+
- name: Build and upload the sample Modelcar image to Kind
40+
run: |
41+
docker build -f samples/v2/modelcar_import/Dockerfile -t registry.domain.local/modelcar:test .
42+
kind --name kfp load docker-image registry.domain.local/modelcar:test
43+
3944
- name: Forward API port
4045
run: ./.github/resources/scripts/forward-port.sh "kubeflow" "ml-pipeline" 8888 8888
4146

4247
- name: Run Samples Tests
4348
id: tests
49+
env:
50+
PULL_NUMBER: ${{ github.event.pull_request.number }}
4451
run: |
4552
./backend/src/v2/test/sample-test.sh
4653
continue-on-error: true

backend/src/v2/compiler/argocompiler/argo.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
120120
Entrypoint: tmplEntrypoint,
121121
},
122122
}
123+
124+
runAsUser := GetPipelineRunAsUser()
125+
if runAsUser != nil {
126+
wf.Spec.SecurityContext = &k8score.PodSecurityContext{RunAsUser: runAsUser}
127+
}
128+
123129
c := &workflowCompiler{
124130
wf: wf,
125131
templates: make(map[string]*wfapi.Template),

backend/src/v2/compiler/argocompiler/argo_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func Test_argo_compiler(t *testing.T) {
3636
jobPath string // path of input PipelineJob to compile
3737
platformSpecPath string // path of possible input PlatformSpec to compile
3838
argoYAMLPath string // path of expected output argo workflow YAML
39+
envVars map[string]string
3940
}{
4041
{
4142
jobPath: "../testdata/hello_world.json",
@@ -67,9 +68,33 @@ func Test_argo_compiler(t *testing.T) {
6768
platformSpecPath: "",
6869
argoYAMLPath: "testdata/exit_handler.yaml",
6970
},
71+
{
72+
jobPath: "../testdata/hello_world.json",
73+
platformSpecPath: "",
74+
argoYAMLPath: "testdata/hello_world_run_as_user.yaml",
75+
envVars: map[string]string{"PIPELINE_RUN_AS_USER": "1001"},
76+
},
7077
}
7178
for _, tt := range tests {
7279
t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) {
80+
prevEnvVars := map[string]string{}
81+
82+
for envVarName, envVarValue := range tt.envVars {
83+
prevEnvVars[envVarName] = os.Getenv(envVarName)
84+
85+
os.Setenv(envVarName, envVarValue)
86+
}
87+
88+
defer func() {
89+
for envVarName, envVarValue := range prevEnvVars {
90+
if envVarValue == "" {
91+
os.Unsetenv(envVarName)
92+
} else {
93+
os.Setenv(envVarName, envVarValue)
94+
}
95+
}
96+
}()
97+
7398
job, platformSpec := load(t, tt.jobPath, tt.platformSpecPath)
7499
if *update {
75100
wf, err := argocompiler.Compile(job, platformSpec, nil)

backend/src/v2/compiler/argocompiler/container.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ package argocompiler
1717
import (
1818
"fmt"
1919
"os"
20+
"strconv"
2021
"strings"
2122

2223
wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
24+
"github.com/golang/glog"
2325
"github.com/golang/protobuf/jsonpb"
2426
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
2527
"github.com/kubeflow/pipelines/backend/src/v2/component"
@@ -36,6 +38,7 @@ const (
3638
DriverImageEnvVar = "V2_DRIVER_IMAGE"
3739
DefaultDriverCommand = "driver"
3840
DriverCommandEnvVar = "V2_DRIVER_COMMAND"
41+
PipelineRunAsUserEnvVar = "PIPELINE_RUN_AS_USER"
3942
gcsScratchLocation = "/gcs"
4043
gcsScratchName = "gcs-scratch"
4144
s3ScratchLocation = "/s3"
@@ -101,6 +104,25 @@ func GetDriverCommand() []string {
101104
return strings.Split(driverCommand, " ")
102105
}
103106

107+
func GetPipelineRunAsUser() *int64 {
108+
runAsUserStr := os.Getenv(PipelineRunAsUserEnvVar)
109+
if runAsUserStr == "" {
110+
return nil
111+
}
112+
113+
runAsUser, err := strconv.ParseInt(runAsUserStr, 10, 64)
114+
if err != nil {
115+
glog.Error(
116+
"Failed to parse the %s environment variable with value %s as an int64: %v",
117+
PipelineRunAsUserEnvVar, runAsUserStr, err,
118+
)
119+
120+
return nil
121+
}
122+
123+
return &runAsUser
124+
}
125+
104126
func (c *workflowCompiler) containerDriverTask(name string, inputs containerDriverInputs) (*wfapi.DAGTask, *containerDriverOutputs) {
105127
dagTask := &wfapi.DAGTask{
106128
Name: name,

0 commit comments

Comments
 (0)