diff --git a/tests/integration/godog/features/pipeline/chain.feature b/tests/integration/godog/features/pipeline/chain.feature deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/integration/godog/features/pipeline/conditional.feature b/tests/integration/godog/features/pipeline/conditional.feature new file mode 100644 index 0000000000..eae88cd495 --- /dev/null +++ b/tests/integration/godog/features/pipeline/conditional.feature @@ -0,0 +1,71 @@ +@PipelineDeployment @Functional @Pipelines @Conditional +Feature: Conditional pipeline with branching models + This pipeline uses a conditional model to route data to either add10 or mul10. + + Scenario: Deploy tfsimple-conditional pipeline and wait for readiness + Given I deploy model spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Model + metadata: + name: conditional-nbsl + spec: + storageUri: "gs://seldon-models/scv2/samples/triton_23-03/conditional" + requirements: + - triton + - python + """ + And I deploy model spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Model + metadata: + name: add10-nbsl + spec: + storageUri: "gs://seldon-models/scv2/samples/triton_23-03/add10" + requirements: + - triton + - python + """ + And I deploy model spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Model + metadata: + name: mul10-nbsl + spec: + storageUri: "gs://seldon-models/scv2/samples/triton_23-03/mul10" + requirements: + - triton + - python + """ + Then the model "conditional-nbsl" should eventually become Ready with timeout "20s" + And the model "add10-nbsl" should eventually become Ready with timeout "20s" + And the model "mul10-nbsl" should eventually become Ready with timeout "20s" + + And I deploy pipeline spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Pipeline + metadata: + name: tfsimple-conditional-nbsl + spec: + steps: + - name: conditional-nbsl + - name: mul10-nbsl + inputs: + - conditional-nbsl.outputs.OUTPUT0 + tensorMap: + conditional-nbsl.outputs.OUTPUT0: INPUT + - name: add10-nbsl + inputs: + - conditional-nbsl.outputs.OUTPUT1 + tensorMap: + conditional-nbsl.outputs.OUTPUT1: INPUT + output: + steps: + - mul10-nbsl + - add10-nbsl + stepsJoin: any + """ + Then the pipeline "tfsimple-conditional-nbsl" should eventually become Ready with timeout "40s" diff --git a/tests/integration/godog/features/pipeline/deployment.feature b/tests/integration/godog/features/pipeline/deployment.feature deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/integration/godog/features/pipeline/input_chaining.feature b/tests/integration/godog/features/pipeline/input_chaining.feature new file mode 100644 index 0000000000..90ff394d40 --- /dev/null +++ b/tests/integration/godog/features/pipeline/input_chaining.feature @@ -0,0 +1,53 @@ +@PipelineDeployment @Functional @Pipelines @ModelChainingFromInputs +Feature: Pipeline model chaining using inputs and outputs + This pipeline chains tfsimple1 into tfsimple2 using both inputs and outputs. + + Scenario: Deploy tfsimples-input pipeline and wait for readiness + Given I deploy model spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Model + metadata: + name: chain-from-input-tfsimple1-yhjo + spec: + storageUri: "gs://seldon-models/triton/simple" + requirements: + - tensorflow + memory: 100Ki + + """ + And I deploy model spec with timeout "20s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Model + metadata: + name: chain-from-input-tfsimple2-yhjo + spec: + storageUri: "gs://seldon-models/triton/simple" + requirements: + - tensorflow + memory: 100Ki + """ + Then the model "chain-from-input-tfsimple1-yhjo" should eventually become Ready with timeout "20s" + Then the model "chain-from-input-tfsimple2-yhjo" should eventually become Ready with timeout "20s" + + And I deploy pipeline spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Pipeline + metadata: + name: chain-from-input-tfsimples-input-yhjo + spec: + steps: + - name: chain-from-input-tfsimple1-yhjo + - name: chain-from-input-tfsimple2-yhjo + inputs: + - chain-from-input-tfsimple1-yhjo.inputs.INPUT0 + - chain-from-input-tfsimple1-yhjo.outputs.OUTPUT1 + tensorMap: + chain-from-input-tfsimple1-yhjo.outputs.OUTPUT1: INPUT1 + output: + steps: + - chain-from-input-tfsimple2-yhjo + """ + Then the pipeline "chain-from-input-tfsimples-input-yhjo" should eventually become Ready with timeout "40s" diff --git a/tests/integration/godog/features/pipeline/input_tensors.feature b/tests/integration/godog/features/pipeline/input_tensors.feature new file mode 100644 index 0000000000..aa7778e927 --- /dev/null +++ b/tests/integration/godog/features/pipeline/input_tensors.feature @@ -0,0 +1,56 @@ +@PipelineDeployment @Functional @Pipelines @PipelineInputTensors +Feature: Pipeline using direct input tensors + This pipeline directly routes pipeline input tensors INPUT0 and INPUT1 into separate models. + + Scenario: Deploy pipeline-inputs pipeline and wait for readiness + Given I deploy model spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Model + metadata: + name: mul10-tw2x + spec: + storageUri: "gs://seldon-models/scv2/samples/triton_23-03/mul10" + requirements: + - triton + - python + """ + And I deploy model spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Model + metadata: + name: add10-tw2x + spec: + storageUri: "gs://seldon-models/scv2/samples/triton_23-03/add10" + requirements: + - triton + - python + """ + Then the model "mul10-tw2x" should eventually become Ready with timeout "20s" + And the model "add10-tw2x" should eventually become Ready with timeout "20s" + + And I deploy pipeline spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Pipeline + metadata: + name: pipeline-inputs-tw2x + spec: + steps: + - name: mul10-tw2x + inputs: + - pipeline-inputs-tw2x.inputs.INPUT0 + tensorMap: + pipeline-inputs-tw2x.inputs.INPUT0: INPUT + - name: add10-tw2x + inputs: + - pipeline-inputs-tw2x.inputs.INPUT1 + tensorMap: + pipeline-inputs-tw2x.inputs.INPUT1: INPUT + output: + steps: + - mul10-tw2x + - add10-tw2x + """ + Then the pipeline "pipeline-inputs-tw2x" should eventually become Ready with timeout "20s" diff --git a/tests/integration/godog/features/pipeline/join.feature b/tests/integration/godog/features/pipeline/join.feature index e69de29bb2..b5eda176e1 100644 --- a/tests/integration/godog/features/pipeline/join.feature +++ b/tests/integration/godog/features/pipeline/join.feature @@ -0,0 +1,67 @@ +@PipelineDeployment @Functional @Pipelines @ModelJoin +Feature: Pipeline model join + This pipeline joins outputs from tfsimple1 and tfsimple2 and feeds them into tfsimple3. + + Scenario: Deploy tfsimples-join pipeline and wait for readiness + Given I deploy model spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Model + metadata: + name: join-tfsimple1-w4e3 + spec: + storageUri: "gs://seldon-models/triton/simple" + requirements: + - tensorflow + memory: 100Ki + """ + And I deploy model spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Model + metadata: + name: join-tfsimple2-w4e3 + spec: + storageUri: "gs://seldon-models/triton/simple" + requirements: + - tensorflow + memory: 100Ki + """ + And I deploy model spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Model + metadata: + name: join-tfsimple3-w4e3 + spec: + storageUri: "gs://seldon-models/triton/simple" + requirements: + - tensorflow + memory: 100Ki + """ + Then the model "join-tfsimple1-w4e3" should eventually become Ready with timeout "20s" + And the model "join-tfsimple2-w4e3" should eventually become Ready with timeout "20s" + And the model "join-tfsimple3-w4e3" should eventually become Ready with timeout "20s" + + And I deploy pipeline spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Pipeline + metadata: + name: join-pipeline-w4e3 + spec: + steps: + - name: join-tfsimple1-w4e3 + - name: join-tfsimple2-w4e3 + - name: join-tfsimple3-w4e3 + inputs: + - join-tfsimple1-w4e3.outputs.OUTPUT0 + - join-tfsimple2-w4e3.outputs.OUTPUT1 + tensorMap: + join-tfsimple1-w4e3.outputs.OUTPUT0: INPUT0 + join-tfsimple2-w4e3.outputs.OUTPUT1: INPUT1 + output: + steps: + - join-tfsimple3-w4e3 + """ + Then the pipeline "join-pipeline-w4e3" should eventually become Ready with timeout "40s" diff --git a/tests/integration/godog/features/pipeline/model_chaining.feature b/tests/integration/godog/features/pipeline/model_chaining.feature new file mode 100644 index 0000000000..5af153759e --- /dev/null +++ b/tests/integration/godog/features/pipeline/model_chaining.feature @@ -0,0 +1,52 @@ +@ModelChaining @Functional @Pipelines +Feature: Pipeline model chaining + This pipeline chains tfsimple1 into tfsimple2 using tensorMap. + + Scenario: Deploy tfsimples pipeline and wait for readiness + Given I deploy model spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Model + metadata: + name: model-chain-tfsimple1-iuw3 + spec: + storageUri: "gs://seldon-models/triton/simple" + requirements: + - tensorflow + memory: 100Ki + + """ + And I deploy model spec with timeout "20s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Model + metadata: + name: model-chain-tfsimple2-iuw3 + spec: + storageUri: "gs://seldon-models/triton/simple" + requirements: + - tensorflow + memory: 100Ki + """ + Then the model "model-chain-tfsimple1-iuw3" should eventually become Ready with timeout "20s" + Then the model "model-chain-tfsimple2-iuw3" should eventually become Ready with timeout "20s" + When I deploy pipeline spec with timeout "20s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Pipeline + metadata: + name: model-chain-tfsimples-iuw3 + spec: + steps: + - name: model-chain-tfsimple1-iuw3 + - name: model-chain-tfsimple2-iuw3 + inputs: + - model-chain-tfsimple1-iuw3 + tensorMap: + model-chain-tfsimple1-iuw3.outputs.OUTPUT0: INPUT0 + model-chain-tfsimple1-iuw3.outputs.OUTPUT1: INPUT1 + output: + steps: + - model-chain-tfsimple2-iuw3 + """ + Then the pipeline "model-chain-tfsimples-iuw3" should eventually become Ready with timeout "40s" diff --git a/tests/integration/godog/features/pipeline/trigger_joins.feature b/tests/integration/godog/features/pipeline/trigger_joins.feature new file mode 100644 index 0000000000..89de371066 --- /dev/null +++ b/tests/integration/godog/features/pipeline/trigger_joins.feature @@ -0,0 +1,59 @@ +@PipelineDeployment @Functional @Pipelines @TriggerJoins +Feature: Pipeline using trigger joins + This pipeline uses trigger joins to decide whether mul10 or add10 should run. + + Scenario: Deploy trigger-joins pipeline and wait for readiness + Given I deploy model spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Model + metadata: + name: mul10-99lo + spec: + storageUri: "gs://seldon-models/scv2/samples/triton_23-03/mul10" + requirements: + - triton + - python + """ + And I deploy model spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Model + metadata: + name: add10-99lo + spec: + storageUri: "gs://seldon-models/scv2/samples/triton_23-03/add10" + requirements: + - triton + - python + """ + Then the model "mul10-99lo" should eventually become Ready with timeout "20s" + And the model "add10-99lo" should eventually become Ready with timeout "20s" + + And I deploy pipeline spec with timeout "30s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Pipeline + metadata: + name: trigger-joins-99lo + spec: + steps: + - name: mul10-99lo + inputs: + - trigger-joins-99lo.inputs.INPUT + triggers: + - trigger-joins-99lo.inputs.ok1 + - trigger-joins-99lo.inputs.ok2 + triggersJoinType: any + - name: add10-99lo + inputs: + - trigger-joins-99lo.inputs.INPUT + triggers: + - trigger-joins-99lo.inputs.ok3 + output: + steps: + - mul10-99lo + - add10-99lo + stepsJoin: any + """ + Then the pipeline "trigger-joins-99lo" should eventually become Ready with timeout "20s" diff --git a/tests/integration/godog/go.mod b/tests/integration/godog/go.mod index e7b58dc41c..4d3bf828ef 100644 --- a/tests/integration/godog/go.mod +++ b/tests/integration/godog/go.mod @@ -9,11 +9,11 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/spf13/pflag v1.0.7 google.golang.org/grpc v1.73.0 - gopkg.in/yaml.v3 v3.0.1 - k8s.io/api v0.34.2 - k8s.io/apimachinery v0.34.2 - k8s.io/client-go v0.34.2 + k8s.io/apiextensions-apiserver v0.34.3 + k8s.io/apimachinery v0.34.3 + k8s.io/client-go v0.34.3 sigs.k8s.io/controller-runtime v0.22.4 + sigs.k8s.io/yaml v1.6.0 ) replace ( @@ -71,7 +71,8 @@ require ( google.golang.org/protobuf v1.36.6 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - k8s.io/apiextensions-apiserver v0.34.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/api v0.34.3 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect @@ -79,5 +80,4 @@ require ( sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect - sigs.k8s.io/yaml v1.6.0 // indirect ) diff --git a/tests/integration/godog/go.sum b/tests/integration/godog/go.sum index 726fc059ac..fe6d06b165 100644 --- a/tests/integration/godog/go.sum +++ b/tests/integration/godog/go.sum @@ -234,14 +234,14 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.34.2 h1:fsSUNZhV+bnL6Aqrp6O7lMTy6o5x2C4XLjnh//8SLYY= -k8s.io/api v0.34.2/go.mod h1:MMBPaWlED2a8w4RSeanD76f7opUoypY8TFYkSM+3XHw= -k8s.io/apiextensions-apiserver v0.34.1 h1:NNPBva8FNAPt1iSVwIE0FsdrVriRXMsaWFMqJbII2CI= -k8s.io/apiextensions-apiserver v0.34.1/go.mod h1:hP9Rld3zF5Ay2Of3BeEpLAToP+l4s5UlxiHfqRaRcMc= -k8s.io/apimachinery v0.34.2 h1:zQ12Uk3eMHPxrsbUJgNF8bTauTVR2WgqJsTmwTE/NW4= -k8s.io/apimachinery v0.34.2/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= -k8s.io/client-go v0.34.2 h1:Co6XiknN+uUZqiddlfAjT68184/37PS4QAzYvQvDR8M= -k8s.io/client-go v0.34.2/go.mod h1:2VYDl1XXJsdcAxw7BenFslRQX28Dxz91U9MWKjX97fE= +k8s.io/api v0.34.3 h1:D12sTP257/jSH2vHV2EDYrb16bS7ULlHpdNdNhEw2S4= +k8s.io/api v0.34.3/go.mod h1:PyVQBF886Q5RSQZOim7DybQjAbVs8g7gwJNhGtY5MBk= +k8s.io/apiextensions-apiserver v0.34.3 h1:p10fGlkDY09eWKOTeUSioxwLukJnm+KuDZdrW71y40g= +k8s.io/apiextensions-apiserver v0.34.3/go.mod h1:aujxvqGFRdb/cmXYfcRTeppN7S2XV/t7WMEc64zB5A0= +k8s.io/apimachinery v0.34.3 h1:/TB+SFEiQvN9HPldtlWOTp0hWbJ+fjU+wkxysf/aQnE= +k8s.io/apimachinery v0.34.3/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= +k8s.io/client-go v0.34.3 h1:wtYtpzy/OPNYf7WyNBTj3iUA0XaBHVqhv4Iv3tbrF5A= +k8s.io/client-go v0.34.3/go.mod h1:OxxeYagaP9Kdf78UrKLa3YZixMCfP6bgPwPwNBQBzpM= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b h1:MloQ9/bdJyIu9lb1PzujOPolHyvO06MXG5TUIj2mNAA= diff --git a/tests/integration/godog/k8sclient/client.go b/tests/integration/godog/k8sclient/client.go index 29ddf9eba5..376cbf4c77 100644 --- a/tests/integration/godog/k8sclient/client.go +++ b/tests/integration/godog/k8sclient/client.go @@ -12,6 +12,7 @@ package k8sclient import ( "context" "fmt" + "time" mlopsv1alpha1 "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1" log "github.com/sirupsen/logrus" @@ -28,12 +29,12 @@ type K8sClient struct { KubeClient client.WithWatch } -var DefaultCRDLabelMap = map[string]string{ +var DefaultCRDTestSuiteLabelMap = map[string]string{ "test-suite": "godog", } const ( - DefaultCRDLabel = "test-suite=godog" + DefaultCRDTestSuiteLabel = "test-suite=godog" ) // New todo: separate k8s client init and pass to new @@ -83,7 +84,7 @@ func (k8s *K8sClient) ApplyModel(model *mlopsv1alpha1.Model) error { } // add labels - for k, v := range DefaultCRDLabelMap { + for k, v := range DefaultCRDTestSuiteLabelMap { model.Labels[k] = v } @@ -116,8 +117,55 @@ func (k8s *K8sClient) DeleteScenarioResources(ctx context.Context, labels client client.InNamespace(k8s.namespace), labels, ); err != nil { - return err + return fmt.Errorf("failed to delete Models: %w", err) } - return nil + if err := k8s.KubeClient.DeleteAllOf( + ctx, + &mlopsv1alpha1.Pipeline{}, + client.InNamespace(k8s.namespace), + labels, + ); err != nil { + return fmt.Errorf("failed to delete Pipelines: %w", err) + } + + ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for Models/Pipelines to be deleted: %w", ctx.Err()) + + case <-ticker.C: + // Check Models + var modelList mlopsv1alpha1.ModelList + if err := k8s.KubeClient.List( + ctx, + &modelList, + client.InNamespace(k8s.namespace), + labels, + ); err != nil { + return fmt.Errorf("failed to list Models: %w", err) + } + + // Check Pipelines + var pipelineList mlopsv1alpha1.PipelineList + if err := k8s.KubeClient.List( + ctx, + &pipelineList, + client.InNamespace(k8s.namespace), + labels, + ); err != nil { + return fmt.Errorf("failed to list Pipelines: %w", err) + } + + if len(modelList.Items) == 0 && len(pipelineList.Items) == 0 { + return nil + } + } + } } diff --git a/tests/integration/godog/k8sclient/watcher_store.go b/tests/integration/godog/k8sclient/watcher_store.go index 32f12531d8..acdf1dc6b8 100644 --- a/tests/integration/godog/k8sclient/watcher_store.go +++ b/tests/integration/godog/k8sclient/watcher_store.go @@ -14,8 +14,10 @@ import ( "fmt" "sync" + mlopsscheme "github.com/seldonio/seldon-core/operator/v2/pkg/generated/clientset/versioned/scheme" "github.com/seldonio/seldon-core/operator/v2/pkg/generated/clientset/versioned/typed/mlops/v1alpha1" log "github.com/sirupsen/logrus" + "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme" "k8s.io/apimachinery/pkg/api/meta" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -23,20 +25,29 @@ import ( ) type WatcherStorage interface { - Put(runtime.Object) - Get(runtime.Object) (runtime.Object, bool) - WaitFor(ctx context.Context, obj runtime.Object, cond ConditionFunc) error + WaitForObjectCondition(ctx context.Context, obj runtime.Object, cond ConditionFunc) error + WaitForModelCondition(ctx context.Context, modelName string, cond ConditionFunc) error + WaitForPipelineCondition(ctx context.Context, modelName string, cond ConditionFunc) error Clear() Start() Stop() } +type objectKind string + +const ( + model objectKind = "Model" + pipeline objectKind = "Pipeline" +) + type WatcherStore struct { - namespace string - label string - mlopsClient v1alpha1.MlopsV1alpha1Interface - modelWatcher watch.Interface - logger log.FieldLogger + namespace string + label string + mlopsClient v1alpha1.MlopsV1alpha1Interface + modelWatcher watch.Interface + pipelineWatcher watch.Interface + logger log.FieldLogger + scheme *runtime.Scheme mu sync.RWMutex store map[string]runtime.Object // key: "namespace/name" @@ -59,19 +70,31 @@ func NewWatcherStore(namespace string, label string, mlopsClient v1alpha1.MlopsV logger = log.New() } - modelWatcher, err := mlopsClient.Models(namespace).Watch(context.Background(), v1.ListOptions{LabelSelector: "test-suite=godog"}) + modelWatcher, err := mlopsClient.Models(namespace).Watch(context.Background(), v1.ListOptions{LabelSelector: DefaultCRDTestSuiteLabel}) if err != nil { return nil, fmt.Errorf("failed to create model watcher: %w", err) } + pipelineWatcher, err := mlopsClient.Pipelines(namespace).Watch(context.Background(), v1.ListOptions{LabelSelector: DefaultCRDTestSuiteLabel}) + if err != nil { + return nil, fmt.Errorf("failed to create pipeline watcher: %w", err) + } + + // Base scheme + register your CRDs + s := runtime.NewScheme() + _ = scheme.AddToScheme(s) // core k8s types (optional but fine) + _ = mlopsscheme.AddToScheme(s) // <-- this is the key line for your CRDs + return &WatcherStore{ - namespace: namespace, - label: label, - mlopsClient: mlopsClient, - modelWatcher: modelWatcher, - logger: logger.WithField("client", "watcher_store"), - store: make(map[string]runtime.Object), - doneChan: make(chan struct{}), + namespace: namespace, + label: label, + mlopsClient: mlopsClient, + modelWatcher: modelWatcher, + pipelineWatcher: pipelineWatcher, + logger: logger.WithField("client", "watcher_store"), + store: make(map[string]runtime.Object), + doneChan: make(chan struct{}), + scheme: s, }, nil } @@ -99,7 +122,43 @@ func (s *WatcherStore) Start() { switch event.Type { case watch.Added, watch.Modified: - s.Put(event.Object) + s.put(event.Object) + case watch.Deleted: + s.delete(event.Object) + case watch.Error: + fmt.Printf("model watch error: %v\n", event.Object) + } + + case <-s.doneChan: + // Stop underlying watcher and exit + s.modelWatcher.Stop() + return + } + } + }() + go func() { + for { + select { + case event, ok := <-s.pipelineWatcher.ResultChan(): + if !ok { + // channel closed: watcher terminated + return + } + + accessor, err := meta.Accessor(event.Object) + if err != nil { + s.logger.WithError(err).Error("failed to access pipeline watcher") + } else { + s.logger.WithField("event", event).Tracef("new pipeline watch event with name: %s on namespace: %s", accessor.GetName(), accessor.GetNamespace()) + } + + if event.Object == nil { + continue + } + + switch event.Type { + case watch.Added, watch.Modified: + s.put(event.Object) case watch.Deleted: s.delete(event.Object) case watch.Error: @@ -133,14 +192,24 @@ func (s *WatcherStore) keyFor(obj runtime.Object) (string, error) { ns := accessor.GetNamespace() if ns == "" { - // fall back to store namespace if the object is cluster-scoped or unset - ns = s.namespace + ns = s.namespace // or "_cluster" if you prefer + } + + // Prefer scheme-based GVK for typed objects + gvks, _, err := s.scheme.ObjectKinds(obj) + if err != nil || len(gvks) == 0 { + // fallback: TypeMeta if present + if ta, taErr := meta.TypeAccessor(obj); taErr == nil && ta.GetKind() != "" { + return fmt.Sprintf("%s/%s/%s", ns, ta.GetKind(), accessor.GetName()), nil + } + return "", fmt.Errorf("failed to determine kind for %T: %w", obj, err) } - return fmt.Sprintf("%s/%s", ns, accessor.GetName()), nil + kind := gvks[0].Kind + return fmt.Sprintf("%s/%s/%s", ns, kind, accessor.GetName()), nil } -func (s *WatcherStore) Put(obj runtime.Object) { +func (s *WatcherStore) put(obj runtime.Object) { if obj == nil { return } @@ -157,7 +226,7 @@ func (s *WatcherStore) Put(obj runtime.Object) { s.notifyWaiters(key, obj) } -func (s *WatcherStore) Get(obj runtime.Object) (runtime.Object, bool) { +func (s *WatcherStore) get(obj runtime.Object) (runtime.Object, bool) { if obj == nil { return nil, false } @@ -198,7 +267,7 @@ func (s *WatcherStore) delete(obj runtime.Object) { s.notifyWaiters(key, nil) } -func (s *WatcherStore) WaitFor(ctx context.Context, obj runtime.Object, cond ConditionFunc) error { +func (s *WatcherStore) WaitForObjectCondition(ctx context.Context, obj runtime.Object, cond ConditionFunc) error { key, err := s.keyFor(obj) if err != nil { return err @@ -239,6 +308,53 @@ func (s *WatcherStore) WaitFor(ctx context.Context, obj runtime.Object, cond Con return err } } +func (s *WatcherStore) WaitForModelCondition(ctx context.Context, modelName string, cond ConditionFunc) error { + key := fmt.Sprintf("%s/%s/%s", s.namespace, model, modelName) + return s.waitForKey(ctx, key, cond) +} + +func (s *WatcherStore) WaitForPipelineCondition(ctx context.Context, pipelineName string, cond ConditionFunc) error { + key := fmt.Sprintf("%s/%s/%s", s.namespace, pipeline, pipelineName) + return s.waitForKey(ctx, key, cond) +} + +func (s *WatcherStore) waitForKey(ctx context.Context, key string, cond ConditionFunc) error { + + // Fast path: check current state + s.mu.RLock() + existing, ok := s.store[key] + s.mu.RUnlock() + + if ok { + done, err := cond(existing) + if err != nil { + return err + } + if done { + return nil + } + } + + // Slow path: register a waiter + w := &waiter{ + key: key, + cond: cond, + result: make(chan error, 1), // buffered so we don't block notifier + } + + s.mu.Lock() + s.waiters = append(s.waiters, w) + s.mu.Unlock() + + // Wait for either condition satisfied or context cancelled + select { + case <-ctx.Done(): + s.removeWaiter(w) + return ctx.Err() + case err := <-w.result: + return err + } +} func (s *WatcherStore) removeWaiter(target *waiter) { s.mu.Lock() diff --git a/tests/integration/godog/steps/assertions/model.go b/tests/integration/godog/steps/assertions/model.go index eafc5b2a02..d2be8b0af3 100644 --- a/tests/integration/godog/steps/assertions/model.go +++ b/tests/integration/godog/steps/assertions/model.go @@ -12,8 +12,7 @@ package assertions import ( "fmt" - mlopsv1alpha1 "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1" - corev1 "k8s.io/api/core/v1" + "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1" "k8s.io/apimachinery/pkg/runtime" ) @@ -22,15 +21,13 @@ func ModelReady(obj runtime.Object) (bool, error) { return false, nil } - model, ok := obj.(*mlopsv1alpha1.Model) + model, ok := obj.(*v1alpha1.Model) if !ok { - return false, fmt.Errorf("unexpected type %T, expected *mlopsv1alpha1.Model", obj) + return false, fmt.Errorf("unexpected type %T, expected *v1alpha1.Model", obj) } - for _, c := range model.Status.Conditions { - if c.Type == "Ready" && c.Status == corev1.ConditionTrue { - return true, nil - } + if model.Status.IsReady() { + return true, nil } return false, nil diff --git a/tests/integration/godog/steps/assertions/pipeline.go b/tests/integration/godog/steps/assertions/pipeline.go new file mode 100644 index 0000000000..77003b1926 --- /dev/null +++ b/tests/integration/godog/steps/assertions/pipeline.go @@ -0,0 +1,34 @@ +/* +Copyright (c) 2024 Seldon Technologies Ltd. + +Use of this software is governed BY +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. +*/ + +package assertions + +import ( + "fmt" + + "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1" + "k8s.io/apimachinery/pkg/runtime" +) + +func PipelineReady(obj runtime.Object) (bool, error) { + if obj == nil { + return false, nil + } + + pipeline, ok := obj.(*v1alpha1.Pipeline) + if !ok { + return false, fmt.Errorf("unexpected type %T, expected *v1alpha1.Pipeline", obj) + } + + if pipeline.Status.IsReady() { + return true, nil + } + + return false, nil +} diff --git a/tests/integration/godog/steps/custom_model_steps.go b/tests/integration/godog/steps/custom_model_steps.go index e1228382ec..e3f9398d3a 100644 --- a/tests/integration/godog/steps/custom_model_steps.go +++ b/tests/integration/godog/steps/custom_model_steps.go @@ -14,7 +14,7 @@ import ( "errors" "fmt" - "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1" + "github.com/seldonio/seldon-core/tests/integration/godog/steps/assertions" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" @@ -67,51 +67,9 @@ func (m *Model) deleteModel(ctx context.Context, model string) error { } } -func (m *Model) waitForModelReady(ctx context.Context, model string) error { - foundModel, err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Get(ctx, model, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed getting model: %w", err) - } - - if foundModel.Status.IsReady() { - return nil - } - - watcher, err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Watch(ctx, metav1.ListOptions{ - FieldSelector: fmt.Sprintf("metadata.name=%s", model), - ResourceVersion: foundModel.ResourceVersion, - Watch: true, - }) - if err != nil { - return fmt.Errorf("failed subscribed to watch model: %w", err) - } - defer watcher.Stop() - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case event, ok := <-watcher.ResultChan(): - if !ok { - return fmt.Errorf("watch channel closed") - } - - if event.Type == watch.Error { - return fmt.Errorf("watch error: %v", event.Object) - } - - if event.Type == watch.Added || event.Type == watch.Modified { - model := event.Object.(*v1alpha1.Model) - if model.Status.IsReady() { - return nil - } - m.log.Debugf("got watch event: model %s is not ready, still waiting", model) - continue - } - - if event.Type == watch.Deleted { - return fmt.Errorf("resource was deleted") - } - } - } +func (m *Model) waitForModelNameReady(ctx context.Context, name string) error { + return m.watcherStorage.WaitForModelCondition( + ctx, + name, + assertions.ModelReady) } diff --git a/tests/integration/godog/steps/model_steps.go b/tests/integration/godog/steps/model_steps.go index caf86c02c9..90e577dd6f 100644 --- a/tests/integration/godog/steps/model_steps.go +++ b/tests/integration/godog/steps/model_steps.go @@ -110,7 +110,7 @@ func LoadTemplateModelSteps(scenario *godog.ScenarioContext, w *World) { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - return w.currentModel.ModelReady(ctx) + return w.currentModel.waitForModelReady(ctx) }) scenario.Step(`^the model status message eventually should be "([^"]+)"$`, w.currentModel.AssertModelStatus) } @@ -123,7 +123,7 @@ func LoadCustomModelSteps(scenario *godog.ScenarioContext, w *World) { }) scenario.Step(`^the model "([^"]+)" should eventually become Ready with timeout "([^"]+)"$`, func(model, timeout string) error { return withTimeoutCtx(timeout, func(ctx context.Context) error { - return w.currentModel.waitForModelReady(ctx, model) + return w.currentModel.waitForModelNameReady(ctx, model) }) }) scenario.Step(`^delete the model "([^"]+)" with timeout "([^"]+)"$`, func(model, timeout string) error { @@ -155,7 +155,7 @@ func (m *Model) applyScenarioLabel() { maps.Copy(m.model.Labels, m.label) // todo: change this approach - for k, v := range k8sclient.DefaultCRDLabelMap { + for k, v := range k8sclient.DefaultCRDTestSuiteLabelMap { m.model.Labels[k] = v } } @@ -238,8 +238,8 @@ func (m *Model) ApplyModel(k *k8sclient.K8sClient) error { return nil } -func (m *Model) ModelReady(ctx context.Context) error { - return m.watcherStorage.WaitFor( +func (m *Model) waitForModelReady(ctx context.Context) error { + return m.watcherStorage.WaitForObjectCondition( ctx, m.model, // the k8s object being watched assertions.ModelReady, // predicate from steps/assertions diff --git a/tests/integration/godog/steps/pipeline_steps.go b/tests/integration/godog/steps/pipeline_steps.go new file mode 100644 index 0000000000..12e2c051ed --- /dev/null +++ b/tests/integration/godog/steps/pipeline_steps.go @@ -0,0 +1,103 @@ +/* +Copyright (c) 2024 Seldon Technologies Ltd. + +Use of this software is governed BY +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. +*/ + +package steps + +import ( + "context" + "fmt" + "maps" + + "github.com/cucumber/godog" + mlopsv1alpha1 "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1" + "github.com/seldonio/seldon-core/operator/v2/pkg/generated/clientset/versioned" + "github.com/seldonio/seldon-core/tests/integration/godog/k8sclient" + "github.com/seldonio/seldon-core/tests/integration/godog/steps/assertions" + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" +) + +type Pipeline struct { + label map[string]string + namespace string + pipeline *mlopsv1alpha1.Pipeline + pipelineName string + k8sClient versioned.Interface + watcherStorage k8sclient.WatcherStorage + log logrus.FieldLogger +} + +func NewPipeline(label map[string]string, namespace string, k8sClient versioned.Interface, log logrus.FieldLogger, watcherStorage k8sclient.WatcherStorage) *Pipeline { + return &Pipeline{label: label, pipeline: &mlopsv1alpha1.Pipeline{}, log: log, namespace: namespace, k8sClient: k8sClient, watcherStorage: watcherStorage} +} + +func LoadCustomPipelineSteps(scenario *godog.ScenarioContext, w *World) { + scenario.Step(`^I deploy pipeline spec with timeout "([^"]+)":$`, func(timeout string, spec *godog.DocString) error { + return withTimeoutCtx(timeout, func(ctx context.Context) error { + return w.currentPipeline.deployPipelineSpec(ctx, spec) + }) + }) + + scenario.Step(`^the pipeline "([^"]+)" (?:should )?eventually become Ready with timeout "([^"]+)"$`, func(name, timeout string) error { + ctx, cancel, err := timeoutToContext(timeout) + if err != nil { + return err + } + defer cancel() + + return w.currentPipeline.waitForPipelineNameReady(ctx, name) + }) +} + +func (p *Pipeline) deployPipelineSpec(ctx context.Context, spec *godog.DocString) error { + pipelineSpec := &mlopsv1alpha1.Pipeline{} + if err := yaml.Unmarshal([]byte(spec.Content), &pipelineSpec); err != nil { + return fmt.Errorf("failed unmarshalling pipeline spec: %w", err) + } + + p.pipeline = pipelineSpec + p.pipeline.Namespace = p.namespace + p.pipelineName = p.pipeline.Name + p.applyScenarioLabel() + + if _, err := p.k8sClient.MlopsV1alpha1().Pipelines(p.namespace).Create(ctx, p.pipeline, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("failed creating pipeline: %w", err) + } + return nil +} + +func (p *Pipeline) applyScenarioLabel() { + if p.pipeline.Labels == nil { + p.pipeline.Labels = make(map[string]string) + } + + maps.Copy(p.pipeline.Labels, p.label) + + // todo: change this approach + for k, v := range k8sclient.DefaultCRDTestSuiteLabelMap { + p.pipeline.Labels[k] = v + } +} + +func (p *Pipeline) waitForPipelineNameReady(ctx context.Context, name string) error { + return p.watcherStorage.WaitForPipelineCondition( + ctx, + name, + assertions.PipelineReady, + ) +} + +func (p *Pipeline) waitForPipelineReady(ctx context.Context) error { + return p.watcherStorage.WaitForObjectCondition( + ctx, + p.pipeline, + assertions.PipelineReady, + ) +} diff --git a/tests/integration/godog/steps/server_steps.go b/tests/integration/godog/steps/server_steps.go index 8f605fb106..23c24ac043 100644 --- a/tests/integration/godog/steps/server_steps.go +++ b/tests/integration/godog/steps/server_steps.go @@ -128,7 +128,7 @@ func (s *server) applyScenarioLabel() { } // todo: change this approach - for k, v := range k8sclient.DefaultCRDLabelMap { + for k, v := range k8sclient.DefaultCRDTestSuiteLabelMap { s.currentServer.Labels[k] = v } } diff --git a/tests/integration/godog/steps/world.go b/tests/integration/godog/steps/world.go index 652228af57..0a9b0ac926 100644 --- a/tests/integration/godog/steps/world.go +++ b/tests/integration/godog/steps/world.go @@ -26,11 +26,12 @@ type World struct { StartingClusterState string //todo: this will be a combination of starting state awareness of core 2 such as the //todo: server config,seldon config and seldon runtime to be able to reconcile to starting state should we change //todo: the state such as reducing replicas to 0 of scheduler to test unavailability - currentModel *Model - server *server - infer inference - logger log.FieldLogger - Label map[string]string + currentModel *Model + currentPipeline *Pipeline + server *server + infer inference + logger log.FieldLogger + Label map[string]string } type Config struct { @@ -61,6 +62,7 @@ func NewWorld(c Config) (*World, error) { kubeClient: c.KubeClient, watcherStorage: c.WatcherStorage, currentModel: NewModel(label, c.Namespace, c.K8sClient, c.Logger, c.WatcherStorage), + currentPipeline: NewPipeline(label, c.Namespace, c.K8sClient, c.Logger, c.WatcherStorage), server: newServer(label, c.Namespace, c.K8sClient, c.Logger, c.KubeClient), infer: inference{ host: c.IngressHost, diff --git a/tests/integration/godog/suite/suite.go b/tests/integration/godog/suite/suite.go index 329348ae56..219a8de334 100644 --- a/tests/integration/godog/suite/suite.go +++ b/tests/integration/godog/suite/suite.go @@ -79,7 +79,7 @@ func InitializeTestSuite(ctx *godog.TestSuiteContext) { panic(fmt.Errorf("failed to mlops client: %w", err)) } - watchStore, err := k8sclient.NewWatcherStore(config.Namespace, k8sclient.DefaultCRDLabel, clientSet.MlopsV1alpha1(), log) + watchStore, err := k8sclient.NewWatcherStore(config.Namespace, k8sclient.DefaultCRDTestSuiteLabel, clientSet.MlopsV1alpha1(), log) if err != nil { panic(fmt.Errorf("failed to create k8s watch store: %w", err)) } @@ -110,12 +110,15 @@ func InitializeTestSuite(ctx *godog.TestSuiteContext) { ctx.BeforeSuite(func() { suiteDeps.watcherStore.Start() + if err := suiteDeps.k8sClient.DeleteScenarioResources(context.Background(), k8sclient.DefaultCRDTestSuiteLabelMap); err != nil { + suiteDeps.logger.Errorf("error when deleting models on before steps: %v", err) + } // e.g. create namespace, apply CRDs, etc. }) ctx.AfterSuite(func() { suiteDeps.watcherStore.Stop() - // e.g. clean namespace, close clients if needed + // e.g. clean namespace, close clients if needed delete servers }) } @@ -162,5 +165,6 @@ func InitializeScenario(scenarioCtx *godog.ScenarioContext) { steps.LoadCustomModelSteps(scenarioCtx, world) steps.LoadInferenceSteps(scenarioCtx, world) steps.LoadServerSteps(scenarioCtx, world) + steps.LoadCustomPipelineSteps(scenarioCtx, world) // TODO: load other steps, e.g. pipeline, experiment, etc. }