@@ -1,24 +1,25 @@
@ModelDeployment @Functional @Models @Explicit
@ModelDeployment @Functional @Models @CustomModelSpec
Feature: Explicit Model deployment
I deploy a custom model spec, wait for model to be deployed to the servers
and send an inference request to that model
and send an inference request to that model and expect a successful response.
I then delete the model and send inference requests and expect them to fail.

Scenario: Load model and send inference request to envoy
Given I deploy model spec:
Given I deploy model spec with timeout "10s":
"""
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
name: iris
name: alpha-1
spec:
replicas: 1
requirements:
- sklearn
- mlserver
storageUri: gs://seldon-models/scv2/samples/mlserver_1.3.5/iris-sklearn
"""
When the model "iris" should eventually become Ready with timeout "20s"
Then send HTTP inference request with timeout "20s" to model "iris" with payload:
When the model "alpha-1" should eventually become Ready with timeout "20s"
Then send HTTP inference request with timeout "20s" to model "alpha-1" with payload:
"""
{
"inputs": [
@@ -51,7 +52,7 @@ Feature: Explicit Model deployment
}
] }
"""
Then send gRPC inference request with timeout "20s" to model "iris" with payload:
Then send gRPC inference request with timeout "20s" to model "alpha-1" with payload:
"""
{
"inputs": [
@@ -82,4 +83,35 @@ Feature: Explicit Model deployment
"contents": {"int64_contents" : [2]}
}
] }
"""
"""
Then delete the model "alpha-1" with timeout "10s"
Then send HTTP inference request with timeout "20s" to model "alpha-1" with payload:
"""
{
"inputs": [
{
"name": "predict",
"shape": [1, 4],
"datatype": "FP32",
"data": [[1, 2, 3, 4]]
}
]
}
"""
And expect http response status code "404"
Then send gRPC inference request with timeout "20s" to model "alpha-1" with payload:
"""
{
"inputs": [
{
"name": "predict",
"shape": [1, 4],
"datatype": "FP32",
"contents": {
"int64_contents" : [1, 2, 3, 4]
}
}
]
}
"""
And expect gRPC response error to contain "Unimplemented"
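The delete, HTTP 404 and gRPC error steps introduced above only run if matching step definitions are registered with godog. A minimal sketch of that registration inside the steps package, assuming the hypothetical handler names sketched further down (deployModelSpec, modelReadyWithTimeout, deleteModelWithTimeout); the real suite wiring is not part of this diff.

package steps

import "github.com/cucumber/godog"

// InitializeScenario sketches how the steps used in this feature could be bound
// to their handlers. Construction of the Kubernetes and inference clients is elided.
func InitializeScenario(sc *godog.ScenarioContext) {
	m := &Model{}       // step holder from custom_model_steps.go
	inf := &inference{} // step holder from infer.go

	sc.Step(`^I deploy model spec with timeout "([^"]*)":$`, m.deployModelSpec)
	sc.Step(`^the model "([^"]*)" should eventually become Ready with timeout "([^"]*)"$`, m.modelReadyWithTimeout)
	sc.Step(`^delete the model "([^"]*)" with timeout "([^"]*)"$`, m.deleteModelWithTimeout)
	sc.Step(`^expect gRPC response error to contain "([^"]*)"$`, inf.gRPCRespContainsError)
	// The HTTP/gRPC inference-request and status-code steps are registered the same way.
}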
32 changes: 14 additions & 18 deletions tests/integration/godog/features/model/deployment.feature
@@ -4,26 +4,31 @@ Feature: Model deployment
As a model user
I need to create a Model resource and verify it is deployed

@0
Scenario: Success - Load a model
Given I have an "iris" model
When the model is applied
Then the model should eventually become Ready


@0
Scenario: Success - Load a model again
Given I have an "iris" model
When the model is applied
Then the model should eventually become Ready

# this approach might be more reusable, especially for complex test cases; it's all about how expressive we want to be
Scenario: Load model
Given I have a model:
Scenario: Load a specific model
Given I deploy model spec with timeout "10s":
"""

apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
name: deployment-test-1
spec:
replicas: 1
requirements:
- sklearn
- mlserver
storageUri: gs://seldon-models/scv2/samples/mlserver_1.3.5/iris-sklearn
"""
When the model is applied
Then the model should eventually become Ready

Scenario: Success - Load a model and expect status model available
@@ -38,19 +43,10 @@ Feature: Model deployment
When the model is applied
Then the model should eventually become Ready


# todo: change model type
Scenario: Success - Load a big model
Given I have a "large-model" model
Given I have an "iris" model
When the model is applied
Then the model should eventually become Ready

# this would belong more to the feature of model server scheduling or capabilities
Scenario: Fail Load Model - no server capabilities in cluster
Given I have an "iris" model
And the model has "xgboost" capabilities
And there is no server in the cluster with capabilities "xgboost"
When the model is applied
Then the model eventually becomes not Ready
And the model status message should eventually be "ModelFailed"
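Both feature files now deploy models through an `I deploy model spec with timeout "...":` step that takes the manifest as a DocString. Its handler is not visible in this diff; a minimal sketch of what it might look like, assuming sigs.k8s.io/yaml for decoding and the same typed client used by the other steps (the handler name is hypothetical):

import (
	"context"
	"fmt"

	"github.com/cucumber/godog"
	"github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/yaml"
)

// deployModelSpec decodes the DocString into a Model CR and creates it in the
// test namespace, bounded by the step's timeout.
func (m *Model) deployModelSpec(timeout string, spec *godog.DocString) error {
	return withTimeoutCtx(timeout, func(ctx context.Context) error {
		model := &v1alpha1.Model{}
		if err := yaml.Unmarshal([]byte(spec.Content), model); err != nil {
			return fmt.Errorf("failed to decode model spec: %w", err)
		}
		if _, err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Create(ctx, model, metav1.CreateOptions{}); err != nil {
			return fmt.Errorf("failed to create model %s: %w", model.Name, err)
		}
		return nil
	})
}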


115 changes: 115 additions & 0 deletions tests/integration/godog/steps/custom_model_steps.go
@@ -0,0 +1,115 @@
/*
Copyright (c) 2024 Seldon Technologies Ltd.

Use of this software is governed by
(1) the license included in the LICENSE file or
(2) if the license included in the LICENSE file is the Business Source License 1.1,
the Change License after the Change Date as each is defined in accordance with the LICENSE file.
*/

package steps

import (
"context"
"errors"
"fmt"

"github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)

// deleteModel deletes the model and waits for the deletion to complete. A finalizer is attached so the scheduler
// can confirm the model has been unloaded from the inference pod, model-gw, dataflow-engine and pipeline-gw;
// only then does the controller remove the finalizer and allow the deletion to complete.
func (m *Model) deleteModel(ctx context.Context, model string) error {
modelCR, err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Get(ctx, model, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
return fmt.Errorf("model %s cannot be deleted: it does not exist", model)
}
return fmt.Errorf("failed to get model %s: %w", model, err)
}

if err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Delete(ctx, model, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("failed deleting model: %w", err)
}

m.log.Debugf("Delete request for model %s sent", model)

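// Watch from the ResourceVersion captured by the Get above so the Deleted event
// cannot slip by between the Delete call and the watch being established.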
watcher, err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", model),
ResourceVersion: modelCR.ResourceVersion,
})
if err != nil {
return fmt.Errorf("failed watching model: %w", err)
}
defer watcher.Stop()

m.log.Debugf("Waiting for %s model deletion confirmation", model)

for {
select {
case <-ctx.Done():
return ctx.Err()
case event, ok := <-watcher.ResultChan():
if !ok {
return errors.New("watcher channel closed")
}
if event.Type == watch.Error {
return fmt.Errorf("watch error: %v", event.Object)
}
if event.Type == watch.Deleted {
return nil
}
}
}
}

func (m *Model) waitForModelReady(ctx context.Context, model string) error {
foundModel, err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Get(ctx, model, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed getting model: %w", err)
}

if foundModel.Status.IsReady() {
return nil
}

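// Resume the watch from the ResourceVersion returned by the Get above so a status
// change between the readiness check and the Watch call is not missed.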
watcher, err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", model),
ResourceVersion: foundModel.ResourceVersion,
Watch: true,
})
if err != nil {
return fmt.Errorf("failed to subscribe to model watch: %w", err)
}
defer watcher.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case event, ok := <-watcher.ResultChan():
if !ok {
return errors.New("watch channel closed")
}

if event.Type == watch.Error {
return fmt.Errorf("watch error: %v", event.Object)
}

if event.Type == watch.Added || event.Type == watch.Modified {
updated, ok := event.Object.(*v1alpha1.Model)
if !ok {
return fmt.Errorf("unexpected object type %T in watch event", event.Object)
}
if updated.Status.IsReady() {
return nil
}
}

if event.Type == watch.Deleted {
return fmt.Errorf("model %s was deleted while waiting for it to become Ready", model)
}
}
}
}
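How these two helpers are bound to the timeout-carrying Gherkin steps is not shown here; plausible wrappers, reusing withTimeoutCtx from infer.go (handler names are assumptions):

// deleteModelWithTimeout is a hypothetical handler for
// `delete the model "..." with timeout "..."`.
func (m *Model) deleteModelWithTimeout(model, timeout string) error {
	return withTimeoutCtx(timeout, func(ctx context.Context) error {
		return m.deleteModel(ctx, model)
	})
}

// modelReadyWithTimeout is a hypothetical handler for
// `the model "..." should eventually become Ready with timeout "..."`.
func (m *Model) modelReadyWithTimeout(model, timeout string) error {
	return withTimeoutCtx(timeout, func(ctx context.Context) error {
		return m.waitForModelReady(ctx, model)
	})
}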
63 changes: 0 additions & 63 deletions tests/integration/godog/steps/explicit_model_steps.go

This file was deleted.

35 changes: 28 additions & 7 deletions tests/integration/godog/steps/infer.go
@@ -61,23 +61,32 @@ func (i *inference) sendGRPCModelInferenceRequest(ctx context.Context, model str
ctx = metadata.NewOutgoingContext(ctx, md)
resp, err := i.grpc.ModelInfer(ctx, msg)
if err != nil {
return fmt.Errorf("could not send grpc model inference: %w", err)
i.lastGRPCResponse.err = err
}

i.lastGRPCResponse = resp
i.lastGRPCResponse.response = resp
return nil
}

func withTimeoutCtx(timeout string, callback func(ctx context.Context) error) error {
timeoutDuration, err := time.ParseDuration(timeout)
ctx, cancel, err := timeoutToContext(timeout)
if err != nil {
return fmt.Errorf("invalid timeout %s: %w", timeout, err)
return err
}
ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
defer cancel()
return callback(ctx)
}

func timeoutToContext(timeout string) (context.Context, context.CancelFunc, error) {
d, err := time.ParseDuration(timeout)
if err != nil {
return nil, nil, fmt.Errorf("invalid timeout %s: %w", timeout, err)
}

ctx, cancel := context.WithTimeout(context.Background(), d)
return ctx, cancel, nil
}

func isSubset(needle, hay any) bool {
nObj, nOK := needle.(map[string]any)
hObj, hOK := hay.(map[string]any)
@@ -126,12 +135,24 @@ func jsonContainsObjectSubset(jsonStr, needleStr string) (bool, error) {
return containsSubset(needle, hay), nil
}

func (i *inference) gRPCRespContainsError(expected string) error {
if i.lastGRPCResponse.err == nil {
return errors.New("no gRPC response error found")
}

if strings.Contains(i.lastGRPCResponse.err.Error(), expected) {
return nil
}

return fmt.Errorf("gRPC error %q does not contain %q", i.lastGRPCResponse.err.Error(), expected)
}

func (i *inference) gRPCRespCheckBodyContainsJSON(expectJSON *godog.DocString) error {
if i.lastGRPCResponse == nil {
if i.lastGRPCResponse.response == nil {
return errors.New("no gRPC response found")
}

gotJson, err := json.Marshal(i.lastGRPCResponse)
gotJson, err := json.Marshal(i.lastGRPCResponse.response)
if err != nil {
return fmt.Errorf("could not marshal gRPC json: %w", err)
}
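The hunks above change lastGRPCResponse from holding the raw response to holding a small result pair, so a failed ModelInfer call no longer fails the sending step and can be asserted on afterwards. The updated field type is outside this diff; an assumed shape:

// grpcResult is an assumed shape for inference.lastGRPCResponse: it records the
// response and the call error so later steps can assert on either outcome.
// The response type is whatever i.grpc.ModelInfer returns (the v2 dataplane
// ModelInferResponse); the exact import is assumed.
type grpcResult struct {
	response *v2_dataplane.ModelInferResponse
	err      error
}

Deferring the error check this way is what lets the post-deletion gRPC request in the explicit deployment feature expect an "Unimplemented" error instead of aborting the scenario.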