diff --git a/tests/integration/godog/features/model/explicit_model_deployment.feature b/tests/integration/godog/features/model/custom_model_deployment.feature
similarity index 57%
rename from tests/integration/godog/features/model/explicit_model_deployment.feature
rename to tests/integration/godog/features/model/custom_model_deployment.feature
index aebb9082d0..a615669ff4 100644
--- a/tests/integration/godog/features/model/explicit_model_deployment.feature
+++ b/tests/integration/godog/features/model/custom_model_deployment.feature
@@ -1,15 +1,16 @@
-@ModelDeployment @Functional @Models @Explicit
+@ModelDeployment @Functional @Models @CustomModelSpec
 Feature: Explicit Model deployment
   I deploy a custom model spec, wait for model to be deployed to the servers
-  and send an inference request to that model
+  and send an inference request to that model and expect a successful response.
+  I then delete the model and send inference requests and expect them to fail.
 
   Scenario: Load model and send inference request to envoy
-    Given I deploy model spec:
+    Given I deploy model spec with timeout "10s":
       """
       apiVersion: mlops.seldon.io/v1alpha1
       kind: Model
       metadata:
-        name: iris
+        name: alpha-1
       spec:
         replicas: 1
         requirements:
@@ -17,8 +18,8 @@ Feature: Explicit Model deployment
         - mlserver
         storageUri: gs://seldon-models/scv2/samples/mlserver_1.3.5/iris-sklearn
       """
-    When the model "iris" should eventually become Ready with timeout "20s"
-    Then send HTTP inference request with timeout "20s" to model "iris" with payload:
+    When the model "alpha-1" should eventually become Ready with timeout "20s"
+    Then send HTTP inference request with timeout "20s" to model "alpha-1" with payload:
       """
       {
         "inputs": [
@@ -51,7 +52,7 @@ Feature: Explicit Model deployment
          }
        ] }
       """
-    Then send gRPC inference request with timeout "20s" to model "iris" with payload:
+    Then send gRPC inference request with timeout "20s" to model "alpha-1" with payload:
       """
       {
         "inputs": [
@@ -82,4 +83,35 @@ Feature: Explicit Model deployment
           "contents": {"int64_contents" : [2]}
         } ]
       }
-      """
\ No newline at end of file
+      """
+    Then delete the model "alpha-1" with timeout "10s"
+    Then send HTTP inference request with timeout "20s" to model "alpha-1" with payload:
+      """
+      {
+        "inputs": [
+          {
+            "name": "predict",
+            "shape": [1, 4],
+            "datatype": "FP32",
+            "data": [[1, 2, 3, 4]]
+          }
+        ]
+      }
+      """
+    And expect http response status code "404"
+    Then send gRPC inference request with timeout "20s" to model "alpha-1" with payload:
+      """
+      {
+        "inputs": [
+          {
+            "name": "predict",
+            "shape": [1, 4],
+            "datatype": "FP32",
+            "contents": {
+              "int64_contents" : [1, 2, 3, 4]
+            }
+          }
+        ]
+      }
+      """
+    And expect gRPC response error to contain "Unimplemented"
\ No newline at end of file
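
The HTTP steps above speak the Open Inference Protocol (V2) through envoy. For reference, a minimal sketch of the same request sent by hand, assuming the `/v2/models/<name>/infer` path and the `Seldon-Model` routing header used by Seldon Core v2 ingress; the host, port and model name here are illustrative assumptions, not this suite's configuration:

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	payload := []byte(`{"inputs":[{"name":"predict","shape":[1,4],"datatype":"FP32","data":[[1,2,3,4]]}]}`)

	req, err := http.NewRequest(http.MethodPost,
		"http://localhost:9000/v2/models/alpha-1/infer", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Seldon-Model", "alpha-1") // envoy routes on this header

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// While the model is loaded this is 200; after deletion the
	// scenario above expects a 404 from the same request.
	fmt.Println("status:", resp.StatusCode)
}
```
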
diff --git a/tests/integration/godog/features/model/deployment.feature b/tests/integration/godog/features/model/deployment.feature
index 993b61ed6f..ff225aa74e 100644
--- a/tests/integration/godog/features/model/deployment.feature
+++ b/tests/integration/godog/features/model/deployment.feature
@@ -4,26 +4,31 @@ Feature: Model deployment
   As a model user
   I need to create a Model resource and verify it is deployed
 
-  @0
   Scenario: Success - Load a model
     Given I have an "iris" model
     When the model is applied
     Then the model should eventually become Ready
 
-  @0
   Scenario: Success - Load a model again
     Given I have an "iris" model
     When the model is applied
     Then the model should eventually become Ready
 
-# this approach might be more reusable specially for complex test cases, its all how expressive we want to be
-  Scenario: Load model
-    Given I have a model:
+  Scenario: Load a specific model
+    Given I deploy model spec with timeout "10s":
       """
-
+      apiVersion: mlops.seldon.io/v1alpha1
+      kind: Model
+      metadata:
+        name: deployment-test-1
+      spec:
+        replicas: 1
+        requirements:
+        - sklearn
+        - mlserver
+        storageUri: gs://seldon-models/scv2/samples/mlserver_1.3.5/iris-sklearn
       """
-    When the model is applied
     Then the model should eventually become Ready
 
   Scenario: Success - Load a model and expect status model available
@@ -38,19 +43,10 @@ Feature: Model deployment
     When the model is applied
     Then the model should eventually become Ready
 
-
+# todo: change model type
   Scenario: Success - Load a big model
-    Given I have an "large-model" model
+    Given I have an "iris" model
     When the model is applied
     Then the model should eventually become Ready
-
-# this would belong more to the feature of model server scheduling or capabilities
-  Scenario: Fail Load Model - no server capabilities in cluster
-    Given Given I have an "iris" model
-    And the model has "xgboost" capabilities
-    And there is no server in the cluster with capabilities "xgboost"
-    When the model is applied
-    Then the model eventually becomes not Ready
-    And the model status message should eventually be "ModelFailed"
-
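
The inline manifest above is parsed by the deploy step with sigs.k8s.io/yaml (see the model_steps.go change below). Unlike gopkg.in/yaml.v3, that package converts YAML to JSON and then uses encoding/json, so camelCase keys such as storageUri match the json tags on the generated CRD types — plausibly why the import was switched. A minimal sketch, assuming only the v1alpha1 types from this repo:

```go
package main

import (
	"fmt"

	mlopsv1alpha1 "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
	"sigs.k8s.io/yaml"
)

func main() {
	manifest := []byte(`
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: deployment-test-1
spec:
  replicas: 1
  requirements:
  - sklearn
  - mlserver
  storageUri: gs://seldon-models/scv2/samples/mlserver_1.3.5/iris-sklearn
`)

	model := &mlopsv1alpha1.Model{}
	// sigs.k8s.io/yaml round-trips YAML through JSON, so keys like
	// storageUri bind to the CRD's json-tagged fields.
	if err := yaml.Unmarshal(manifest, model); err != nil {
		panic(err)
	}
	fmt.Println(model.Name) // deployment-test-1
}
```
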
diff --git a/tests/integration/godog/steps/custom_model_steps.go b/tests/integration/godog/steps/custom_model_steps.go
new file mode 100644
index 0000000000..ea3ad8edf0
--- /dev/null
+++ b/tests/integration/godog/steps/custom_model_steps.go
@@ -0,0 +1,118 @@
+/*
+Copyright (c) 2024 Seldon Technologies Ltd.
+
+Use of this software is governed BY
+(1) the license included in the LICENSE file or
+(2) if the license included in the LICENSE file is the Business Source License 1.1,
+the Change License after the Change Date as each is defined in accordance with the LICENSE file.
+*/
+
+package steps
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/watch"
+)
+
+// deleteModel deletes the model and waits for the deletion to complete. A finalizer is attached so that the scheduler
+// can confirm the model has been unloaded from the inference pod, model-gw, dataflow-engine and pipeline-gw; the
+// controller then removes the finalizer, allowing deletion to complete.
+func (m *Model) deleteModel(ctx context.Context, model string) error {
+	modelCR, err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Get(ctx, model, metav1.GetOptions{})
+	if err != nil {
+		if k8serrors.IsNotFound(err) {
+			return fmt.Errorf("model %s can't be deleted: it does not exist", model)
+		}
+		return fmt.Errorf("failed to get model %s: %w", model, err)
+	}
+
+	if err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Delete(ctx, model, metav1.DeleteOptions{}); err != nil {
+		return fmt.Errorf("failed deleting model: %w", err)
+	}
+
+	m.log.Debugf("Delete request for model %s sent", model)
+
+	watcher, err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Watch(ctx, metav1.ListOptions{
+		FieldSelector:   fmt.Sprintf("metadata.name=%s", model),
+		ResourceVersion: modelCR.ResourceVersion,
+	})
+	if err != nil {
+		return fmt.Errorf("failed watching model: %w", err)
+	}
+	defer watcher.Stop()
+
+	m.log.Debugf("Waiting for %s model deletion confirmation", model)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case event, ok := <-watcher.ResultChan():
+			if !ok {
+				return errors.New("watcher channel closed")
+			}
+			if event.Type == watch.Error {
+				return fmt.Errorf("watch error: %v", event.Object)
+			}
+			if event.Type == watch.Deleted {
+				return nil
+			}
+		}
+	}
+}
+
+func (m *Model) waitForModelReady(ctx context.Context, model string) error {
+	foundModel, err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Get(ctx, model, metav1.GetOptions{})
+	if err != nil {
+		return fmt.Errorf("failed getting model: %w", err)
+	}
+
+	if foundModel.Status.IsReady() {
+		return nil
+	}
+
+	watcher, err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Watch(ctx, metav1.ListOptions{
+		FieldSelector:   fmt.Sprintf("metadata.name=%s", model),
+		ResourceVersion: foundModel.ResourceVersion,
+		Watch:           true,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to subscribe to model watch: %w", err)
+	}
+	defer watcher.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case event, ok := <-watcher.ResultChan():
+			if !ok {
+				return errors.New("watch channel closed")
+			}
+
+			if event.Type == watch.Error {
+				return fmt.Errorf("watch error: %v", event.Object)
+			}
+
+			if event.Type == watch.Added || event.Type == watch.Modified {
+				model, ok := event.Object.(*v1alpha1.Model)
+				if !ok {
+					return fmt.Errorf("unexpected object type %T", event.Object)
+				}
+				if model.Status.IsReady() {
+					return nil
+				}
+			}
+
+			if event.Type == watch.Deleted {
+				return errors.New("model was deleted before becoming ready")
+			}
+		}
+	}
+}
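
The delete watch above is opened at the ResourceVersion returned by the initial Get, so a deletion that completes between the Get and the Watch is still observed as an event. Where a watch is impractical (e.g. restricted RBAC), polling is an alternative; a sketch that would sit next to the code above, using apimachinery's helper — waitForModelGone is a hypothetical name, not part of this change:

```go
package steps

import (
	"context"
	"fmt"
	"time"

	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
)

// waitForModelGone polls until the Model resource disappears, i.e. until the
// scheduler has confirmed unload and the controller has removed the finalizer.
func (m *Model) waitForModelGone(ctx context.Context, model string) error {
	return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
		_, err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Get(ctx, model, metav1.GetOptions{})
		if k8serrors.IsNotFound(err) {
			return true, nil // fully deleted
		}
		if err != nil {
			return false, fmt.Errorf("polling model %s: %w", model, err)
		}
		return false, nil // still present, likely terminating
	})
}
```
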
diff --git a/tests/integration/godog/steps/explicit_model_steps.go b/tests/integration/godog/steps/explicit_model_steps.go
deleted file mode 100644
index 03ec10338e..0000000000
--- a/tests/integration/godog/steps/explicit_model_steps.go
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-Copyright (c) 2024 Seldon Technologies Ltd.
-
-Use of this software is governed BY
-(1) the license included in the LICENSE file or
-(2) if the license included in the LICENSE file is the Business Source License 1.1,
-the Change License after the Change Date as each is defined in accordance with the LICENSE file.
-*/
-
-package steps
-
-import "context"
-
-func (m *Model) waitForModelReady(ctx context.Context, model string) error {
-	// TODO: uncomment when auto-gen k8s client merged
-
-	//foundModel, err := w.corek8sClient.MlopsV1alpha1().Models(w.namespace).Get(ctx, model, metav1.GetOptions{})
-	//if err != nil {
-	//	return fmt.Errorf("failed getting model: %w", err)
-	//}
-	//
-	//if foundModel.Status.IsReady() {
-	//	return nil
-	//}
-	//
-	//watcher, err := w.corek8sClient.MlopsV1alpha1().Models(w.namespace).Watch(ctx, metav1.ListOptions{
-	//	FieldSelector: fmt.Sprintf("metadata.name=%s", model),
-	//	ResourceVersion: foundModel.ResourceVersion,
-	//	Watch: true,
-	//})
-	//if err != nil {
-	//	return fmt.Errorf("failed subscribed to watch model: %w", err)
-	//}
-	//defer watcher.Stop()
-	//
-	//for {
-	//	select {
-	//	case <-ctx.Done():
-	//		return ctx.Err()
-	//	case event, ok := <-watcher.ResultChan():
-	//		if !ok {
-	//			return fmt.Errorf("watch channel closed")
-	//		}
-	//
-	//		if event.Type == watch.Error {
-	//			return fmt.Errorf("watch error: %v", event.Object)
-	//		}
-	//
-	//		if event.Type == watch.Added || event.Type == watch.Modified {
-	//			model := event.Object.(*v1alpha1.Model)
-	//			if model.Status.IsReady() {
-	//				return nil
-	//			}
-	//		}
-	//
-	//		if event.Type == watch.Deleted {
-	//			return fmt.Errorf("resource was deleted")
-	//		}
-	//	}
-	//}
-
-	return nil
-}
diff --git a/tests/integration/godog/steps/infer.go b/tests/integration/godog/steps/infer.go
index 5ec7bb95db..ab70a1b188 100644
--- a/tests/integration/godog/steps/infer.go
+++ b/tests/integration/godog/steps/infer.go
@@ -61,23 +61,32 @@ func (i *inference) sendGRPCModelInferenceRequest(ctx context.Context, model str
 	ctx = metadata.NewOutgoingContext(context.Background(), md)
 	resp, err := i.grpc.ModelInfer(ctx, msg)
-	if err != nil {
-		return fmt.Errorf("could not send grpc model inference: %w", err)
-	}
-	i.lastGRPCResponse = resp
+	// store the outcome instead of failing the step; later assertion steps
+	// inspect either the response or the error, and assigning a fresh struct
+	// avoids stale state from a previous request
+	i.lastGRPCResponse = lastGRPCResponse{response: resp, err: err}
 	return nil
 }
 
 func withTimeoutCtx(timeout string, callback func(ctx context.Context) error) error {
-	timeoutDuration, err := time.ParseDuration(timeout)
+	ctx, cancel, err := timeoutToContext(timeout)
 	if err != nil {
-		return fmt.Errorf("invalid timeout %s: %w", timeout, err)
+		return err
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
 	defer cancel()
 	return callback(ctx)
 }
 
+func timeoutToContext(timeout string) (context.Context, context.CancelFunc, error) {
+	d, err := time.ParseDuration(timeout)
+	if err != nil {
+		return nil, nil, fmt.Errorf("invalid timeout %s: %w", timeout, err)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), d)
+	return ctx, cancel, nil
+}
+
 func isSubset(needle, hay any) bool {
 	nObj, nOK := needle.(map[string]any)
 	hObj, hOK := hay.(map[string]any)
@@ -126,12 +135,24 @@ func jsonContainsObjectSubset(jsonStr, needleStr string) (bool, error) {
 	return containsSubset(needle, hay), nil
 }
 
+func (i *inference) gRPCRespContainsError(want string) error {
+	if i.lastGRPCResponse.err == nil {
+		return errors.New("no gRPC response error found")
+	}
+
+	if strings.Contains(i.lastGRPCResponse.err.Error(), want) {
+		return nil
+	}
+
+	return fmt.Errorf("error %q does not contain %q", i.lastGRPCResponse.err.Error(), want)
+}
+
 func (i *inference) gRPCRespCheckBodyContainsJSON(expectJSON *godog.DocString) error {
-	if i.lastGRPCResponse == nil {
+	if i.lastGRPCResponse.response == nil {
 		return errors.New("no gRPC response found")
 	}
 
-	gotJson, err := json.Marshal(i.lastGRPCResponse)
+	gotJson, err := json.Marshal(i.lastGRPCResponse.response)
 	if err != nil {
 		return fmt.Errorf("could not marshal gRPC json: %w", err)
 	}
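
A usage sketch for the timeout helpers above: every step that accepts a Gherkin timeout string funnels through withTimeoutCtx, and the string is parsed with Go's time.ParseDuration ("10s", "1m30s"). The helper below mirrors the combined behaviour of withTimeoutCtx and timeoutToContext from infer.go; the main function is purely illustrative:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// withTimeoutCtx parses a Gherkin-provided duration string and runs the
// callback under that deadline, mirroring the helper in infer.go.
func withTimeoutCtx(timeout string, callback func(ctx context.Context) error) error {
	d, err := time.ParseDuration(timeout)
	if err != nil {
		return fmt.Errorf("invalid timeout %s: %w", timeout, err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return callback(ctx)
}

func main() {
	err := withTimeoutCtx("50ms", func(ctx context.Context) error {
		select {
		case <-ctx.Done():
			return ctx.Err() // deadline exceeded -> the step fails with a clear error
		case <-time.After(time.Second): // stand-in for slow work, e.g. deleteModel
			return nil
		}
	})
	fmt.Println(err) // context deadline exceeded, after ~50ms
}
```
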
diff --git a/tests/integration/godog/steps/model_steps.go b/tests/integration/godog/steps/model_steps.go
index de456824ec..2d6d7fa63e 100644
--- a/tests/integration/godog/steps/model_steps.go
+++ b/tests/integration/godog/steps/model_steps.go
@@ -12,18 +12,26 @@ package steps
 import (
 	"context"
 	"fmt"
+	"maps"
 	"time"
 
 	"github.com/cucumber/godog"
 	mlopsv1alpha1 "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
+	"github.com/seldonio/seldon-core/operator/v2/pkg/generated/clientset/versioned"
 	"github.com/seldonio/seldon-core/tests/integration/godog/k8sclient"
 	"github.com/seldonio/seldon-core/tests/integration/godog/steps/assertions"
-	"gopkg.in/yaml.v3"
+	"github.com/sirupsen/logrus"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/yaml"
 )
 
 type Model struct {
-	model *mlopsv1alpha1.Model
+	label          map[string]string
+	namespace      string
+	model          *mlopsv1alpha1.Model
+	k8sClient      versioned.Interface
+	watcherStorage k8sclient.WatcherStorage
+	log            logrus.FieldLogger
 }
 
 type TestModelConfig struct {
@@ -47,10 +55,10 @@ var testModels = map[string]TestModelConfig{
 	},
 }
 
-func LoadModelSteps(scenario *godog.ScenarioContext, w *World) {
+func LoadTemplateModelSteps(scenario *godog.ScenarioContext, w *World) {
 	// Model Operations
 	scenario.Step(`^I have an? "([^"]+)" model$`, func(modelName string) error {
-		return w.currentModel.IHaveAModel(modelName, w.Label)
+		return w.currentModel.IHaveAModel(modelName)
 	})
 	scenario.Step(`^the model has "(\d+)" min replicas$`, w.currentModel.SetMinReplicas)
 	scenario.Step(`^the model has "(\d+)" max replicas$`, w.currentModel.SetMaxReplicas)
@@ -61,20 +69,31 @@ func LoadTemplateModelSteps(scenario *godog.ScenarioContext, w *World) {
 	})
 	// Model Assertions
 	scenario.Step(`^the model (?:should )?eventually become(?:s)? Ready$`, func() error {
-		return w.currentModel.ModelReady(w.watcherStorage)
+		// todo: maybe convert this to a flag
+		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
+		defer cancel()
+
+		return w.currentModel.ModelReady(ctx)
 	})
-	scenario.Step(`^the model status message should be "([^"]+)"$`, w.currentModel.AssertModelStatus)
+	scenario.Step(`^the model status message eventually should be "([^"]+)"$`, w.currentModel.AssertModelStatus)
 }
 
-func LoadExplicitModelSteps(scenario *godog.ScenarioContext, w *World) {
-	scenario.Step(`^I deploy model spec:$`, func(spec *godog.DocString) error {
-		return w.currentModel.deployModelSpec(spec, w.namespace, w.kubeClient)
+func LoadCustomModelSteps(scenario *godog.ScenarioContext, w *World) {
+	scenario.Step(`^I deploy model spec with timeout "([^"]+)":$`, func(timeout string, spec *godog.DocString) error {
+		return withTimeoutCtx(timeout, func(ctx context.Context) error {
+			return w.currentModel.deployModelSpec(ctx, spec)
+		})
 	})
 	scenario.Step(`^the model "([^"]+)" should eventually become Ready with timeout "([^"]+)"$`, func(model, timeout string) error {
 		return withTimeoutCtx(timeout, func(ctx context.Context) error {
 			return w.currentModel.waitForModelReady(ctx, model)
 		})
 	})
+	scenario.Step(`^delete the model "([^"]+)" with timeout "([^"]+)"$`, func(model, timeout string) error {
+		return withTimeoutCtx(timeout, func(ctx context.Context) error {
+			return w.currentModel.deleteModel(ctx, model)
+		})
+	})
 }
 
 func LoadInferenceSteps(scenario *godog.ScenarioContext, w *World) {
@@ -91,22 +110,37 @@
 	scenario.Step(`^expect http response status code "([^"]*)"$`, w.infer.httpRespCheckStatus)
 	scenario.Step(`^expect http response body to contain JSON:$`, w.infer.httpRespCheckBodyContainsJSON)
 	scenario.Step(`^expect gRPC response body to contain JSON:$`, w.infer.gRPCRespCheckBodyContainsJSON)
+	scenario.Step(`^expect gRPC response error to contain "([^"]+)"$`, w.infer.gRPCRespContainsError)
 }
 
-func (m *Model) deployModelSpec(spec *godog.DocString, namespace string, _ *k8sclient.K8sClient) error {
+func (m *Model) deployModelSpec(ctx context.Context, spec *godog.DocString) error {
 	modelSpec := &mlopsv1alpha1.Model{}
 	if err := yaml.Unmarshal([]byte(spec.Content), &modelSpec); err != nil {
 		return fmt.Errorf("failed unmarshalling model spec: %w", err)
 	}
-	modelSpec.Namespace = namespace
 
-	// TODO: uncomment when auto-gen k8s client merged
-	//if _, err := w.corek8sClient.MlopsV1alpha1().Models(w.namespace).Create(context.TODO(), modelSpec, metav1.CreateOptions{}); err != nil {
-	//	return fmt.Errorf("failed creating model: %w", err)
-	//}
+	modelSpec.Namespace = m.namespace
+	m.model = modelSpec
+	m.applyScenarioLabel()
 
+	if _, err := m.k8sClient.MlopsV1alpha1().Models(m.namespace).Create(ctx, m.model, metav1.CreateOptions{}); err != nil {
+		return fmt.Errorf("failed creating model: %w", err)
+	}
 	return nil
 }
 
-func (m *Model) IHaveAModel(model string, label map[string]string) error {
+func (m *Model) applyScenarioLabel() {
+	if m.model.Labels == nil {
+		m.model.Labels = make(map[string]string)
+	}
+
+	maps.Copy(m.model.Labels, m.label)
+
+	// todo: change this approach
+	for k, v := range k8sclient.DefaultCRDLabelMap {
+		m.model.Labels[k] = v
+	}
+}
+
+func (m *Model) IHaveAModel(model string) error {
 	testModel, ok := testModels[model]
 	if !ok {
 		return fmt.Errorf("model %s not found", model)
@@ -130,7 +164,7 @@ func (m *Model) IHaveAModel(model string, label map[string]string) error {
 			CreationTimestamp:          metav1.Time{},
 			DeletionTimestamp:          nil,
 			DeletionGracePeriodSeconds: nil,
-			Labels:                     label,
+			Labels:                     m.label,
 			Annotations:                nil,
 			OwnerReferences:            nil,
 			Finalizers:                 nil,
@@ -161,8 +195,8 @@ func (m *Model) IHaveAModel(model string, label map[string]string) error {
 	return nil
 }
 
-func NewModel() *Model {
-	return &Model{model: &mlopsv1alpha1.Model{}}
+func NewModel(label map[string]string, namespace string, k8sClient versioned.Interface, log logrus.FieldLogger, watcherStorage k8sclient.WatcherStorage) *Model {
+	return &Model{label: label, model: &mlopsv1alpha1.Model{}, log: log, namespace: namespace, k8sClient: k8sClient, watcherStorage: watcherStorage}
 }
 
 func (m *Model) SetMinReplicas(replicas int) {
@@ -183,16 +217,11 @@ func (m *Model) ApplyModel(k *k8sclient.K8sClient) error {
 	return nil
 }
 
-func (m *Model) ModelReady(w k8sclient.WatcherStorage) error {
-	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
-	defer cancel()
-
-	// m.world.currentModel.model is assumed to be *mlopsv1alpha1.Model
-	// which implements runtime.Object, so no cast needed.
-	return w.WaitFor(
+func (m *Model) ModelReady(ctx context.Context) error {
+	return m.watcherStorage.WaitFor(
 		ctx,
-		m.model,
-		assertions.ModelReady,
+		m.model,               // the k8s object being watched
+		assertions.ModelReady, // predicate from steps/assertions
 	)
 }
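
ModelReady now takes its deadline from the caller and delegates to watcherStorage.WaitFor with the assertions.ModelReady predicate. That predicate's definition is not part of this diff; a plausible shape, assuming WaitFor hands the watched runtime.Object to the predicate (the real signature may differ):

```go
package assertions

import (
	mlopsv1alpha1 "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
	"k8s.io/apimachinery/pkg/runtime"
)

// ModelReady reports whether the watched object is a Model whose status
// has reached Ready; non-Model objects simply fail the predicate.
func ModelReady(obj runtime.Object) bool {
	model, ok := obj.(*mlopsv1alpha1.Model)
	if !ok {
		return false
	}
	return model.Status.IsReady()
}
```
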
diff --git a/tests/integration/godog/steps/world.go b/tests/integration/godog/steps/world.go
index c394e8bea6..79e77180e0 100644
--- a/tests/integration/godog/steps/world.go
+++ b/tests/integration/godog/steps/world.go
@@ -36,6 +36,7 @@ type Config struct {
 	Namespace      string
 	Logger         log.FieldLogger
 	KubeClient     *k8sclient.K8sClient
+	K8sClient      v.Interface
 	WatcherStorage k8sclient.WatcherStorage
 	GRPC           v2_dataplane.GRPCInferenceServiceClient
 	IngressHost    string
@@ -51,7 +52,12 @@
 	grpc             v2_dataplane.GRPCInferenceServiceClient
 	httpPort         uint
 	lastHTTPResponse *http.Response
-	lastGRPCResponse *v2_dataplane.ModelInferResponse
+	lastGRPCResponse lastGRPCResponse
+}
+
+type lastGRPCResponse struct {
+	response *v2_dataplane.ModelInferResponse
+	err      error
 }
 
 func NewWorld(c Config) (*World, error) {
@@ -63,7 +69,7 @@ func NewWorld(c Config) (*World, error) {
 		namespace:      c.Namespace,
 		kubeClient:     c.KubeClient,
 		watcherStorage: c.WatcherStorage,
-		currentModel:   NewModel(),
+		currentModel:   NewModel(label, c.Namespace, c.K8sClient, c.Logger, c.WatcherStorage),
 		infer: inference{
 			host: c.IngressHost,
 			http: &http.Client{},
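
lastGRPCResponse now records the error alongside the response so assertion steps can check either outcome of the last request. The feature matches on the error text ("Unimplemented"); comparing gRPC status codes would be stricter and less brittle. A sketch using google.golang.org/grpc/status — grpcErrHasCode is hypothetical, not part of this change:

```go
package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// grpcErrHasCode checks the stored request error against an expected
// gRPC status code rather than a substring of its message.
func grpcErrHasCode(err error, want codes.Code) error {
	if err == nil {
		return errors.New("no gRPC response error found")
	}
	if s, ok := status.FromError(err); ok && s.Code() == want {
		return nil
	}
	return fmt.Errorf("error %q does not have code %s", err, want)
}

func main() {
	// simulate the error envoy returns once the model has been deleted
	err := status.Error(codes.Unimplemented, "no model alpha-1 loaded")
	fmt.Println(grpcErrHasCode(err, codes.Unimplemented)) // <nil>
}
```
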
diff --git a/tests/integration/godog/suite/suite.go b/tests/integration/godog/suite/suite.go
index 80733aca0c..e9dc5d8f1e 100644
--- a/tests/integration/godog/suite/suite.go
+++ b/tests/integration/godog/suite/suite.go
@@ -126,6 +126,7 @@ func InitializeScenario(scenarioCtx *godog.ScenarioContext) {
 		Namespace:      suiteDeps.Config.Namespace,
 		Logger:         log,
 		KubeClient:     suiteDeps.k8sClient,
+		K8sClient:      suiteDeps.mlopsClient,
 		WatcherStorage: suiteDeps.watcherStore,
 		IngressHost:    suiteDeps.Config.Inference.Host,
 		HTTPPort:       suiteDeps.Config.Inference.HTTPPort,
@@ -137,7 +138,7 @@ func InitializeScenario(scenarioCtx *godog.ScenarioContext) {
 		panic(fmt.Errorf("failed to create world: %w", err))
 	}
 
-	// Before: Reset scenario-level state
+	// Before: reset state and prep cluster before each scenario
 	scenarioCtx.Before(func(ctx context.Context, scenario *godog.Scenario) (context.Context, error) {
 		return ctx, nil
 	})
@@ -157,8 +158,8 @@ func InitializeScenario(scenarioCtx *godog.ScenarioContext) {
 	})
 
 	// Register step definitions with access to world + k8sClient
-	steps.LoadModelSteps(scenarioCtx, world)
-	steps.LoadExplicitModelSteps(scenarioCtx, world)
+	steps.LoadTemplateModelSteps(scenarioCtx, world)
+	steps.LoadCustomModelSteps(scenarioCtx, world)
 	steps.LoadInferenceSteps(scenarioCtx, world)
 	// TODO: load other steps, e.g. pipeline, experiment, etc.
 }
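
For context on how InitializeScenario is consumed, a minimal godog runner along these lines would execute the features above; the suite name, paths and tag filter are illustrative, not this repo's actual configuration:

```go
package main

import (
	"os"

	"github.com/cucumber/godog"
)

func main() {
	suite := godog.TestSuite{
		Name:                "seldon-integration",
		ScenarioInitializer: InitializeScenario, // registers the Load*Steps shown in suite.go
		Options: &godog.Options{
			Format: "pretty",
			Paths:  []string{"features/model"},
			Tags:   "@ModelDeployment",
		},
	}
	os.Exit(suite.Run())
}

// InitializeScenario stands in for suite.InitializeScenario from this change.
func InitializeScenario(sc *godog.ScenarioContext) {}
```
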