Skip to content

Commit 8a08de2

Browse files
Merge pull request #182 from hbelmiro/fix-cache
UPSTREAM<drop> Fix boolean flag syntax for --cache_disabled and update test to cover pipelines with outputs
2 parents dca9085 + b7ef064 commit 8a08de2

File tree

8 files changed

+109
-10
lines changed

8 files changed

+109
-10
lines changed

backend/src/v2/compiler/argocompiler/container.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ func (c *workflowCompiler) addContainerDriverTemplate() string {
208208
"--ca_cert_path", common.GetCaCertPath(),
209209
}
210210
if c.cacheDisabled {
211-
args = append(args, "--cache_disabled", "true")
211+
args = append(args, "--cache_disabled")
212212
}
213213
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
214214
args = append(args, "--log_level", value)
@@ -352,7 +352,7 @@ func (c *workflowCompiler) addContainerExecutorTemplate(task *pipelinespec.Pipel
352352
"--copy", component.KFPLauncherPath,
353353
}
354354
if c.cacheDisabled {
355-
args = append(args, "--cache_disabled", "true")
355+
args = append(args, "--cache_disabled")
356356
}
357357
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
358358
args = append(args, "--log_level", value)

backend/src/v2/compiler/argocompiler/dag.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,7 @@ func (c *workflowCompiler) addDAGDriverTemplate() string {
565565
"--ca_cert_path", common.GetCaCertPath(),
566566
}
567567
if c.cacheDisabled {
568-
args = append(args, "--cache_disabled", "true")
568+
args = append(args, "--cache_disabled")
569569
}
570570
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
571571
args = append(args, "--log_level", value)

backend/src/v2/compiler/argocompiler/importer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (c *workflowCompiler) addImporterTemplate() string {
8585
"--ca_cert_path", common.GetCaCertPath(),
8686
}
8787
if c.cacheDisabled {
88-
args = append(args, "--cache_disabled", "true")
88+
args = append(args, "--cache_disabled")
8989
}
9090
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
9191
args = append(args, "--log_level", value)

backend/src/v2/compiler/argocompiler/testdata/hello_world_cache_disabled.yaml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ spec:
7272
- --ca_cert_path
7373
- ""
7474
- --cache_disabled
75-
- 'true'
7675
command:
7776
- driver
7877
env:
@@ -176,7 +175,6 @@ spec:
176175
- --copy
177176
- /kfp-launcher/launch
178177
- --cache_disabled
179-
- "true"
180178
command:
181179
- launcher-v2
182180
image: ghcr.io/kubeflow/kfp-launcher
@@ -287,7 +285,6 @@ spec:
287285
- --ca_cert_path
288286
- ""
289287
- --cache_disabled
290-
- 'true'
291288
command:
292289
- driver
293290
env:

backend/src/v2/driver/driver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func initPodSpecPatch(
216216
"--ca_cert_path", caCertPath,
217217
}
218218
if cacheDisabled == "true" {
219-
launcherCmd = append(launcherCmd, "--cache_disabled", cacheDisabled)
219+
launcherCmd = append(launcherCmd, "--cache_disabled")
220220
}
221221
if pipelineLogLevel != "1" {
222222
// Add log level to user code launcher if not default (set to 1)

backend/test/v2/integration/cache_test.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,11 @@ func (s *CacheTestSuite) TestCacheRecurringRun() {
136136
IntervalSecond: 60,
137137
},
138138
},
139+
RuntimeConfig: &recurring_run_model.V2beta1RuntimeConfig{
140+
Parameters: map[string]interface{}{
141+
"message": "Hello world",
142+
},
143+
},
139144
}}
140145
helloWorldRecurringRun, err := s.recurringRunClient.Create(createRecurringRunRequest)
141146
require.NoError(t, err)
@@ -251,6 +256,11 @@ func (s *CacheTestSuite) createRun(pipelineVersion *pipeline_upload_model.V2beta
251256
PipelineID: pipelineVersion.PipelineID,
252257
PipelineVersionID: pipelineVersion.PipelineVersionID,
253258
},
259+
RuntimeConfig: &run_model.V2beta1RuntimeConfig{
260+
Parameters: map[string]interface{}{
261+
"message": "Hello world",
262+
},
263+
},
254264
}}
255265
pipelineRunDetail, err := s.runClient.Create(createRunRequest)
256266
require.NoError(s.T(), err)
@@ -268,12 +278,12 @@ func (s *CacheTestSuite) createRun(pipelineVersion *pipeline_upload_model.V2beta
268278
}
269279

