|
| 1 | +// Copyright 2025 The Kubeflow Authors |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +// you may not use this file except in compliance with the License. |
| 5 | +// You may obtain a copy of the License at |
| 6 | +// |
| 7 | +// https://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +// |
| 9 | +// Unless required by applicable law or agreed to in writing, software |
| 10 | +// distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +// See the License for the specific language governing permissions and |
| 13 | +// limitations under the License. |
| 14 | + |
| 15 | +package config |
| 16 | + |
| 17 | +import ( |
| 18 | + "encoding/json" |
| 19 | + "fmt" |
| 20 | + "github.com/golang/glog" |
| 21 | + "github.com/kubeflow/pipelines/backend/src/apiserver/client" |
| 22 | + "github.com/kubeflow/pipelines/backend/src/apiserver/common" |
| 23 | + "github.com/kubeflow/pipelines/backend/src/apiserver/model" |
| 24 | + "github.com/kubeflow/pipelines/backend/src/apiserver/resource" |
| 25 | + "github.com/kubeflow/pipelines/backend/src/apiserver/server" |
| 26 | + "github.com/kubeflow/pipelines/backend/src/common/util" |
| 27 | + "google.golang.org/grpc/codes" |
| 28 | + "os" |
| 29 | + "time" |
| 30 | +) |
| 31 | + |
| 32 | +// deprecated |
| 33 | +type deprecatedConfig struct { |
| 34 | + Name string |
| 35 | + Description string |
| 36 | + File string |
| 37 | +} |
| 38 | + |
| 39 | +type configPipelines struct { |
| 40 | + Name string |
| 41 | + Description string |
| 42 | + File string |
| 43 | + // optional, Name is used for PipelineVersion if not provided |
| 44 | + VersionName string |
| 45 | + // optional, Description is used for PipelineVersion if not provided |
| 46 | + VersionDescription string |
| 47 | +} |
| 48 | + |
| 49 | +type config struct { |
| 50 | + // If pipeline version already exists and |
| 51 | + // LoadSamplesOnRestart is enabled, then the pipeline |
| 52 | + // version is uploaded again on server restart |
| 53 | + // if it does not already exist |
| 54 | + LoadSamplesOnRestart bool |
| 55 | + Pipelines []configPipelines |
| 56 | +} |
| 57 | + |
| 58 | +// LoadSamples preloads a collection of pipeline samples |
| 59 | +// |
| 60 | +// If LoadSamplesOnRestart is false then Samples are only |
| 61 | +// loaded once when the pipeline system is initially installed. |
| 62 | +// They won't be loaded on upgrade or pod restart, to |
| 63 | +// prevent them from reappearing if user explicitly deletes the |
| 64 | +// samples. If LoadSamplesOnRestart is true then PipelineVersions |
| 65 | +// are uploaded if they do not already exist upon upgrade or pod |
| 66 | +// restart. |
| 67 | +func LoadSamples(resourceManager *resource.ResourceManager, sampleConfigPath string) error { |
| 68 | + pathExists, err := client.PathExists(sampleConfigPath) |
| 69 | + if err != nil { |
| 70 | + return err |
| 71 | + } |
| 72 | + |
| 73 | + if !pathExists { |
| 74 | + glog.Infof("No samples path provided, skipping loading samples..") |
| 75 | + return nil |
| 76 | + } |
| 77 | + |
| 78 | + configBytes, err := os.ReadFile(sampleConfigPath) |
| 79 | + if err != nil { |
| 80 | + return fmt.Errorf("failed to read sample configurations file. Err: %v", err) |
| 81 | + } |
| 82 | + |
| 83 | + var pipelineConfig config |
| 84 | + if configErr := json.Unmarshal(configBytes, &pipelineConfig); configErr != nil { |
| 85 | + // Attempt to parse to deprecated config version: |
| 86 | + var deprecatedCfg []deprecatedConfig |
| 87 | + if depConfigErr := json.Unmarshal(configBytes, &deprecatedCfg); depConfigErr != nil { |
| 88 | + return fmt.Errorf("failed to read sample configurations. Err: %v", configErr) |
| 89 | + } |
| 90 | + glog.Warningf("encountered deprecated version of samples config, please update to the newer version to " + |
| 91 | + "ensure future compatibility") |
| 92 | + for _, cfg := range deprecatedCfg { |
| 93 | + pipelineConfig.Pipelines = append(pipelineConfig.Pipelines, configPipelines{ |
| 94 | + Name: cfg.Name, |
| 95 | + File: cfg.File, |
| 96 | + Description: cfg.Description, |
| 97 | + }) |
| 98 | + } |
| 99 | + pipelineConfig.LoadSamplesOnRestart = false |
| 100 | + } |
| 101 | + |
| 102 | + // Check if sample has been loaded already and skip loading if true. |
| 103 | + haveSamplesLoaded, err := resourceManager.HaveSamplesLoaded() |
| 104 | + if err != nil { |
| 105 | + return err |
| 106 | + } |
| 107 | + |
| 108 | + if !pipelineConfig.LoadSamplesOnRestart && haveSamplesLoaded { |
| 109 | + glog.Infof("Samples already loaded in the past. Skip loading.") |
| 110 | + return nil |
| 111 | + } |
| 112 | + |
| 113 | + processedPipelines := map[string]bool{} |
| 114 | + |
| 115 | + for _, cfg := range pipelineConfig.Pipelines { |
| 116 | + // Track if this is the first upload of this pipeline |
| 117 | + reader, configErr := os.Open(cfg.File) |
| 118 | + if configErr != nil { |
| 119 | + return fmt.Errorf("failed to load sample %s. Error: %v", cfg.Name, configErr) |
| 120 | + } |
| 121 | + pipelineFile, configErr := server.ReadPipelineFile(cfg.File, reader, common.MaxFileLength) |
| 122 | + if configErr != nil { |
| 123 | + return fmt.Errorf("failed to load sample %s. Error: %v", cfg.Name, configErr) |
| 124 | + } |
| 125 | + |
| 126 | + // Create pipeline if it does not already exist |
| 127 | + p, fetchErr := resourceManager.GetPipelineByNameAndNamespace(cfg.Name, "") |
| 128 | + if fetchErr != nil { |
| 129 | + if util.IsUserErrorCodeMatch(fetchErr, codes.NotFound) { |
| 130 | + p, configErr = resourceManager.CreatePipeline(&model.Pipeline{ |
| 131 | + Name: cfg.Name, |
| 132 | + Description: cfg.Description, |
| 133 | + }) |
| 134 | + if configErr != nil { |
| 135 | + // Log the error but not fail. The API Server pod can restart and it could potentially cause |
| 136 | + // name collision. In the future, we might consider loading samples during deployment, instead |
| 137 | + // of when API server starts. |
| 138 | + glog.Warningf(fmt.Sprintf( |
| 139 | + "Failed to create pipeline for %s. Error: %v", cfg.Name, configErr)) |
| 140 | + continue |
| 141 | + } else { |
| 142 | + glog.Info(fmt.Sprintf("Successfully uploaded Pipeline %s.", cfg.Name)) |
| 143 | + } |
| 144 | + } else { |
| 145 | + return fmt.Errorf( |
| 146 | + "Failed to handle load sample for Pipeline: %s. Error: %v", cfg.Name, fetchErr) |
| 147 | + } |
| 148 | + } |
| 149 | + |
| 150 | + // Use Pipeline Version Name/Description if provided |
| 151 | + // Otherwise fallback to owning Pipeline's Name/Description |
| 152 | + pvDescription := cfg.Description |
| 153 | + if cfg.VersionDescription != "" { |
| 154 | + pvDescription = cfg.VersionDescription |
| 155 | + } |
| 156 | + pvName := cfg.Name |
| 157 | + if cfg.VersionName != "" { |
| 158 | + pvName = cfg.VersionName |
| 159 | + } |
| 160 | + |
| 161 | + // If the Pipeline Version exists, do nothing |
| 162 | + // Otherwise upload new Pipeline Version for |
| 163 | + // this pipeline. |
| 164 | + _, fetchErr = resourceManager.GetPipelineVersionByName(pvName) |
| 165 | + if fetchErr != nil { |
| 166 | + if util.IsUserErrorCodeMatch(fetchErr, codes.NotFound) { |
| 167 | + _, configErr = resourceManager.CreatePipelineVersion( |
| 168 | + &model.PipelineVersion{ |
| 169 | + Name: pvName, |
| 170 | + Description: pvDescription, |
| 171 | + PipelineId: p.UUID, |
| 172 | + PipelineSpec: string(pipelineFile), |
| 173 | + }, |
| 174 | + ) |
| 175 | + if configErr != nil { |
| 176 | + // Log the error but not fail. The API Server pod can restart and it could potentially cause name collision. |
| 177 | + // In the future, we might consider loading samples during deployment, instead of when API server starts. |
| 178 | + glog.Warningf(fmt.Sprintf("Failed to create pipeline for %s. Error: %v", pvName, configErr)) |
| 179 | + |
| 180 | + continue |
| 181 | + } else { |
| 182 | + glog.Info(fmt.Sprintf("Successfully uploaded PipelineVersion %s.", pvName)) |
| 183 | + } |
| 184 | + |
| 185 | + if processedPipelines[pvName] { |
| 186 | + // Since the default sorting is by create time, |
| 187 | + // Sleep one second makes sure the samples are |
| 188 | + // showing up in the same order as they are added. |
| 189 | + time.Sleep(1 * time.Second) |
| 190 | + } |
| 191 | + } else { |
| 192 | + return fmt.Errorf( |
| 193 | + "Failed to handle load sample for PipelineVersion: %s. Error: %v", pvName, fetchErr) |
| 194 | + } |
| 195 | + } else { |
| 196 | + // pipeline version already exists, do nothing |
| 197 | + continue |
| 198 | + } |
| 199 | + |
| 200 | + processedPipelines[pvName] = true |
| 201 | + } |
| 202 | + |
| 203 | + if !haveSamplesLoaded { |
| 204 | + err = resourceManager.MarkSampleLoaded() |
| 205 | + if err != nil { |
| 206 | + return err |
| 207 | + } |
| 208 | + } |
| 209 | + |
| 210 | + glog.Info("All samples are loaded.") |
| 211 | + return nil |
| 212 | +} |
0 commit comments