Skip to content

Commit a3d99d2

Browse files
committed
feat(backend): add support for uploading new sample pipeline vers (kubeflow#11553)
This change adds additional sampleconfig options that provide support for uploading new Pipelines and PipelineVersion samples. To accommodate this and backwards compatibility the structure of the samples config has been changed. Configs following the old format will continue to be supported. Sample config code is also moved to its own file so as not to bloat main.go In order to handle conflicts, and detecting pipeline/pipelineVersion existence, additional db queries are made per pipeline and pipeline version at apiserver startup. (cherry picked from commit d2ddb2e) Signed-off-by: Humair Khan <[email protected]>
1 parent 4453f41 commit a3d99d2

File tree

7 files changed

+537
-116
lines changed

7 files changed

+537
-116
lines changed
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
// Copyright 2025 The Kubeflow Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package config
16+
17+
import (
18+
"encoding/json"
19+
"fmt"
20+
"github.com/golang/glog"
21+
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
22+
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
23+
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
24+
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
25+
"github.com/kubeflow/pipelines/backend/src/apiserver/server"
26+
"github.com/kubeflow/pipelines/backend/src/common/util"
27+
"google.golang.org/grpc/codes"
28+
"os"
29+
"time"
30+
)
31+
32+
// deprecated
33+
type deprecatedConfig struct {
34+
Name string
35+
Description string
36+
File string
37+
}
38+
39+
type configPipelines struct {
40+
Name string
41+
Description string
42+
File string
43+
// optional, Name is used for PipelineVersion if not provided
44+
VersionName string
45+
// optional, Description is used for PipelineVersion if not provided
46+
VersionDescription string
47+
}
48+
49+
type config struct {
50+
// If pipeline version already exists and
51+
// LoadSamplesOnRestart is enabled, then the pipeline
52+
// version is uploaded again on server restart
53+
// if it does not already exist
54+
LoadSamplesOnRestart bool
55+
Pipelines []configPipelines
56+
}
57+
58+
// LoadSamples preloads a collection of pipeline samples
59+
//
60+
// If LoadSamplesOnRestart is false then Samples are only
61+
// loaded once when the pipeline system is initially installed.
62+
// They won't be loaded on upgrade or pod restart, to
63+
// prevent them from reappearing if user explicitly deletes the
64+
// samples. If LoadSamplesOnRestart is true then PipelineVersions
65+
// are uploaded if they do not already exist upon upgrade or pod
66+
// restart.
67+
func LoadSamples(resourceManager *resource.ResourceManager, sampleConfigPath string) error {
68+
pathExists, err := client.PathExists(sampleConfigPath)
69+
if err != nil {
70+
return err
71+
}
72+
73+
if !pathExists {
74+
glog.Infof("No samples path provided, skipping loading samples..")
75+
return nil
76+
}
77+
78+
configBytes, err := os.ReadFile(sampleConfigPath)
79+
if err != nil {
80+
return fmt.Errorf("failed to read sample configurations file. Err: %v", err)
81+
}
82+
83+
var pipelineConfig config
84+
if configErr := json.Unmarshal(configBytes, &pipelineConfig); configErr != nil {
85+
// Attempt to parse to deprecated config version:
86+
var deprecatedCfg []deprecatedConfig
87+
if depConfigErr := json.Unmarshal(configBytes, &deprecatedCfg); depConfigErr != nil {
88+
return fmt.Errorf("failed to read sample configurations. Err: %v", configErr)
89+
}
90+
glog.Warningf("encountered deprecated version of samples config, please update to the newer version to " +
91+
"ensure future compatibility")
92+
for _, cfg := range deprecatedCfg {
93+
pipelineConfig.Pipelines = append(pipelineConfig.Pipelines, configPipelines{
94+
Name: cfg.Name,
95+
File: cfg.File,
96+
Description: cfg.Description,
97+
})
98+
}
99+
pipelineConfig.LoadSamplesOnRestart = false
100+
}
101+
102+
// Check if sample has been loaded already and skip loading if true.
103+
haveSamplesLoaded, err := resourceManager.HaveSamplesLoaded()
104+
if err != nil {
105+
return err
106+
}
107+
108+
if !pipelineConfig.LoadSamplesOnRestart && haveSamplesLoaded {
109+
glog.Infof("Samples already loaded in the past. Skip loading.")
110+
return nil
111+
}
112+
113+
processedPipelines := map[string]bool{}
114+
115+
for _, cfg := range pipelineConfig.Pipelines {
116+
// Track if this is the first upload of this pipeline
117+
reader, configErr := os.Open(cfg.File)
118+
if configErr != nil {
119+
return fmt.Errorf("failed to load sample %s. Error: %v", cfg.Name, configErr)
120+
}
121+
pipelineFile, configErr := server.ReadPipelineFile(cfg.File, reader, common.MaxFileLength)
122+
if configErr != nil {
123+
return fmt.Errorf("failed to load sample %s. Error: %v", cfg.Name, configErr)
124+
}
125+
126+
// Create pipeline if it does not already exist
127+
p, fetchErr := resourceManager.GetPipelineByNameAndNamespace(cfg.Name, "")
128+
if fetchErr != nil {
129+
if util.IsUserErrorCodeMatch(fetchErr, codes.NotFound) {
130+
p, configErr = resourceManager.CreatePipeline(&model.Pipeline{
131+
Name: cfg.Name,
132+
Description: cfg.Description,
133+
})
134+
if configErr != nil {
135+
// Log the error but not fail. The API Server pod can restart and it could potentially cause
136+
// name collision. In the future, we might consider loading samples during deployment, instead
137+
// of when API server starts.
138+
glog.Warningf(fmt.Sprintf(
139+
"Failed to create pipeline for %s. Error: %v", cfg.Name, configErr))
140+
continue
141+
} else {
142+
glog.Info(fmt.Sprintf("Successfully uploaded Pipeline %s.", cfg.Name))
143+
}
144+
} else {
145+
return fmt.Errorf(
146+
"Failed to handle load sample for Pipeline: %s. Error: %v", cfg.Name, fetchErr)
147+
}
148+
}
149+
150+
// Use Pipeline Version Name/Description if provided
151+
// Otherwise fallback to owning Pipeline's Name/Description
152+
pvDescription := cfg.Description
153+
if cfg.VersionDescription != "" {
154+
pvDescription = cfg.VersionDescription
155+
}
156+
pvName := cfg.Name
157+
if cfg.VersionName != "" {
158+
pvName = cfg.VersionName
159+
}
160+
161+
// If the Pipeline Version exists, do nothing
162+
// Otherwise upload new Pipeline Version for
163+
// this pipeline.
164+
_, fetchErr = resourceManager.GetPipelineVersionByName(pvName)
165+
if fetchErr != nil {
166+
if util.IsUserErrorCodeMatch(fetchErr, codes.NotFound) {
167+
_, configErr = resourceManager.CreatePipelineVersion(
168+
&model.PipelineVersion{
169+
Name: pvName,
170+
Description: pvDescription,
171+
PipelineId: p.UUID,
172+
PipelineSpec: string(pipelineFile),
173+
},
174+
)
175+
if configErr != nil {
176+
// Log the error but not fail. The API Server pod can restart and it could potentially cause name collision.
177+
// In the future, we might consider loading samples during deployment, instead of when API server starts.
178+
glog.Warningf(fmt.Sprintf("Failed to create pipeline for %s. Error: %v", pvName, configErr))
179+
180+
continue
181+
} else {
182+
glog.Info(fmt.Sprintf("Successfully uploaded PipelineVersion %s.", pvName))
183+
}
184+
185+
if processedPipelines[pvName] {
186+
// Since the default sorting is by create time,
187+
// Sleep one second makes sure the samples are
188+
// showing up in the same order as they are added.
189+
time.Sleep(1 * time.Second)
190+
}
191+
} else {
192+
return fmt.Errorf(
193+
"Failed to handle load sample for PipelineVersion: %s. Error: %v", pvName, fetchErr)
194+
}
195+
} else {
196+
// pipeline version already exists, do nothing
197+
continue
198+
}
199+
200+
processedPipelines[pvName] = true
201+
}
202+
203+
if !haveSamplesLoaded {
204+
err = resourceManager.MarkSampleLoaded()
205+
if err != nil {
206+
return err
207+
}
208+
}
209+
210+
glog.Info("All samples are loaded.")
211+
return nil
212+
}

0 commit comments

Comments
 (0)