270280
func (s *CacheTestSuite) preparePipeline() *pipeline_upload_model.V2beta1PipelineVersion {
271-
pipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", uploadParams.NewUploadPipelineParams())
281+
pipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world-with-returning-component.yaml", uploadParams.NewUploadPipelineParams())
272282
require.NoError(s.T(), err)
273283

274284
time.Sleep(1 * time.Second)
275285
pipelineVersion, err := s.pipelineUploadClient.UploadPipelineVersion(
276-
"../resources/hello-world.yaml", &uploadParams.UploadPipelineVersionParams{
286+
"../resources/hello-world-with-returning-component.yaml", &uploadParams.UploadPipelineVersionParams{
277287
Name: util.StringPointer("hello-world-version"),
278288
Pipelineid: util.StringPointer(pipeline.PipelineID),
279289
})
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from kfp import dsl
2+
3+
4+
@dsl.component(base_image="public.ecr.aws/docker/library/python:3.12")
5+
def comp(message: str) -> str:
6+
print(message)
7+
return message
8+
9+
10+
@dsl.pipeline
11+
def my_pipeline(message: str) -> str:
12+
return comp(message=message).output
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# PIPELINE DEFINITION
2+
# Name: my-pipeline
3+
# Inputs:
4+
# message: str
5+
# Outputs:
6+
# Output: str
7+
components:
8+
comp-comp:
9+
executorLabel: exec-comp
10+
inputDefinitions:
11+
parameters:
12+
message:
13+
parameterType: STRING
14+
outputDefinitions:
15+
parameters:
16+
Output:
17+
parameterType: STRING
18+
deploymentSpec:
19+
executors:
20+
exec-comp:
21+
container:
22+
args:
23+
- --executor_input
24+
- '{{$}}'
25+
- --function_to_execute
26+
- comp
27+
command:
28+
- sh
29+
- -c
30+
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
31+
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
32+
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.13.0'\
33+
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
34+
$0\" \"$@\"\n"
35+
- sh
36+
- -ec
37+
- 'program_path=$(mktemp -d)
38+
39+
40+
printf "%s" "$0" > "$program_path/ephemeral_component.py"
41+
42+
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
43+
44+
'
45+
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
46+
\ *\n\ndef comp(message: str) -> str:\n print(message)\n return message\n\
47+
\n"
48+
image: public.ecr.aws/docker/library/python:3.12
49+
pipelineInfo:
50+
name: my-pipeline
51+
root:
52+
dag:
53+
outputs:
54+
parameters:
55+
Output:
56+
valueFromParameter:
57+
outputParameterKey: Output
58+
producerSubtask: comp
59+
tasks:
60+
comp:
61+
cachingOptions:
62+
enableCache: true
63+
componentRef:
64+
name: comp-comp
65+
inputs:
66+
parameters:
67+
message:
68+
componentInputParameter: message
69+
taskInfo:
70+
name: comp
71+
inputDefinitions:
72+
parameters:
73+
message:
74+
parameterType: STRING
75+
outputDefinitions:
76+
parameters:
77+
Output:
78+
parameterType: STRING
79+
schemaVersion: 2.1.0
80+
sdkVersion: kfp-2.13.0

0 commit comments

Comments
 (0)