diff --git a/tests/integration/godog/features/model/server_setup.feature b/tests/integration/godog/features/model/server_setup.feature new file mode 100644 index 0000000000..7895b3c6a3 --- /dev/null +++ b/tests/integration/godog/features/model/server_setup.feature @@ -0,0 +1,32 @@ +@Server +Feature: Server setup + Deploys an mlserver with one replica. We ensure the pods + become ready and remove any other server pods for different + servers. + + @ServerSetup + Scenario: Deploy mlserver Server and remove other servers + Given I deploy server spec with timeout "10s": + """ + apiVersion: mlops.seldon.io/v1alpha1 + kind: Server + metadata: + name: godog-mlserver + spec: + replicas: 1 + serverConfig: mlserver + requirements: + - sklearn + - mlserver + storageUri: gs://seldon-models/scv2/samples/mlserver_1.3.5/iris-sklearn + """ + When the server should eventually become Ready with timeout "30s" + Then ensure only "1" pod(s) are deployed for server and they are Ready + And remove any other server deployments + +# TODO: decide whether to keep this. If we keep it, testers must ensure they don't run this tag when running all +# features in this directory, as tests will fail once the server is deleted. Alternatively we never delete, and it is up to the +# feature dir server setup to ensure ONLY the required servers exist, like above. +# @ServerTeardown +# Scenario: Delete mlserver Server +# Given I delete server "godog-mlserver" with timeout "10s" \ No newline at end of file diff --git a/tests/integration/godog/main_test.go b/tests/integration/godog/main_test.go index 76f76e6d24..4b17b694a0 100644 --- a/tests/integration/godog/main_test.go +++ b/tests/integration/godog/main_test.go @@ -10,6 +10,7 @@ the Change License after the Change Date as each is defined in accordance with t package main__test import ( + "fmt" "os" "testing" @@ -19,18 +20,21 @@ import ( "github.com/spf13/pflag" // godog v0.11.0 and later ) +const cmdOptPrefix = "godog." 
+ var opts = godog.Options{ Output: colors.Colored(os.Stdout), Format: "progress", // can define default values } func init() { - godog.BindCommandLineFlags("godog.", &opts) // godog v0.11.0 and later + godog.BindCommandLineFlags(cmdOptPrefix, &opts) // godog v0.11.0 and later } func TestMain(m *testing.M) { + flagSet := pflag.CommandLine + flagSet.StringSliceVar(&opts.Paths, fmt.Sprintf("%s%s", cmdOptPrefix, "paths"), []string{}, "paths to feature files") pflag.Parse() - opts.Paths = pflag.Args() status := godog.TestSuite{ Name: "godogs", diff --git a/tests/integration/godog/steps/custom_model_steps.go b/tests/integration/godog/steps/custom_model_steps.go index ea3ad8edf0..e1228382ec 100644 --- a/tests/integration/godog/steps/custom_model_steps.go +++ b/tests/integration/godog/steps/custom_model_steps.go @@ -105,6 +105,8 @@ func (m *Model) waitForModelReady(ctx context.Context, model string) error { if model.Status.IsReady() { return nil } + m.log.Debugf("got watch event: model %s is not ready, still waiting", model) + continue } if event.Type == watch.Deleted { diff --git a/tests/integration/godog/steps/infer.go b/tests/integration/godog/steps/infer_steps.go similarity index 77% rename from tests/integration/godog/steps/infer.go rename to tests/integration/godog/steps/infer_steps.go index fab43eab37..526bdcf8c2 100644 --- a/tests/integration/godog/steps/infer.go +++ b/tests/integration/godog/steps/infer_steps.go @@ -25,6 +25,44 @@ import ( "google.golang.org/grpc/metadata" ) +type inference struct { + ssl bool + host string + http *http.Client + grpc v2_dataplane.GRPCInferenceServiceClient + httpPort uint + lastHTTPResponse *http.Response + lastGRPCResponse lastGRPCResponse +} + +func LoadInferenceSteps(scenario *godog.ScenarioContext, w *World) { + scenario.Step(`^send HTTP inference request with timeout "([^"]+)" to model "([^"]+)" with payload:$`, func(timeout, model string, payload *godog.DocString) error { + return withTimeoutCtx(timeout, func(ctx 
context.Context) error { + return w.infer.sendHTTPModelInferenceRequest(ctx, model, payload) + }) + }) + scenario.Step(`^send gRPC inference request with timeout "([^"]+)" to model "([^"]+)" with payload:$`, func(timeout, model string, payload *godog.DocString) error { + return withTimeoutCtx(timeout, func(ctx context.Context) error { + return w.infer.sendGRPCModelInferenceRequest(ctx, model, payload) + }) + }) + scenario.Step(`^(?:I )send a valid gRPC inference request with timeout "([^"]+)"`, func(timeout string) error { + return withTimeoutCtx(timeout, func(ctx context.Context) error { + return w.infer.sendGRPCModelInferenceRequestFromModel(ctx, w.currentModel) + }) + }) + scenario.Step(`^(?:I )send a valid HTTP inference request with timeout "([^"]+)"`, func(timeout string) error { + return withTimeoutCtx(timeout, func(ctx context.Context) error { + return w.infer.sendHTTPModelInferenceRequestFromModel(ctx, w.currentModel) + }) + }) + + scenario.Step(`^expect http response status code "([^"]*)"$`, w.infer.httpRespCheckStatus) + scenario.Step(`^expect http response body to contain JSON:$`, w.infer.httpRespCheckBodyContainsJSON) + scenario.Step(`^expect gRPC response body to contain JSON:$`, w.infer.gRPCRespCheckBodyContainsJSON) + scenario.Step(`^expect gRPC response error to contain "([^"]+)"`, w.infer.gRPCRespContainsError) +} + func (i *inference) doHTTPModelInferenceRequest(ctx context.Context, modelName, body string) error { url := fmt.Sprintf( "%s://%s:%d/v2/models/%s/infer", diff --git a/tests/integration/godog/steps/model_steps.go b/tests/integration/godog/steps/model_steps.go index 20821eb298..caf86c02c9 100644 --- a/tests/integration/godog/steps/model_steps.go +++ b/tests/integration/godog/steps/model_steps.go @@ -133,35 +133,6 @@ func LoadCustomModelSteps(scenario *godog.ScenarioContext, w *World) { }) } -func LoadInferenceSteps(scenario *godog.ScenarioContext, w *World) { - scenario.Step(`^send HTTP inference request with timeout "([^"]+)" to model 
"([^"]+)" with payload:$`, func(timeout, model string, payload *godog.DocString) error { - return withTimeoutCtx(timeout, func(ctx context.Context) error { - return w.infer.sendHTTPModelInferenceRequest(ctx, model, payload) - }) - }) - scenario.Step(`^send gRPC inference request with timeout "([^"]+)" to model "([^"]+)" with payload:$`, func(timeout, model string, payload *godog.DocString) error { - return withTimeoutCtx(timeout, func(ctx context.Context) error { - return w.infer.sendGRPCModelInferenceRequest(ctx, model, payload) - }) - }) - scenario.Step(`^(?:I )send a valid gRPC inference request with timeout "([^"]+)"`, func(timeout string) error { - return withTimeoutCtx(timeout, func(ctx context.Context) error { - return w.infer.sendGRPCModelInferenceRequestFromModel(ctx, w.currentModel) - }) - }) - scenario.Step(`^(?:I )send a valid HTTP inference request with timeout "([^"]+)"`, func(timeout string) error { - return withTimeoutCtx(timeout, func(ctx context.Context) error { - return w.infer.sendHTTPModelInferenceRequestFromModel(ctx, w.currentModel) - }) - }) - - scenario.Step(`^expect http response status code "([^"]*)"$`, w.infer.httpRespCheckStatus) - scenario.Step(`^expect http response body to contain JSON:$`, w.infer.httpRespCheckBodyContainsJSON) - scenario.Step(`^expect gRPC response body to contain JSON:$`, w.infer.gRPCRespCheckBodyContainsJSON) - scenario.Step(`^expect gRPC response error to contain "([^"]+)"`, w.infer.gRPCRespContainsError) - -} - func (m *Model) deployModelSpec(ctx context.Context, spec *godog.DocString) error { modelSpec := &mlopsv1alpha1.Model{} if err := yaml.Unmarshal([]byte(spec.Content), &modelSpec); err != nil { diff --git a/tests/integration/godog/steps/server_steps.go b/tests/integration/godog/steps/server_steps.go new file mode 100644 index 0000000000..8f605fb106 --- /dev/null +++ b/tests/integration/godog/steps/server_steps.go @@ -0,0 +1,225 @@ +/* +Copyright (c) 2024 Seldon Technologies Ltd. 
+ +Use of this software is governed BY +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. +*/ + +package steps + +import ( + "context" + "errors" + "fmt" + "maps" + + "github.com/cucumber/godog" + mlopsv1alpha1 "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1" + "github.com/seldonio/seldon-core/operator/v2/pkg/generated/clientset/versioned" + "github.com/seldonio/seldon-core/tests/integration/godog/k8sclient" + "github.com/sirupsen/logrus" + v1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/equality" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "sigs.k8s.io/yaml" +) + +type server struct { + label map[string]string + namespace string + seldonK8sClient versioned.Interface + k8sClient *k8sclient.K8sClient + currentServer *mlopsv1alpha1.Server + log logrus.FieldLogger +} + +func newServer(label map[string]string, namespace string, seldonK8sClient versioned.Interface, log logrus.FieldLogger, k8sClient *k8sclient.K8sClient) *server { + return &server{ + label: label, + namespace: namespace, + seldonK8sClient: seldonK8sClient, + k8sClient: k8sClient, + log: log, + } +} + +func LoadServerSteps(scenario *godog.ScenarioContext, w *World) { + scenario.Step(`^I deploy server spec with timeout "([^"]+)":$`, func(timeout string, spec *godog.DocString) error { + return withTimeoutCtx(timeout, func(ctx context.Context) error { + return w.server.deployServerSpec(ctx, spec) + }) + }) + scenario.Step(`^the server should eventually become Ready with timeout "([^"]+)"$`, func(timeout string) error { + return withTimeoutCtx(timeout, func(ctx context.Context) error { + return w.server.requiresCurrentServer(func() error { + return w.server.waitForServerReady(ctx) + }) + 
}) + }) + scenario.Step(`^ensure only "([^"]+)" pod\(s\) are deployed for server and they are Ready$`, func(replicaCount int32) error { + return withTimeoutCtx("10s", func(ctx context.Context) error { + return w.server.requiresCurrentServer(func() error { + return w.server.checkPodsAreReady(ctx, replicaCount) + }) + }) + }) + scenario.Step(`^remove any other server deployments$`, func() error { + return withTimeoutCtx("10s", func(ctx context.Context) error { + return w.server.requiresCurrentServer(func() error { + return w.server.removeOtherServers(ctx) + }) + }) + }) + scenario.Step(`^I delete server "([^"]+)" with timeout "([^"]+)"$`, func(server, timeout string) error { + return withTimeoutCtx(timeout, func(ctx context.Context) error { + return w.server.deleteServer(ctx, server) + }) + }) +} + +func (s *server) requiresCurrentServer(callback func() error) error { + if s.currentServer == nil { + return errors.New("current server not set") + } + return callback() +} + +func (s *server) deployServerSpec(ctx context.Context, spec *godog.DocString) error { + serverSpec := &mlopsv1alpha1.Server{} + if err := yaml.Unmarshal([]byte(spec.Content), &serverSpec); err != nil { + return fmt.Errorf("failed unmarshalling server spec: %w", err) + } + serverSpec.Namespace = s.namespace + s.currentServer = serverSpec + s.applyScenarioLabel() + if _, err := s.seldonK8sClient.MlopsV1alpha1().Servers(s.namespace).Create(ctx, s.currentServer, metav1.CreateOptions{}); err != nil { + if k8serrors.IsAlreadyExists(err) { + s.log.Debugf("server %s already exists, checking if equal", s.currentServer.Name) + deployerServer, err := s.seldonK8sClient.MlopsV1alpha1().Servers(s.namespace).Get(ctx, s.currentServer.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed getting server: %w", err) + } + if equality.Semantic.DeepEqual(serverSpec.Spec, deployerServer.Spec) { + s.log.Debugf("server %s deployed spec equals desired spec", s.currentServer.Name) + return nil + } + 
s.log.Debugf("server %s deployed spec needs updating to desired spec", s.currentServer.Name) + deployerServer.Spec = s.currentServer.Spec + if _, err := s.seldonK8sClient.MlopsV1alpha1().Servers(s.namespace).Update(ctx, deployerServer, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed updating server: %w", err) + } + return nil + } + return fmt.Errorf("failed creating server: %w", err) + } + return nil +} + +func (s *server) applyScenarioLabel() { + if s.currentServer.Labels == nil { + s.currentServer.Labels = s.label + } else { + maps.Copy(s.currentServer.Labels, s.label) + } + + // todo: change this approach + for k, v := range k8sclient.DefaultCRDLabelMap { + s.currentServer.Labels[k] = v + } +} + +func (s *server) removeOtherServers(ctx context.Context) error { + servers, err := s.seldonK8sClient.MlopsV1alpha1().Servers(s.namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed listing servers: %w", err) + } + for _, server := range servers.Items { + if server.Name == s.currentServer.Name { + continue + } + if err := s.deleteServer(ctx, server.Name); err != nil { + return fmt.Errorf("failed deleting server: %w", err) + } + s.log.Infof("removed server %q", server.Name) + } + + return nil +} + +func (s *server) deleteServer(ctx context.Context, server string) error { + return s.seldonK8sClient.MlopsV1alpha1().Servers(s.namespace).Delete(ctx, server, metav1.DeleteOptions{}) +} + +func (s *server) checkPodsAreReady(ctx context.Context, replicaCount int32) error { + statefulSet := &v1.StatefulSet{} + if err := s.k8sClient.KubeClient.Get(ctx, types.NamespacedName{ + Namespace: s.namespace, + Name: s.currentServer.Name, + }, statefulSet); err != nil { + return fmt.Errorf("failed getting statefulSet: %w", err) + } + + if *statefulSet.Spec.Replicas != replicaCount { + return fmt.Errorf("expected %d replicas but got %d on statefulset spec", replicaCount, *statefulSet.Spec.Replicas) + } + + if 
statefulSet.Status.ReadyReplicas == replicaCount { + return nil + } + + return fmt.Errorf("ready replicas %d does not match %d", statefulSet.Status.ReadyReplicas, replicaCount) +} + +func (s *server) waitForServerReady(ctx context.Context) error { + foundServer, err := s.seldonK8sClient.MlopsV1alpha1().Servers(s.namespace).Get(ctx, s.currentServer.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed getting server: %w", err) + } + + if foundServer.Status.IsReady() { + return nil + } + + watcher, err := s.seldonK8sClient.MlopsV1alpha1().Servers(s.namespace).Watch(ctx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", s.currentServer.Name), + ResourceVersion: foundServer.ResourceVersion, + Watch: true, + }) + if err != nil { + return fmt.Errorf("failed subscribed to watch server: %w", err) + } + defer watcher.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case event, ok := <-watcher.ResultChan(): + if !ok { + return fmt.Errorf("watch channel closed") + } + + if event.Type == watch.Error { + return fmt.Errorf("watch error: %v", event.Object) + } + + if event.Type == watch.Added || event.Type == watch.Modified { + srv := event.Object.(*mlopsv1alpha1.Server) + if srv.Status.IsReady() { + return nil + } + s.log.Debugf("got watch event: server %s is not ready, still waiting", s.currentServer.Name) + continue + } + + if event.Type == watch.Deleted { + return fmt.Errorf("resource was deleted") + } + } + } +} diff --git a/tests/integration/godog/steps/world.go b/tests/integration/godog/steps/world.go index 79e77180e0..652228af57 100644 --- a/tests/integration/godog/steps/world.go +++ b/tests/integration/godog/steps/world.go @@ -27,6 +27,7 @@ type World struct { //todo: server config,seldon config and seldon runtime to be able to reconcile to starting state should we change //todo: the state such as reducing replicas to 0 of scheduler to test unavailability currentModel *Model + server *server infer inference 
logger log.FieldLogger Label map[string]string @@ -45,16 +46,6 @@ type Config struct { SSL bool } -type inference struct { - ssl bool - host string - http *http.Client - grpc v2_dataplane.GRPCInferenceServiceClient - httpPort uint - lastHTTPResponse *http.Response - lastGRPCResponse lastGRPCResponse -} - type lastGRPCResponse struct { response *v2_dataplane.ModelInferResponse err error @@ -70,6 +61,7 @@ func NewWorld(c Config) (*World, error) { kubeClient: c.KubeClient, watcherStorage: c.WatcherStorage, currentModel: NewModel(label, c.Namespace, c.K8sClient, c.Logger, c.WatcherStorage), + server: newServer(label, c.Namespace, c.K8sClient, c.Logger, c.KubeClient), infer: inference{ host: c.IngressHost, http: &http.Client{}, diff --git a/tests/integration/godog/suite/suite.go b/tests/integration/godog/suite/suite.go index ded10c893e..329348ae56 100644 --- a/tests/integration/godog/suite/suite.go +++ b/tests/integration/godog/suite/suite.go @@ -161,5 +161,6 @@ func InitializeScenario(scenarioCtx *godog.ScenarioContext) { steps.LoadTemplateModelSteps(scenarioCtx, world) steps.LoadCustomModelSteps(scenarioCtx, world) steps.LoadInferenceSteps(scenarioCtx, world) + steps.LoadServerSteps(scenarioCtx, world) // TODO: load other steps, e.g. pipeline, experiment, etc. }