Skip to content

Commit 51ab5e6

Browse files
authored
chore: Replace sdk in pipeline files (#12297)
Signed-off-by: Nelesh Singla <[email protected]>
1 parent 348a6f5 commit 51ab5e6

File tree

10 files changed

+101
-47
lines changed

10 files changed

+101
-47
lines changed

.github/actions/test-and-report/action.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,18 @@ runs:
7373
if [ -z $MULTI_USER ]; then
7474
MULTI_USER='false'
7575
fi
76-
go run github.com/onsi/ginkgo/v2/ginkgo -r -v --cover -p --keep-going --github-output=true --nodes=${{ inputs.num_parallel_nodes }} -v --label-filter=${{ inputs.test_label }} -- -namespace=${{ inputs.default_namespace }} -multiUserMode=$MULTI_USER -useProxy=$USE_PROXY -userNamespace=${{ inputs.user_namespace }} -uploadPipelinesWithKubernetes=${{ inputs.upload_pipelines_with_kubernetes_client}}
76+
PULL_NUMBER="${{ github.event.inputs.pull_number || github.event.pull_request.number }}"
77+
REPO_NAME="${{ github.repository }}"
78+
go run github.com/onsi/ginkgo/v2/ginkgo -r -v --cover -p --keep-going --github-output=true --nodes=${{ inputs.num_parallel_nodes }} -v --label-filter=${{ inputs.test_label }} -- -namespace=${{ inputs.default_namespace }} -multiUserMode=$MULTI_USER -useProxy=$USE_PROXY -userNamespace=${{ inputs.user_namespace }} -uploadPipelinesWithKubernetes=${{ inputs.upload_pipelines_with_kubernetes_client}} -pullNumber=$PULL_NUMBER -repoName=$REPO_NAME
7779
continue-on-error: true
7880

7981
- name: Collect Pod logs in case of Test Failures
8082
id: collect-logs
8183
shell: bash
8284
if: ${{ steps.run-tests.outcome != 'success' }}
8385
run: |
86+
echo "=== Current disk usage ==="
87+
df -h
8488
NAMESPACE=${{ env.NAMESPACE }}
8589
if [ "${{ github.event_name }}" == "workflow_dispatch" ]; then
8690
NAMESPACE=${{ inputs.namespace }}

backend/src/common/client/api_server/util.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"crypto/tls"
55
"fmt"
66
"net/http"
7+
"net/url"
78
"os"
8-
"strings"
99
"time"
1010

1111
"github.com/go-openapi/runtime"
@@ -64,9 +64,13 @@ func NewHTTPRuntime(clientConfig clientcmd.ClientConfig, debug bool) (
6464
if !*testconfig.InClusterRun {
6565
httpClient := http.DefaultClient
6666
var scheme []string
67-
if strings.Contains(*testconfig.ApiUrl, "://") {
68-
schemeFromUrl := strings.Replace(strings.Split(*testconfig.ApiUrl, "://")[0], "://", "", -1)
69-
scheme = append(scheme, schemeFromUrl)
67+
parsedUrl, err := url.Parse(*testconfig.ApiUrl)
68+
if err != nil {
69+
return nil, err
70+
}
71+
host := parsedUrl.Host
72+
if parsedUrl.Scheme != "" {
73+
scheme = append(scheme, parsedUrl.Scheme)
7074
}
7175
if testconfig.ApiScheme != nil {
7276
scheme = append(scheme, *testconfig.ApiScheme)
@@ -77,7 +81,8 @@ func NewHTTPRuntime(clientConfig clientcmd.ClientConfig, debug bool) (
7781
}
7882
httpClient = &http.Client{Transport: tr}
7983
}
80-
runtimeClient := httptransport.NewWithClient(*testconfig.ApiUrl, "", scheme, httpClient)
84+
85+
runtimeClient := httptransport.NewWithClient(host, "", scheme, httpClient)
8186
if debug {
8287
runtimeClient.SetDebug(true)
8388
}

backend/test/config/flags.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,16 @@ var (
2929
ApiScheme = flag.String("apiScheme", "http", "The scheme to use for a connection to the api server")
3030
ApiHost = flag.String("apiHost", "localhost", "The hostname of the API server")
3131
ApiPort = flag.String("apiPort", "8888", "The port on which the API server is listening")
32-
ApiUrl = flag.String("apiUrl", fmt.Sprintf("%s:%s", *ApiHost, *ApiPort), "The URL of the API server (without the scheme)")
32+
ApiUrl = flag.String("apiUrl", fmt.Sprintf("%s://%s:%s", *ApiScheme, *ApiHost, *ApiPort), "The URL of the API server")
3333
DisableTlsCheck = flag.Bool("disableTlsCheck", false, "Whether to use server certificate chain and hostname.")
3434
InClusterRun = flag.Bool("runInCluster", false, "Whether to run your tests from within the K8s cluster")
35+
AuthToken = flag.String("authToken", "", "The default auth token that will be injected to all your API request")
36+
)
37+
38+
var (
39+
REPO_NAME = flag.String("repoName", "kubeflow/pipelines", "The name of the repository")
40+
PULL_NUMBER = flag.String("pullNumber", "", "The pull number")
41+
BRANCH_NAME = flag.String("branchName", "master", "The branch name")
3542
)
3643

3744
var DebugMode = flag.Bool("debugMode", false, "Whether to enable debug mode. Debug mode will log more diagnostics messages.")

backend/test/end2end/e2e_suite_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,14 @@ var _ = BeforeSuite(func() {
8080
newRunClient = func() (*apiserver.RunClient, error) {
8181
return apiserver.NewKubeflowInClusterRunClient(*config.Namespace, *config.DebugMode)
8282
}
83-
} else if *config.MultiUserMode {
84-
logger.Log("Creating API Clients for Multi User Mode")
85-
userToken = test_utils.CreateUserToken(k8Client, *config.UserNamespace, *config.UserServiceAccountName)
83+
} else if *config.MultiUserMode || *config.AuthToken != "" {
84+
if *config.AuthToken != "" {
85+
logger.Log("Creating API Clients With Auth Token")
86+
userToken = *config.AuthToken
87+
} else {
88+
logger.Log("Creating API Clients for Multi User Mode")
89+
userToken = test_utils.CreateUserToken(k8Client, *config.UserNamespace, *config.UserServiceAccountName)
90+
}
8691
newPipelineClient = func() (*apiserver.PipelineClient, error) {
8792
return apiserver.NewMultiUserPipelineClient(clientConfig, userToken, *config.DebugMode)
8893
}

backend/test/end2end/pipeline_e2e_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ var _ = Describe("Upload and Verify Pipeline Run >", Label(FULL_REGRESSION), fun
129129
test_utils.CheckIfSkipping(pipelineFile)
130130
pipelineFilePath := filepath.Join(test_utils.GetPipelineFilesDir(), pipelineDir, pipelineFile)
131131
logger.Log("Uploading pipeline file %s", pipelineFile)
132-
uploadedPipeline, uploadErr := e2e_utils.UploadPipeline(pipelineUploadClient, testContext, pipelineDir, pipelineFile, &testContext.Pipeline.PipelineGeneratedName, nil)
132+
uploadedPipeline, uploadErr := test_utils.UploadPipeline(pipelineUploadClient, pipelineFilePath, &testContext.Pipeline.PipelineGeneratedName, nil)
133133
Expect(uploadErr).To(BeNil(), "Failed to upload pipeline %s", pipelineFile)
134134
testContext.Pipeline.CreatedPipelines = append(testContext.Pipeline.CreatedPipelines, uploadedPipeline)
135135
logger.Log("Upload of pipeline file '%s' successful", pipelineFile)
@@ -152,7 +152,7 @@ var _ = Describe("Upload and Verify Pipeline Run >", Label(FULL_REGRESSION), fun
152152
test_utils.CheckIfSkipping(pipelineFile)
153153
pipelineFilePath := filepath.Join(test_utils.GetPipelineFilesDir(), pipelineDir, pipelineFile)
154154
logger.Log("Uploading pipeline file %s", pipelineFile)
155-
uploadedPipeline, uploadErr := e2e_utils.UploadPipeline(pipelineUploadClient, testContext, pipelineDir, pipelineFile, &testContext.Pipeline.PipelineGeneratedName, nil)
155+
uploadedPipeline, uploadErr := test_utils.UploadPipeline(pipelineUploadClient, pipelineFilePath, &testContext.Pipeline.PipelineGeneratedName, nil)
156156
Expect(uploadErr).To(BeNil(), "Failed to upload pipeline %s", pipelineFile)
157157
testContext.Pipeline.CreatedPipelines = append(testContext.Pipeline.CreatedPipelines, uploadedPipeline)
158158
logger.Log("Upload of pipeline file '%s' successful", pipelineFile)
@@ -171,7 +171,8 @@ var _ = Describe("Upload and Verify Pipeline Run >", Label(FULL_REGRESSION), fun
171171
var pipelineDir = "valid"
172172
pipelineFile := "env-var.yaml"
173173
It(fmt.Sprintf("Create a pipeline run with http proxy, using specs: %s", pipelineFile), func() {
174-
uploadedPipeline, uploadErr := e2e_utils.UploadPipeline(pipelineUploadClient, testContext, pipelineDir, pipelineFile, &testContext.Pipeline.PipelineGeneratedName, nil)
174+
pipelineFilePath := filepath.Join(test_utils.GetPipelineFilesDir(), pipelineDir, pipelineFile)
175+
uploadedPipeline, uploadErr := test_utils.UploadPipeline(pipelineUploadClient, pipelineFilePath, &testContext.Pipeline.PipelineGeneratedName, nil)
175176
Expect(uploadErr).To(BeNil(), "Failed to upload pipeline %s", pipelineFile)
176177
testContext.Pipeline.CreatedPipelines = append(testContext.Pipeline.CreatedPipelines, uploadedPipeline)
177178
createdPipelineVersion := test_utils.GetLatestPipelineVersion(pipelineClient, &uploadedPipeline.PipelineID)

backend/test/end2end/utils/e2e_utils.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,9 @@ package utils
33
import (
44
"fmt"
55
"maps"
6-
"path/filepath"
76
"sort"
87
"time"
98

10-
model "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_upload_model"
119
runparams "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_client/run_service"
1210
"github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_model"
1311
apiserver "github.com/kubeflow/pipelines/backend/src/common/client/api_server/v2"
@@ -23,20 +21,6 @@ import (
2321
"k8s.io/client-go/kubernetes"
2422
)
2523

26-
// UploadPipeline - Upload a pipeline
27-
func UploadPipeline(pipelineUploadClient apiserver.PipelineUploadInterface, testContext *apitests.TestContext, pipelineDir string, pipelineFileName string, pipelineName *string, pipelineDisplayName *string) (*model.V2beta1Pipeline, error) {
28-
pipelineFile := filepath.Join(test_utils.GetPipelineFilesDir(), pipelineDir, pipelineFileName)
29-
testContext.Pipeline.UploadParams.SetName(pipelineName)
30-
if pipelineDisplayName != nil {
31-
testContext.Pipeline.ExpectedPipeline.DisplayName = *pipelineDisplayName
32-
testContext.Pipeline.UploadParams.SetDisplayName(pipelineDisplayName)
33-
} else {
34-
testContext.Pipeline.ExpectedPipeline.DisplayName = *pipelineName
35-
}
36-
logger.Log("Uploading pipeline with name=%s, from file %s", *pipelineName, pipelineFile)
37-
return pipelineUploadClient.UploadFile(pipelineFile, testContext.Pipeline.UploadParams)
38-
}
39-
4024
// CreatePipelineRun - Create a pipeline run
4125
func CreatePipelineRun(runClient *apiserver.RunClient, testContext *apitests.TestContext, pipelineID *string, pipelineVersionID *string, experimentID *string, inputParams map[string]interface{}) *run_model.V2beta1Run {
4226
runName := fmt.Sprintf("E2e Test Run-%v", testContext.TestStartTimeUTC)

backend/test/test_utils/pipeline_utils.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ package test_utils
1616

1717
import (
1818
"fmt"
19-
2019
pipeline_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_client/pipeline_service"
2120
"github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_model"
21+
upload_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_upload_client/pipeline_upload_service"
2222
model "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_upload_model"
2323
api_server "github.com/kubeflow/pipelines/backend/src/common/client/api_server/v2"
2424
"github.com/kubeflow/pipelines/backend/src/common/util"
2525
"github.com/kubeflow/pipelines/backend/test/logger"
26+
"os"
2627

2728
"github.com/onsi/ginkgo/v2"
2829
"github.com/onsi/gomega"
@@ -58,6 +59,26 @@ func ListPipelines(client *api_server.PipelineClient, namespace *string) []*pipe
5859
return pipelines
5960
}
6061

62+
// UploadPipeline - Upload a pipeline
63+
func UploadPipeline(pipelineUploadClient api_server.PipelineUploadInterface, pipelineFilePath string, pipelineName *string, pipelineDisplayName *string) (*model.V2beta1Pipeline, error) {
64+
uploadParams := upload_params.NewUploadPipelineParams()
65+
uploadParams.SetName(pipelineName)
66+
if pipelineDisplayName != nil {
67+
uploadParams.SetDisplayName(pipelineDisplayName)
68+
}
69+
logger.Log("Creating temp pipeline file with overridden SDK Version")
70+
overriddenPipelineFileWithSDKVersion := ReplaceSDKInPipelineSpec(pipelineFilePath, false, nil)
71+
tempPipelineFile := CreateTempFile(overriddenPipelineFileWithSDKVersion)
72+
defer func() {
73+
// Ensure the temporary file is removed when the function exits
74+
if err := os.Remove(tempPipelineFile.Name()); err != nil {
75+
logger.Log("Error removing temporary file: %s", err)
76+
}
77+
}()
78+
logger.Log("Uploading pipeline with name=%s, from file %s", *pipelineName, pipelineFilePath)
79+
return pipelineUploadClient.UploadFile(tempPipelineFile.Name(), uploadParams)
80+
}
81+
6182
/*
6283
Delete a pipeline by id
6384
*/

backend/test/test_utils/test_utils.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@ package test_utils
1616

1717
import (
1818
"fmt"
19+
"github.com/onsi/gomega"
20+
v1 "k8s.io/api/core/v1"
1921
"math/rand"
2022
"os"
2123
"path/filepath"
24+
"regexp"
2225
"strings"
2326
"time"
2427

@@ -81,3 +84,30 @@ func GetNamespace() string {
8184
}
8285
return *config.Namespace
8386
}
87+
88+
// getPackagePath generates the package path based on environment variables
89+
// Equivalent to the Python function get_package_path
90+
func getPackagePath(subdir string) string {
91+
repoName := *config.REPO_NAME
92+
93+
pullNumber := *config.PULL_NUMBER
94+
if pullNumber != "" {
95+
return fmt.Sprintf("git+https://github.com/%s.git@refs/pull/%s/merge#subdirectory=%s", repoName, pullNumber, subdir)
96+
}
97+
return fmt.Sprintf("git+https://github.com/%s.git@%s#subdirectory=%s", repoName, *config.BRANCH_NAME, subdir)
98+
}
99+
100+
func ReplaceSDKInPipelineSpec(pipelineFilePath string, cacheDisabled bool, defaultWorkspace *v1.PersistentVolumeClaimSpec) []byte {
101+
pipelineFileBytes, err := os.ReadFile(pipelineFilePath)
102+
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "failed to read pipeline file: "+pipelineFilePath)
103+
pipelineFileString := string(pipelineFileBytes)
104+
105+
// Define regex pattern to match kfp==[version] (e.g., kfp==2.8.0)
106+
kfpPattern := regexp.MustCompile(`kfp==[0-9]+\.[0-9]+\.[0-9]+`)
107+
108+
// Replace all occurrences with the new package path
109+
newPackagePath := getPackagePath("sdk/python")
110+
modifiedPipelineSpec := kfpPattern.ReplaceAllString(pipelineFileString, newPackagePath)
111+
112+
return []byte(modifiedPipelineSpec)
113+
}

backend/test/v2/api/integration_suite_test.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ var testContext *TestContext
4141
var randomName string
4242
var pipelineFilesRootDir = test_utils.GetPipelineFilesDir()
4343
var experimentID *string
44-
var defaultUserToken string
4544
var userToken string
4645

4746
var (
@@ -88,10 +87,14 @@ var _ = BeforeSuite(func() {
8887
newRecurringRunClient = func() (*apiserver.RecurringRunClient, error) {
8988
return apiserver.NewKubeflowInClusterRecurringRunClient(*config.Namespace, *config.DebugMode)
9089
}
91-
} else if *config.MultiUserMode {
92-
logger.Log("Creating API Clients for Multi User Mode")
93-
defaultUserToken = test_utils.CreateUserToken(k8Client, *config.Namespace, *config.DefaultServiceAccountName)
94-
userToken = test_utils.CreateUserToken(k8Client, *config.UserNamespace, *config.UserServiceAccountName)
90+
} else if *config.MultiUserMode || *config.AuthToken != "" {
91+
if *config.AuthToken != "" {
92+
logger.Log("Creating API Clients With Auth Token")
93+
userToken = *config.AuthToken
94+
} else {
95+
logger.Log("Creating API Clients for Multi User Mode")
96+
userToken = test_utils.CreateUserToken(k8Client, *config.UserNamespace, *config.UserServiceAccountName)
97+
}
9598
newPipelineClient = func() (*apiserver.PipelineClient, error) {
9699
return apiserver.NewMultiUserPipelineClient(clientConfig, userToken, *config.DebugMode)
97100
}
@@ -144,17 +147,11 @@ var _ = BeforeEach(func() {
144147
testContext = &TestContext{
145148
TestStartTimeUTC: time.Now(),
146149
}
147-
148-
experimentName := fmt.Sprintf("APIServerTestsExperiment-%s", strconv.FormatInt(time.Now().UnixNano(), 10))
149-
experiment := test_utils.CreateExperiment(experimentClient, experimentName, test_utils.GetNamespace())
150-
experimentID = &experiment.ExperimentID
151-
152150
randomName = strconv.FormatInt(time.Now().UnixNano(), 10)
153151
testContext.Pipeline.CreatedPipelines = make([]*pipeline_upload_model.V2beta1Pipeline, 0)
154152
testContext.Pipeline.UploadParams = uploadparams.NewUploadPipelineParams()
155153
testContext.PipelineRun.CreatedRunIds = make([]string, 0)
156154
testContext.Experiment.CreatedExperimentIds = make([]string, 0)
157-
testContext.Experiment.CreatedExperimentIds = append(testContext.Experiment.CreatedExperimentIds, *experimentID)
158155
})
159156

160157
var _ = AfterEach(func() {

backend/test/v2/api/pipeline_run_api_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ package api
1616

1717
import (
1818
"fmt"
19-
"path/filepath"
20-
"strings"
21-
2219
experimentparams "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/experiment_client/experiment_service"
2320
"github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/experiment_model"
2421
"github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_model"
@@ -30,6 +27,8 @@ import (
3027
"github.com/kubeflow/pipelines/backend/test/logger"
3128
"github.com/kubeflow/pipelines/backend/test/test_utils"
3229
"github.com/kubeflow/pipelines/backend/test/v2/api/matcher"
30+
"path/filepath"
31+
"strings"
3332

3433
. "github.com/onsi/ginkgo/v2"
3534
. "github.com/onsi/gomega"
@@ -70,15 +69,16 @@ var _ = Describe("Verify Pipeline Run >", Label(POSITIVE, API_PIPELINE_RUN, API_
7069
for _, param := range testParams {
7170
for _, pipelineFilePath := range pipelineFilePaths {
7271
It(fmt.Sprintf("Create a '%s' pipeline with cacheEnabled=%t and verify run", pipelineFilePath, param.pipelineCacheEnabled), func() {
72+
createdExperiment := createExperiment(experimentName)
7373
pipelineFilePath := pipelineFilePath
7474
pipelineFileName := filepath.Base(pipelineFilePath)
7575
test_utils.CheckIfSkipping(pipelineFileName)
7676
configuredPipelineSpecFile := configureCacheSettingAndGetPipelineFile(pipelineFilePath, param.pipelineCacheEnabled)
7777
createdPipeline := uploadAPipeline(configuredPipelineSpecFile, &testContext.Pipeline.PipelineGeneratedName)
7878
createdPipelineVersion := test_utils.GetLatestPipelineVersion(pipelineClient, &createdPipeline.PipelineID)
7979
pipelineRuntimeInputs := test_utils.GetPipelineRunTimeInputs(configuredPipelineSpecFile)
80-
createdPipelineRun := createPipelineRun(&createdPipeline.PipelineID, &createdPipelineVersion.PipelineVersionID, experimentID, pipelineRuntimeInputs)
81-
createdExpectedRunAndVerify(createdPipelineRun, &createdPipeline.PipelineID, &createdPipelineVersion.PipelineVersionID, experimentID, pipelineRuntimeInputs)
80+
createdPipelineRun := createPipelineRun(&createdPipeline.PipelineID, &createdPipelineVersion.PipelineVersionID, &createdExperiment.ExperimentID, pipelineRuntimeInputs)
81+
createdExpectedRunAndVerify(createdPipelineRun, &createdPipeline.PipelineID, &createdPipelineVersion.PipelineVersionID, &createdExperiment.ExperimentID, pipelineRuntimeInputs)
8282
})
8383
}
8484
}
@@ -333,7 +333,7 @@ func uploadAPipeline(pipelineFile string, pipelineName *string) *pipeline_upload
333333
logger.Log("Create a pipeline")
334334
testContext.Pipeline.UploadParams.SetName(pipelineName)
335335
logger.Log("Uploading pipeline with name=%s, from file %s", *pipelineName, pipelineFile)
336-
createdPipeline, uploadErr := pipelineUploadClient.UploadFile(pipelineFile, testContext.Pipeline.UploadParams)
336+
createdPipeline, uploadErr := test_utils.UploadPipeline(pipelineUploadClient, pipelineFile, pipelineName, nil)
337337
Expect(uploadErr).NotTo(HaveOccurred(), "Failed to upload pipeline")
338338
testContext.Pipeline.CreatedPipelines = append(testContext.Pipeline.CreatedPipelines, createdPipeline)
339339
return createdPipeline

0 commit comments

Comments
 (0)