Skip to content

Commit c654567

Browse files
Merge pull request #157 from HumairAK/cherry-pick
Cherry pick Toleration list support
2 parents 3a61807 + e09d33c commit c654567

File tree

17 files changed

+515
-85
lines changed

17 files changed

+515
-85
lines changed

backend/api/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,6 @@ API generator image is defined in [Dockerfile](`./Dockerfile`). If you need to u
8484
1. Login to GHCR container registry: `echo "<PAT>" | docker login ghcr.io -u <USERNAME> --password-stdin`
8585
* Replace `<PAT>` with a GitHub Personal Access Token (PAT) with the write:packages and `read:packages` scopes, as well as `delete:packages` if needed.
8686
1. Update the [Dockerfile](`./Dockerfile`) and build the image by running `docker build -t ghcr.io/kubeflow/kfp-api-generator:$VERSION .`
87-
1. Push the new container by running `docker push ghcr.io/kubeflow/kfp-api-generator:$VERSION` (requires to be [authenticated](https://cloud.google.com/container-registry/docs/advanced-authentication)).
87+
1. Push the new container by running `docker push ghcr.io/kubeflow/kfp-api-generator:$VERSION`.
8888
1. Update the `PREBUILT_REMOTE_IMAGE` variable in the [Makefile](./Makefile) to point to your new image.
8989
1. Similarly, push a new version of the release tools image to `ghcr.io/kubeflow/kfp-release:$VERSION` and run `make push` in [test/release/Makefile](../../test/release/Makefile).

backend/src/v2/driver/driver.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -785,19 +785,51 @@ func extendPodSpecPatch(
785785
if toleration != nil {
786786
k8sToleration := &k8score.Toleration{}
787787
if toleration.TolerationJson != nil {
788-
err := resolveK8sJsonParameter(ctx, opts, dag, pipeline, mlmd,
789-
toleration.GetTolerationJson(), inputParams, k8sToleration)
788+
resolvedParam, err := resolveInputParameter(ctx, dag, pipeline, opts, mlmd,
789+
toleration.GetTolerationJson(), inputParams)
790790
if err != nil {
791791
return fmt.Errorf("failed to resolve toleration: %w", err)
792792
}
793+
794+
// TolerationJson can be either a single toleration or list of tolerations
795+
// the field accepts both, and in both cases the tolerations are appended
796+
// to the total executor pod toleration list.
797+
var paramJSON []byte
798+
isSingleToleration := resolvedParam.GetStructValue() != nil
799+
isListToleration := resolvedParam.GetListValue() != nil
800+
if isSingleToleration {
801+
paramJSON, err = resolvedParam.GetStructValue().MarshalJSON()
802+
if err != nil {
803+
return err
804+
}
805+
var singleToleration k8score.Toleration
806+
if err = json.Unmarshal(paramJSON, &singleToleration); err != nil {
807+
return fmt.Errorf("failed to marshal single toleration to json: %w", err)
808+
}
809+
k8sTolerations = append(k8sTolerations, singleToleration)
810+
} else if isListToleration {
811+
paramJSON, err = resolvedParam.GetListValue().MarshalJSON()
812+
if err != nil {
813+
return err
814+
}
815+
var k8sTolerationsList []k8score.Toleration
816+
if err = json.Unmarshal(paramJSON, &k8sTolerationsList); err != nil {
817+
return fmt.Errorf("failed to marshal list toleration to json: %w", err)
818+
}
819+
k8sTolerations = append(k8sTolerations, k8sTolerationsList...)
820+
} else {
821+
return fmt.Errorf("encountered unexpected toleration proto value, "+
822+
"must be either struct or list type: %w", err)
823+
}
793824
} else {
794825
k8sToleration.Key = toleration.Key
795826
k8sToleration.Operator = k8score.TolerationOperator(toleration.Operator)
796827
k8sToleration.Value = toleration.Value
797828
k8sToleration.Effect = k8score.TaintEffect(toleration.Effect)
798829
k8sToleration.TolerationSeconds = toleration.TolerationSeconds
830+
k8sTolerations = append(k8sTolerations, *k8sToleration)
799831
}
800-
k8sTolerations = append(k8sTolerations, *k8sToleration)
832+
801833
}
802834
}
803835
podSpec.Tolerations = k8sTolerations

backend/src/v2/driver/driver_test.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2075,6 +2075,165 @@ func Test_extendPodSpecPatch_Tolerations(t *testing.T) {
20752075
}),
20762076
},
20772077
},
2078+
{
2079+
"Valid - toleration json - toleration list",
2080+
&kubernetesplatform.KubernetesExecutorConfig{
2081+
Tolerations: []*kubernetesplatform.Toleration{
2082+
{
2083+
TolerationJson: inputParamComponent("param_1"),
2084+
},
2085+
},
2086+
},
2087+
&k8score.PodSpec{
2088+
Containers: []k8score.Container{
2089+
{
2090+
Name: "main",
2091+
},
2092+
},
2093+
Tolerations: []k8score.Toleration{
2094+
{
2095+
Key: "key1",
2096+
Operator: "Equal",
2097+
Value: "value1",
2098+
Effect: "NoSchedule",
2099+
TolerationSeconds: int64Ptr(3601),
2100+
},
2101+
{
2102+
Key: "key2",
2103+
Operator: "Equal",
2104+
Value: "value2",
2105+
Effect: "NoSchedule",
2106+
TolerationSeconds: int64Ptr(3602),
2107+
},
2108+
{
2109+
Key: "key3",
2110+
Operator: "Equal",
2111+
Value: "value3",
2112+
Effect: "NoSchedule",
2113+
TolerationSeconds: int64Ptr(3603),
2114+
},
2115+
},
2116+
},
2117+
map[string]*structpb.Value{
2118+
"param_1": validListOfStructsOrPanic([]map[string]interface{}{
2119+
{
2120+
"key": "key1",
2121+
"operator": "Equal",
2122+
"value": "value1",
2123+
"effect": "NoSchedule",
2124+
"tolerationSeconds": 3601,
2125+
},
2126+
{
2127+
"key": "key2",
2128+
"operator": "Equal",
2129+
"value": "value2",
2130+
"effect": "NoSchedule",
2131+
"tolerationSeconds": 3602,
2132+
},
2133+
{
2134+
"key": "key3",
2135+
"operator": "Equal",
2136+
"value": "value3",
2137+
"effect": "NoSchedule",
2138+
"tolerationSeconds": 3603,
2139+
},
2140+
}),
2141+
},
2142+
},
2143+
{
2144+
"Valid - toleration json - list toleration & single toleration & constant toleration",
2145+
&kubernetesplatform.KubernetesExecutorConfig{
2146+
Tolerations: []*kubernetesplatform.Toleration{
2147+
{
2148+
TolerationJson: inputParamComponent("param_1"),
2149+
},
2150+
{
2151+
TolerationJson: inputParamComponent("param_2"),
2152+
},
2153+
{
2154+
Key: "key5",
2155+
Operator: "Equal",
2156+
Value: "value5",
2157+
Effect: "NoSchedule",
2158+
},
2159+
},
2160+
},
2161+
&k8score.PodSpec{
2162+
Containers: []k8score.Container{
2163+
{
2164+
Name: "main",
2165+
},
2166+
},
2167+
Tolerations: []k8score.Toleration{
2168+
{
2169+
Key: "key1",
2170+
Operator: "Equal",
2171+
Value: "value1",
2172+
Effect: "NoSchedule",
2173+
TolerationSeconds: int64Ptr(3601),
2174+
},
2175+
{
2176+
Key: "key2",
2177+
Operator: "Equal",
2178+
Value: "value2",
2179+
Effect: "NoSchedule",
2180+
TolerationSeconds: int64Ptr(3602),
2181+
},
2182+
{
2183+
Key: "key3",
2184+
Operator: "Equal",
2185+
Value: "value3",
2186+
Effect: "NoSchedule",
2187+
TolerationSeconds: int64Ptr(3603),
2188+
},
2189+
{
2190+
Key: "key4",
2191+
Operator: "Equal",
2192+
Value: "value4",
2193+
Effect: "NoSchedule",
2194+
TolerationSeconds: int64Ptr(3604),
2195+
},
2196+
{
2197+
Key: "key5",
2198+
Operator: "Equal",
2199+
Value: "value5",
2200+
Effect: "NoSchedule",
2201+
},
2202+
},
2203+
},
2204+
map[string]*structpb.Value{
2205+
"param_1": validListOfStructsOrPanic([]map[string]interface{}{
2206+
{
2207+
"key": "key1",
2208+
"operator": "Equal",
2209+
"value": "value1",
2210+
"effect": "NoSchedule",
2211+
"tolerationSeconds": 3601,
2212+
},
2213+
{
2214+
"key": "key2",
2215+
"operator": "Equal",
2216+
"value": "value2",
2217+
"effect": "NoSchedule",
2218+
"tolerationSeconds": 3602,
2219+
},
2220+
{
2221+
"key": "key3",
2222+
"operator": "Equal",
2223+
"value": "value3",
2224+
"effect": "NoSchedule",
2225+
"tolerationSeconds": 3603,
2226+
},
2227+
}),
2228+
"param_2": validValueStructOrPanic(map[string]interface{}{
2229+
"key": "key4",
2230+
"operator": "Equal",
2231+
"value": "value4",
2232+
"effect": "NoSchedule",
2233+
"tolerationSeconds": 3604,
2234+
}),
2235+
},
2236+
},
20782237
}
20792238
for _, tt := range tests {
20802239
t.Run(tt.name, func(t *testing.T) {
@@ -2554,6 +2713,18 @@ func Test_extendPodSpecPatch_GenericEphemeralVolume(t *testing.T) {
25542713
}
25552714
}
25562715

2716+
func validListOfStructsOrPanic(data []map[string]interface{}) *structpb.Value {
2717+
var listValues []*structpb.Value
2718+
for _, item := range data {
2719+
s, err := structpb.NewStruct(item)
2720+
if err != nil {
2721+
panic(err)
2722+
}
2723+
listValues = append(listValues, structpb.NewStructValue(s))
2724+
}
2725+
return structpb.NewListValue(&structpb.ListValue{Values: listValues})
2726+
}
2727+
25572728
func validValueStructOrPanic(data map[string]interface{}) *structpb.Value {
25582729
s, err := structpb.NewStruct(data)
25592730
if err != nil {

backend/test/integration/pipeline_api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
122122
time.Sleep(1 * time.Second)
123123
sequentialPipeline, err := s.pipelineClient.Create(&params.PipelineServiceCreatePipelineV1Params{
124124
Body: &model.APIPipeline{Name: "sequential", URL: &model.APIURL{
125-
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/sequential.yaml",
125+
PipelineURL: "https://raw.githubusercontent.com/kubeflow/pipelines/refs/heads/master/backend/test/v2/resources/sequential.yaml",
126126
}},
127127
})
128128
require.Nil(t, err)
@@ -139,7 +139,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
139139
time.Sleep(1 * time.Second)
140140
argumentUrlPipeline, err := s.pipelineClient.Create(&params.PipelineServiceCreatePipelineV1Params{
141141
Body: &model.APIPipeline{URL: &model.APIURL{
142-
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/arguments.pipeline.zip",
142+
PipelineURL: "https://github.com/kubeflow/pipelines/raw/refs/heads/master/backend/test/v2/resources/arguments.pipeline.zip",
143143
}},
144144
})
145145
require.Nil(t, err)

backend/test/integration/pipeline_version_api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (s *PipelineVersionApiTest) TestArgoSpec() {
145145
Body: &pipeline_model.APIPipelineVersion{
146146
Name: "sequential",
147147
PackageURL: &pipeline_model.APIURL{
148-
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/sequential.yaml",
148+
PipelineURL: "https://raw.githubusercontent.com/kubeflow/pipelines/refs/heads/master/backend/test/v2/resources/sequential.yaml",
149149
},
150150
ResourceReferences: []*pipeline_model.APIResourceReference{
151151
{
@@ -174,7 +174,7 @@ func (s *PipelineVersionApiTest) TestArgoSpec() {
174174
Body: &pipeline_model.APIPipelineVersion{
175175
Name: "arguments",
176176
PackageURL: &pipeline_model.APIURL{
177-
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/arguments.pipeline.zip",
177+
PipelineURL: "https://github.com/kubeflow/pipelines/raw/refs/heads/master/backend/test/resources/arguments.pipeline.zip",
178178
},
179179
ResourceReferences: []*pipeline_model.APIResourceReference{
180180
{

backend/test/integration/upgrade_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ func (s *UpgradeTests) PreparePipelines() {
272272
time.Sleep(1 * time.Second)
273273
sequentialPipeline, err := s.pipelineClient.Create(&pipelineParams.PipelineServiceCreatePipelineV1Params{
274274
Body: &pipeline_model.APIPipeline{Name: "sequential", URL: &pipeline_model.APIURL{
275-
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/sequential.yaml",
275+
PipelineURL: "https://raw.githubusercontent.com/kubeflow/pipelines/refs/heads/master/backend/test/v2/resources/sequential.yaml",
276276
}},
277277
})
278278
require.Nil(t, err)
@@ -289,7 +289,7 @@ func (s *UpgradeTests) PreparePipelines() {
289289
time.Sleep(1 * time.Second)
290290
argumentUrlPipeline, err := s.pipelineClient.Create(&pipelineParams.PipelineServiceCreatePipelineV1Params{
291291
Body: &pipeline_model.APIPipeline{URL: &pipeline_model.APIURL{
292-
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/arguments.pipeline.zip",
292+
PipelineURL: "https://github.com/kubeflow/pipelines/raw/refs/heads/master/backend/test/v2/resources/arguments.pipeline.zip",
293293
}},
294294
})
295295
require.Nil(t, err)
@@ -535,20 +535,20 @@ func (s *UpgradeTests) VerifyCreatingRunsAndJobs() {
535535

536536
/* ---------- Create a new recurring run based on the second oldest pipeline version and belonging to the second oldest experiment ---------- */
537537
createJobRequest := &jobparams.JobServiceCreateJobParams{Body: &job_model.APIJob{
538-
Name: "sequential job from pipeline version",
539-
Description: "a recurring run from an old pipeline version",
538+
Description: "a recurring run from an old pipeline version",
539+
Enabled: true,
540+
MaxConcurrency: 10,
541+
Name: "sequential job from pipeline version",
540542
ResourceReferences: []*job_model.APIResourceReference{
541543
{
542544
Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypeEXPERIMENT, ID: experiments[1].ID},
543545
Relationship: job_model.APIRelationshipOWNER,
544546
},
545547
{
546-
Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypePIPELINEVERSION, ID: pipelines[1].DefaultVersion.ID},
548+
Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypePIPELINEVERSION, ID: pipelines[0].DefaultVersion.ID},
547549
Relationship: job_model.APIRelationshipCREATOR,
548550
},
549551
},
550-
MaxConcurrency: 10,
551-
Enabled: true,
552552
}}
553553
createdJob, err := s.jobClient.Create(createJobRequest)
554554
assert.Nil(t, err)

backend/test/v2/integration/pipeline_api_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
126126
},
127127
PipelineVersion: &model.V2beta1PipelineVersion{
128128
PackageURL: &model.V2beta1URL{
129-
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/v2/sequential.yaml",
129+
PipelineURL: "https://raw.githubusercontent.com/kubeflow/pipelines/refs/heads/master/backend/test/v2/resources/sequential-v2.yaml",
130130
},
131131
},
132132
},
@@ -141,7 +141,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
141141
assert.Equal(t, "sequential", sequentialPipelineVersions[0].DisplayName)
142142
assert.Equal(t, "sequential pipeline", sequentialPipelineVersions[0].Description)
143143
assert.Equal(t, sequentialPipeline.PipelineID, sequentialPipelineVersions[0].PipelineID)
144-
assert.Equal(t, "https://storage.googleapis.com/ml-pipeline-dataset/v2/sequential.yaml", sequentialPipelineVersions[0].PackageURL.PipelineURL)
144+
assert.Equal(t, "https://raw.githubusercontent.com/kubeflow/pipelines/refs/heads/master/backend/test/v2/resources/sequential-v2.yaml", sequentialPipelineVersions[0].PackageURL.PipelineURL)
145145

146146
/* ---------- Upload pipelines zip ---------- */
147147
time.Sleep(1 * time.Second)
@@ -164,15 +164,15 @@ func (s *PipelineApiTest) TestPipelineAPI() {
164164
Description: "1st version of argument url pipeline",
165165
PipelineID: sequentialPipeline.PipelineID,
166166
PackageURL: &model.V2beta1URL{
167-
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/v2/arguments.pipeline.zip",
167+
PipelineURL: "https://github.com/kubeflow/pipelines/raw/refs/heads/master/backend/test/v2/resources/arguments.pipeline.zip",
168168
},
169169
},
170170
})
171171
require.Nil(t, err)
172172
assert.Equal(t, "argumentUrl-v1", argumentUrlPipelineVersion.DisplayName)
173173
assert.Equal(t, "1st version of argument url pipeline", argumentUrlPipelineVersion.Description)
174174
assert.Equal(t, argumentUrlPipeline.PipelineID, argumentUrlPipelineVersion.PipelineID)
175-
assert.Equal(t, "https://storage.googleapis.com/ml-pipeline-dataset/v2/arguments.pipeline.zip", argumentUrlPipelineVersion.PackageURL.PipelineURL)
175+
assert.Equal(t, "https://github.com/kubeflow/pipelines/raw/refs/heads/master/backend/test/v2/resources/arguments.pipeline.zip", argumentUrlPipelineVersion.PackageURL.PipelineURL)
176176

177177
/* ---------- Verify list pipeline works ---------- */
178178
pipelines, totalSize, _, err := s.pipelineClient.List(&params.PipelineServiceListPipelinesParams{})

backend/test/v2/integration/pipeline_version_api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (s *PipelineVersionApiTest) TestPipelineSpec() {
137137
Body: &pipeline_model.V2beta1PipelineVersion{
138138
DisplayName: "sequential",
139139
PackageURL: &pipeline_model.V2beta1URL{
140-
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/v2/sequential.yaml",
140+
PipelineURL: "https://raw.githubusercontent.com/kubeflow/pipelines/refs/heads/master/backend/test/v2/resources/sequential-v2.yaml",
141141
},
142142
PipelineID: pipelineId,
143143
},
@@ -162,7 +162,7 @@ func (s *PipelineVersionApiTest) TestPipelineSpec() {
162162
Body: &pipeline_model.V2beta1PipelineVersion{
163163
DisplayName: "arguments",
164164
PackageURL: &pipeline_model.V2beta1URL{
165-
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/v2/arguments.pipeline.zip",
165+
PipelineURL: "https://github.com/kubeflow/pipelines/raw/refs/heads/master/backend/test/v2/resources/arguments.pipeline.zip",
166166
},
167167
PipelineID: pipelineId,
168168
},

backend/test/v2/integration/upgrade_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func (s *UpgradeTests) PreparePipelines() {
278278
Body: &pipeline_model.V2beta1PipelineVersion{
279279
DisplayName: "sequential",
280280
PackageURL: &pipeline_model.V2beta1URL{
281-
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/v2/sequential.yaml",
281+
PipelineURL: "https://raw.githubusercontent.com/kubeflow/pipelines/refs/heads/master/backend/test/v2/resources/sequential-v2.yaml",
282282
},
283283
PipelineID: sequentialPipeline.PipelineID,
284284
},
@@ -305,7 +305,7 @@ func (s *UpgradeTests) PreparePipelines() {
305305
Body: &pipeline_model.V2beta1PipelineVersion{
306306
DisplayName: "arguments",
307307
PackageURL: &pipeline_model.V2beta1URL{
308-
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/v2/arguments.pipeline.zip",
308+
PipelineURL: "https://github.com/kubeflow/pipelines/raw/refs/heads/master/backend/test/v2/resources/arguments.pipeline.zip",
309309
},
310310
PipelineID: argumentUrlPipeline.PipelineID,
311311
},

0 commit comments

Comments
 (0)