Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
@ExperimentTrafficSplit @Functional @Experiments
Feature: Experiment traffic splitting
In order to perform A/B testing
As a model user
I need to create an Experiment resource that splits traffic 50/50 between models

# Deploys two Models that share the same iris-sklearn artifact, waits for each to become Ready,
# creates an Experiment that weights them 50/50, then sends 20 HTTP inference requests through
# the experiment.
Scenario: Success - Create experiment with 50/50 traffic split between two iris models
Given I deploy model spec with timeout "10s":
"""
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
name: experiment-1
spec:
replicas: 1
requirements:
- sklearn
- mlserver
storageUri: gs://seldon-models/scv2/samples/mlserver_1.3.5/iris-sklearn
"""
When the model "experiment-1" should eventually become Ready with timeout "20s"
Given I deploy model spec with timeout "10s":
"""
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
name: experiment-2
spec:
replicas: 1
requirements:
- sklearn
- mlserver
storageUri: gs://seldon-models/scv2/samples/mlserver_1.3.5/iris-sklearn
"""
When the model "experiment-2" should eventually become Ready with timeout "20s"
# The experiment names experiment-1 as the default and lists both models as equally weighted candidates.
When I deploy experiment spec with timeout "60s":
"""
apiVersion: mlops.seldon.io/v1alpha1
kind: Experiment
metadata:
name: experiment-50-50
spec:
default: experiment-1
candidates:
- name: experiment-1
weight: 50
- name: experiment-2
weight: 50
"""
Then the experiment should eventually become Ready with timeout "60s"
# NOTE(review): "payoad" looks like a typo for "payload". The step text has to match the registered
# step pattern, so the spelling must be changed in both places together - TODO confirm.
When I send "20" HTTP inference requests to the experiment and expect all models in response, with payoad:
"""
{"inputs": [{"name": "predict", "shape": [1, 4], "datatype": "FP32", "data": [[1, 2, 3, 4]]}]}
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
@ServerSetup
Feature: Server setup
Deploys an mlserver with one replica. We ensure the pods
become ready and remove any other server pods for different
servers.

@ServerSetup @ServerSetupMLServer
Scenario: Deploy mlserver Server and remove other servers
Given I deploy server spec with timeout "10s":
"""
apiVersion: mlops.seldon.io/v1alpha1
kind: Server
metadata:
name: godog-mlserver
spec:
replicas: 1
serverConfig: mlserver
"""
When the server should eventually become Ready with timeout "30s"
Then ensure only "1" pod(s) are deployed for server and they are Ready


@ServerSetup @ServerClean
Scenario: Remove any other pre-existing servers
Given I remove any other server deployments which are not "godog-mlserver,godog-triton"

# TODO decide whether to keep this. If we keep it, testers must make sure they don't run this tag when running
# all features in this directory, as tests will fail once the server is deleted. Alternatively, we can skip the
# deletion and leave it to the feature dir's server setup to ensure ONLY the required servers exist, like above.
# @ServerTeardown
# Scenario: Delete mlserver Server
# Given I delete server "godog-mlserver" with timeout "10s"
4 changes: 2 additions & 2 deletions tests/integration/godog/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ require (
github.com/sirupsen/logrus v1.9.3
github.com/spf13/pflag v1.0.7
google.golang.org/grpc v1.73.0
google.golang.org/protobuf v1.36.6
k8s.io/api v0.34.3
k8s.io/apiextensions-apiserver v0.34.3
k8s.io/apimachinery v0.34.3
k8s.io/client-go v0.34.3
Expand Down Expand Up @@ -68,11 +70,9 @@ require (
golang.org/x/time v0.12.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.5.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.34.3 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect
Expand Down
22 changes: 21 additions & 1 deletion tests/integration/godog/k8sclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,15 @@ func (k8s *K8sClient) DeleteScenarioResources(ctx context.Context, labels client
return fmt.Errorf("failed to delete Pipelines: %w", err)
}

if err := k8s.KubeClient.DeleteAllOf(
ctx,
&mlopsv1alpha1.Experiment{},
client.InNamespace(k8s.namespace),
labels,
); err != nil {
return fmt.Errorf("failed to delete Experiments: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()

Expand Down Expand Up @@ -163,7 +172,18 @@ func (k8s *K8sClient) DeleteScenarioResources(ctx context.Context, labels client
return fmt.Errorf("failed to list Pipelines: %w", err)
}

if len(modelList.Items) == 0 && len(pipelineList.Items) == 0 {
// Check Experiments
var experimentList mlopsv1alpha1.ExperimentList
if err := k8s.KubeClient.List(
ctx,
&experimentList,
client.InNamespace(k8s.namespace),
labels,
); err != nil {
return fmt.Errorf("failed to list Experiments: %w", err)
}

if len(modelList.Items) == 0 && len(pipelineList.Items) == 0 && len(experimentList.Items) == 0 {
return nil
}
}
Expand Down
87 changes: 69 additions & 18 deletions tests/integration/godog/k8sclient/watcher_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
type WatcherStorage interface {
WaitForObjectCondition(ctx context.Context, obj runtime.Object, cond ConditionFunc) error
WaitForModelCondition(ctx context.Context, modelName string, cond ConditionFunc) error
WaitForExperimentCondition(ctx context.Context, experimentName string, cond ConditionFunc) error
WaitForPipelineCondition(ctx context.Context, modelName string, cond ConditionFunc) error
Clear()
Start()
Expand All @@ -36,18 +37,20 @@ type WatcherStorage interface {
// objectKind names the kind of custom resource an entry refers to. It is used
// as the kind segment of the "namespace/kind/name" keys built by the
// WaitFor*Condition helpers.
type objectKind string

// Supported resource kinds. Each kind is declared exactly once; a duplicate
// declaration in the same const group is a redeclaration compile error.
const (
	model      objectKind = "Model"
	pipeline   objectKind = "Pipeline"
	experiment objectKind = "Experiment"
)

type WatcherStore struct {
namespace string
label string
mlopsClient v1alpha1.MlopsV1alpha1Interface
modelWatcher watch.Interface
pipelineWatcher watch.Interface
logger log.FieldLogger
scheme *runtime.Scheme
namespace string
label string
mlopsClient v1alpha1.MlopsV1alpha1Interface
modelWatcher watch.Interface
pipelineWatcher watch.Interface
experimentWatcher watch.Interface
logger log.FieldLogger
scheme *runtime.Scheme

mu sync.RWMutex
store map[string]runtime.Object // key: "namespace/name"
Expand Down Expand Up @@ -80,21 +83,27 @@ func NewWatcherStore(namespace string, label string, mlopsClient v1alpha1.MlopsV
return nil, fmt.Errorf("failed to create pipeline watcher: %w", err)
}

experimentWatcher, err := mlopsClient.Experiments(namespace).Watch(context.Background(), v1.ListOptions{LabelSelector: DefaultCRDTestSuiteLabel})
if err != nil {
return nil, fmt.Errorf("failed to create experiment watcher: %w", err)
}

// Base scheme + register your CRDs
s := runtime.NewScheme()
_ = scheme.AddToScheme(s) // core k8s types (optional but fine)
_ = mlopsscheme.AddToScheme(s) // <-- this is the key line for your CRDs

return &WatcherStore{
namespace: namespace,
label: label,
mlopsClient: mlopsClient,
modelWatcher: modelWatcher,
pipelineWatcher: pipelineWatcher,
logger: logger.WithField("client", "watcher_store"),
store: make(map[string]runtime.Object),
doneChan: make(chan struct{}),
scheme: s,
namespace: namespace,
label: label,
mlopsClient: mlopsClient,
modelWatcher: modelWatcher,
experimentWatcher: experimentWatcher,
pipelineWatcher: pipelineWatcher,
logger: logger.WithField("client", "watcher_store"),
store: make(map[string]runtime.Object),
doneChan: make(chan struct{}),
scheme: s,
}, nil
}

Expand Down Expand Up @@ -172,6 +181,42 @@ func (s *WatcherStore) Start() {
}
}
}()
go func() {
for {
select {
case event, ok := <-s.experimentWatcher.ResultChan():
if !ok {
// channel closed: watcher terminated
return
}

accessor, err := meta.Accessor(event.Object)
if err != nil {
s.logger.WithError(err).Error("failed to access experiment watcher")
} else {
s.logger.WithField("event", event).Tracef("new experiment watch event with name: %s on namespace: %s", accessor.GetName(), accessor.GetNamespace())
}

if event.Object == nil {
continue
}

switch event.Type {
case watch.Added, watch.Modified:
s.put(event.Object)
case watch.Deleted:
s.delete(event.Object)
case watch.Error:
fmt.Printf("experiment watch error: %v\n", event.Object)
}

case <-s.doneChan:
// Stop underlying watcher and exit
s.experimentWatcher.Stop()
return
}
}
}()
}

// Stop terminates the watcher loop.
Expand Down Expand Up @@ -308,6 +353,12 @@ func (s *WatcherStore) WaitForObjectCondition(ctx context.Context, obj runtime.O
return err
}
}

// WaitForExperimentCondition builds the store key
// "<namespace>/Experiment/<experimentName>" for the named Experiment and
// delegates to waitForKey with the supplied condition, returning whatever
// error waitForKey reports.
func (s *WatcherStore) WaitForExperimentCondition(ctx context.Context, experimentName string, cond ConditionFunc) error {
	storeKey := s.namespace + "/" + string(experiment) + "/" + experimentName
	return s.waitForKey(ctx, storeKey, cond)
}

func (s *WatcherStore) WaitForModelCondition(ctx context.Context, modelName string, cond ConditionFunc) error {
key := fmt.Sprintf("%s/%s/%s", s.namespace, model, modelName)
return s.waitForKey(ctx, key, cond)
Expand Down
15 changes: 12 additions & 3 deletions tests/integration/godog/steps/assertions/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,20 @@ func ModelReady(obj runtime.Object) (bool, error) {
return false, fmt.Errorf("unexpected type %T, expected *v1alpha1.Model", obj)
}

if model.Status.IsReady() {
return true, nil
return model.Status.IsReady(), nil
}

// ExperimentReady reports whether obj is an Experiment whose status is ready.
//
// It returns (false, nil) when obj is nil, so a caller polling for readiness
// can treat "no object yet" as "not ready" rather than as a failure. It
// returns an error when obj is any type other than *v1alpha1.Experiment.
// Otherwise the result is whatever the Experiment's Status.IsReady() reports.
func ExperimentReady(obj runtime.Object) (bool, error) {
	if obj == nil {
		return false, nil
	}

	experiment, ok := obj.(*v1alpha1.Experiment)
	if !ok {
		return false, fmt.Errorf("unexpected type %T, expected *v1alpha1.Experiment", obj)
	}
	// An interface holding a typed nil pointer passes both checks above;
	// dereferencing it to reach Status would panic, so treat it as not ready.
	if experiment == nil {
		return false, nil
	}

	return experiment.Status.IsReady(), nil
}

//func ModelReadyMessageCondition(expectedMessage string) k8sclient.ConditionFunc {
Expand Down
19 changes: 19 additions & 0 deletions tests/integration/godog/steps/custom_model_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,31 @@ import (
"errors"
"fmt"

"github.com/cucumber/godog"
"github.com/seldonio/seldon-core/tests/integration/godog/steps/assertions"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)

// LoadCustomModelSteps registers the model-related step definitions on the
// given scenario context: deploying a model from an inline spec, waiting for
// a named model to become Ready, and deleting a named model. Every handler
// runs its action through withTimeoutCtx using the timeout string captured
// from the step text, and acts on w.currentModel as it is when the step runs.
func LoadCustomModelSteps(scenario *godog.ScenarioContext, w *World) {
	// Handler for deploying the model described by the step's doc string.
	deploySpec := func(timeout string, spec *godog.DocString) error {
		action := func(ctx context.Context) error {
			return w.currentModel.deployModelSpec(ctx, spec)
		}
		return withTimeoutCtx(timeout, action)
	}

	// Handler for waiting until the named model reports Ready.
	awaitReady := func(model, timeout string) error {
		action := func(ctx context.Context) error {
			return w.currentModel.waitForModelNameReady(ctx, model)
		}
		return withTimeoutCtx(timeout, action)
	}

	// Handler for deleting the named model.
	removeModel := func(model, timeout string) error {
		action := func(ctx context.Context) error {
			return w.currentModel.deleteModel(ctx, model)
		}
		return withTimeoutCtx(timeout, action)
	}

	scenario.Step(`^I deploy model spec with timeout "([^"]+)":$`, deploySpec)
	scenario.Step(`^the model "([^"]+)" should eventually become Ready with timeout "([^"]+)"$`, awaitReady)
	scenario.Step(`^delete the model "([^"]+)" with timeout "([^"]+)"$`, removeModel)
}

// deleteModel we have to wait for model to be deleted, as there is a finalizer attached so the scheduler can confirm
// when model has been unloaded from inference pod, model-gw, dataflow-engine, pipeline-gw and controller will remove
// finalizer so deletion can complete.
Expand Down
Loading
Loading