Skip to content

Commit 75a07c8

Browse files
authored
feat(e2e-tests): model experiment test (#7023)
* model experiments test * fix model reqs
1 parent 274b59f commit 75a07c8

File tree

12 files changed

+391
-57
lines changed

12 files changed

+391
-57
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
@ExperimentTrafficSplit @Functional @Experiments
2+
Feature: Experiment traffic splitting
3+
In order to perform A/B testing
4+
As a model user
5+
I need to create an Experiment resource that splits traffic 50/50 between models
6+
7+
Scenario: Success - Create experiment with 50/50 traffic split between two iris models
8+
Given I deploy model spec with timeout "10s":
9+
"""
10+
apiVersion: mlops.seldon.io/v1alpha1
11+
kind: Model
12+
metadata:
13+
name: experiment-1
14+
spec:
15+
replicas: 1
16+
requirements:
17+
- sklearn
18+
- mlserver
19+
storageUri: gs://seldon-models/scv2/samples/mlserver_1.3.5/iris-sklearn
20+
"""
21+
When the model "experiment-1" should eventually become Ready with timeout "20s"
22+
Given I deploy model spec with timeout "10s":
23+
"""
24+
apiVersion: mlops.seldon.io/v1alpha1
25+
kind: Model
26+
metadata:
27+
name: experiment-2
28+
spec:
29+
replicas: 1
30+
requirements:
31+
- sklearn
32+
- mlserver
33+
storageUri: gs://seldon-models/scv2/samples/mlserver_1.3.5/iris-sklearn
34+
"""
35+
When the model "experiment-2" should eventually become Ready with timeout "20s"
36+
When I deploy experiment spec with timeout "60s":
37+
"""
38+
apiVersion: mlops.seldon.io/v1alpha1
39+
kind: Experiment
40+
metadata:
41+
name: experiment-50-50
42+
spec:
43+
default: experiment-1
44+
candidates:
45+
- name: experiment-1
46+
weight: 50
47+
- name: experiment-2
48+
weight: 50
49+
"""
50+
Then the experiment should eventually become Ready with timeout "60s"
51+
When I send "20" HTTP inference requests to the experiment and expect all models in response, with payload:
52+
"""
53+
{"inputs": [{"name": "predict", "shape": [1, 4], "datatype": "FP32", "data": [[1, 2, 3, 4]]}]}
54+
"""
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
@ServerSetup
2+
Feature: Server setup
3+
Deploys an mlserver with one replica. We ensure the pods
4+
become ready and remove any other server pods for different
5+
servers.
6+
7+
@ServerSetup @ServerSetupMLServer
8+
Scenario: Deploy mlserver Server and remove other servers
9+
Given I deploy server spec with timeout "10s":
10+
"""
11+
apiVersion: mlops.seldon.io/v1alpha1
12+
kind: Server
13+
metadata:
14+
name: godog-mlserver
15+
spec:
16+
replicas: 1
17+
serverConfig: mlserver
18+
"""
19+
When the server should eventually become Ready with timeout "30s"
20+
Then ensure only "1" pod(s) are deployed for server and they are Ready
21+
22+
23+
@ServerSetup @ServerClean
24+
Scenario: Remove any other pre-existing servers
25+
Given I remove any other server deployments which are not "godog-mlserver,godog-triton"
26+
27+
# TODO decide if we want to keep this, if we keep testers will need to ensure they don't run this tag when running all
28+
# features in this directory, as tests will fail when the server is deleted. Alternatively, we can skip deletion and leave it to the
29+
# feature dir server setup to ensure ONLY the required servers exist, like above.
30+
# @ServerTeardown
31+
# Scenario: Delete mlserver Server
32+
# Given I delete server "godog-mlserver" with timeout "10s"

tests/integration/godog/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ require (
99
github.com/sirupsen/logrus v1.9.3
1010
github.com/spf13/pflag v1.0.7
1111
google.golang.org/grpc v1.73.0
12+
google.golang.org/protobuf v1.36.6
13+
k8s.io/api v0.34.3
1214
k8s.io/apiextensions-apiserver v0.34.3
1315
k8s.io/apimachinery v0.34.3
1416
k8s.io/client-go v0.34.3
@@ -68,11 +70,9 @@ require (
6870
golang.org/x/time v0.12.0 // indirect
6971
gomodules.xyz/jsonpatch/v2 v2.5.0 // indirect
7072
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect
71-
google.golang.org/protobuf v1.36.6 // indirect
7273
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
7374
gopkg.in/inf.v0 v0.9.1 // indirect
7475
gopkg.in/yaml.v3 v3.0.1 // indirect
75-
k8s.io/api v0.34.3 // indirect
7676
k8s.io/klog/v2 v2.130.1 // indirect
7777
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect
7878
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect

tests/integration/godog/k8sclient/client.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,15 @@ func (k8s *K8sClient) DeleteScenarioResources(ctx context.Context, labels client
129129
return fmt.Errorf("failed to delete Pipelines: %w", err)
130130
}
131131

132+
if err := k8s.KubeClient.DeleteAllOf(
133+
ctx,
134+
&mlopsv1alpha1.Experiment{},
135+
client.InNamespace(k8s.namespace),
136+
labels,
137+
); err != nil {
138+
return fmt.Errorf("failed to delete Experiments: %w", err)
139+
}
140+
132141
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
133142
defer cancel()
134143

@@ -163,7 +172,18 @@ func (k8s *K8sClient) DeleteScenarioResources(ctx context.Context, labels client
163172
return fmt.Errorf("failed to list Pipelines: %w", err)
164173
}
165174

166-
if len(modelList.Items) == 0 && len(pipelineList.Items) == 0 {
175+
// Check Experiments
176+
var experimentList mlopsv1alpha1.ExperimentList
177+
if err := k8s.KubeClient.List(
178+
ctx,
179+
&experimentList,
180+
client.InNamespace(k8s.namespace),
181+
labels,
182+
); err != nil {
183+
return fmt.Errorf("failed to list Experiments: %w", err)
184+
}
185+
186+
if len(modelList.Items) == 0 && len(pipelineList.Items) == 0 && len(experimentList.Items) == 0 {
167187
return nil
168188
}
169189
}

tests/integration/godog/k8sclient/watcher_store.go

Lines changed: 69 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
type WatcherStorage interface {
2828
WaitForObjectCondition(ctx context.Context, obj runtime.Object, cond ConditionFunc) error
2929
WaitForModelCondition(ctx context.Context, modelName string, cond ConditionFunc) error
30+
WaitForExperimentCondition(ctx context.Context, experimentName string, cond ConditionFunc) error
3031
WaitForPipelineCondition(ctx context.Context, modelName string, cond ConditionFunc) error
3132
Clear()
3233
Start()
@@ -36,18 +37,20 @@ type WatcherStorage interface {
3637
type objectKind string
3738

3839
const (
39-
model objectKind = "Model"
40-
pipeline objectKind = "Pipeline"
40+
model objectKind = "Model"
41+
pipeline objectKind = "Pipeline"
42+
experiment objectKind = "Experiment"
4143
)
4244

4345
type WatcherStore struct {
44-
namespace string
45-
label string
46-
mlopsClient v1alpha1.MlopsV1alpha1Interface
47-
modelWatcher watch.Interface
48-
pipelineWatcher watch.Interface
49-
logger log.FieldLogger
50-
scheme *runtime.Scheme
46+
namespace string
47+
label string
48+
mlopsClient v1alpha1.MlopsV1alpha1Interface
49+
modelWatcher watch.Interface
50+
pipelineWatcher watch.Interface
51+
experimentWatcher watch.Interface
52+
logger log.FieldLogger
53+
scheme *runtime.Scheme
5154

5255
mu sync.RWMutex
5356
store map[string]runtime.Object // key: "namespace/name"
@@ -80,21 +83,27 @@ func NewWatcherStore(namespace string, label string, mlopsClient v1alpha1.MlopsV
8083
return nil, fmt.Errorf("failed to create pipeline watcher: %w", err)
8184
}
8285

86+
experimentWatcher, err := mlopsClient.Experiments(namespace).Watch(context.Background(), v1.ListOptions{LabelSelector: DefaultCRDTestSuiteLabel})
87+
if err != nil {
88+
return nil, fmt.Errorf("failed to create experiment watcher: %w", err)
89+
}
90+
8391
// Base scheme + register your CRDs
8492
s := runtime.NewScheme()
8593
_ = scheme.AddToScheme(s) // core k8s types (optional but fine)
8694
_ = mlopsscheme.AddToScheme(s) // <-- this is the key line for your CRDs
8795

8896
return &WatcherStore{
89-
namespace: namespace,
90-
label: label,
91-
mlopsClient: mlopsClient,
92-
modelWatcher: modelWatcher,
93-
pipelineWatcher: pipelineWatcher,
94-
logger: logger.WithField("client", "watcher_store"),
95-
store: make(map[string]runtime.Object),
96-
doneChan: make(chan struct{}),
97-
scheme: s,
97+
namespace: namespace,
98+
label: label,
99+
mlopsClient: mlopsClient,
100+
modelWatcher: modelWatcher,
101+
experimentWatcher: experimentWatcher,
102+
pipelineWatcher: pipelineWatcher,
103+
logger: logger.WithField("client", "watcher_store"),
104+
store: make(map[string]runtime.Object),
105+
doneChan: make(chan struct{}),
106+
scheme: s,
98107
}, nil
99108
}
100109

@@ -172,6 +181,42 @@ func (s *WatcherStore) Start() {
172181
}
173182
}
174183
}()
184+
go func() {
185+
for {
186+
select {
187+
case event, ok := <-s.experimentWatcher.ResultChan():
188+
if !ok {
189+
// channel closed: watcher terminated
190+
return
191+
}
192+
193+
accessor, err := meta.Accessor(event.Object)
194+
if err != nil {
195+
s.logger.WithError(err).Error("failed to access experiment watcher")
196+
} else {
197+
s.logger.WithField("event", event).Tracef("new experiment watch event with name: %s on namespace: %s", accessor.GetName(), accessor.GetNamespace())
198+
}
199+
200+
if event.Object == nil {
201+
continue
202+
}
203+
204+
switch event.Type {
205+
case watch.Added, watch.Modified:
206+
s.put(event.Object)
207+
case watch.Deleted:
208+
s.delete(event.Object)
209+
case watch.Error:
210+
fmt.Printf("experiment watch error: %v\n", event.Object)
211+
}
212+
213+
case <-s.doneChan:
214+
// Stop underlying watcher and exit
215+
s.experimentWatcher.Stop()
216+
return
217+
}
218+
}
219+
}()
175220
}
176221

177222
// Stop terminates the watcher loop.
@@ -308,6 +353,12 @@ func (s *WatcherStore) WaitForObjectCondition(ctx context.Context, obj runtime.O
308353
return err
309354
}
310355
}
356+
357+
func (s *WatcherStore) WaitForExperimentCondition(ctx context.Context, experimentName string, cond ConditionFunc) error {
358+
key := fmt.Sprintf("%s/%s/%s", s.namespace, experiment, experimentName)
359+
return s.waitForKey(ctx, key, cond)
360+
}
361+
311362
func (s *WatcherStore) WaitForModelCondition(ctx context.Context, modelName string, cond ConditionFunc) error {
312363
key := fmt.Sprintf("%s/%s/%s", s.namespace, model, modelName)
313364
return s.waitForKey(ctx, key, cond)

tests/integration/godog/steps/assertions/model.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,20 @@ func ModelReady(obj runtime.Object) (bool, error) {
2626
return false, fmt.Errorf("unexpected type %T, expected *v1alpha1.Model", obj)
2727
}
2828

29-
if model.Status.IsReady() {
30-
return true, nil
29+
return model.Status.IsReady(), nil
30+
}
31+
32+
func ExperimentReady(obj runtime.Object) (bool, error) {
33+
if obj == nil {
34+
return false, nil
35+
}
36+
37+
experiment, ok := obj.(*v1alpha1.Experiment)
38+
if !ok {
39+
return false, fmt.Errorf("unexpected type %T, expected *v1alpha1.Experiment", obj)
3140
}
3241

33-
return false, nil
42+
return experiment.Status.IsReady(), nil
3443
}
3544

3645
//func ModelReadyMessageCondition(expectedMessage string) k8sclient.ConditionFunc {

tests/integration/godog/steps/custom_model_steps.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,31 @@ import (
1414
"errors"
1515
"fmt"
1616

17+
"github.com/cucumber/godog"
1718
"github.com/seldonio/seldon-core/tests/integration/godog/steps/assertions"
1819
k8serrors "k8s.io/apimachinery/pkg/api/errors"
1920
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2021
"k8s.io/apimachinery/pkg/watch"
2122
)
2223

24+
func LoadCustomModelSteps(scenario *godog.ScenarioContext, w *World) {
25+
scenario.Step(`^I deploy model spec with timeout "([^"]+)":$`, func(timeout string, spec *godog.DocString) error {
26+
return withTimeoutCtx(timeout, func(ctx context.Context) error {
27+
return w.currentModel.deployModelSpec(ctx, spec)
28+
})
29+
})
30+
scenario.Step(`^the model "([^"]+)" should eventually become Ready with timeout "([^"]+)"$`, func(model, timeout string) error {
31+
return withTimeoutCtx(timeout, func(ctx context.Context) error {
32+
return w.currentModel.waitForModelNameReady(ctx, model)
33+
})
34+
})
35+
scenario.Step(`^delete the model "([^"]+)" with timeout "([^"]+)"$`, func(model, timeout string) error {
36+
return withTimeoutCtx(timeout, func(ctx context.Context) error {
37+
return w.currentModel.deleteModel(ctx, model)
38+
})
39+
})
40+
}
41+
2342
// deleteModel we have to wait for model to be deleted, as there is a finalizer attached so the scheduler can confirm
2443
// when model has been unloaded from inference pod, model-gw, dataflow-engine, pipeline-gw and controller will remove
2544
// finalizer so deletion can complete.

0 commit comments

Comments
 (0)