Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions tests/integration/godog/features/model/server_setup.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
@Server
Feature: Server setup
Deploys an mlserver with one replica. We ensure the pods
become ready and remove any other server pods for different
servers.

@ServerSetup
Scenario: Deploy mlserver Server and remove other servers
Given I deploy server spec with timeout "10s":
"""
apiVersion: mlops.seldon.io/v1alpha1
kind: Server
metadata:
name: godog-mlserver
spec:
replicas: 1
serverConfig: mlserver
requirements:
- sklearn
- mlserver
storageUri: gs://seldon-models/scv2/samples/mlserver_1.3.5/iris-sklearn
"""
When the server should eventually become Ready with timeout "30s"
Then ensure only "1" pod(s) are deployed for server and they are Ready
And remove any other server deployments

# TODO decide if we want to keep this, if we keep testers will need to ensure they don't run this tag when running all
# all features in this directory, as tests will fail when server is deleted. We can not delete and it's up to the
# feature dir server setup to ensure ONLY the required servers exist, like above.
# @ServerTeardown
# Scenario: Delete mlserver Server
# Given I delete server "godog-mlserver" with timeout "10s"
8 changes: 6 additions & 2 deletions tests/integration/godog/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ the Change License after the Change Date as each is defined in accordance with t
package main__test

import (
"fmt"
"os"
"testing"

Expand All @@ -19,18 +20,21 @@ import (
"github.com/spf13/pflag" // godog v0.11.0 and later
)

const cmdOptPrefix = "godog."

var opts = godog.Options{
Output: colors.Colored(os.Stdout),
Format: "progress", // can define default values
}

func init() {
godog.BindCommandLineFlags("godog.", &opts) // godog v0.11.0 and later
godog.BindCommandLineFlags(cmdOptPrefix, &opts) // godog v0.11.0 and later
}

func TestMain(m *testing.M) {
flagSet := pflag.CommandLine
flagSet.StringSliceVar(&opts.Paths, fmt.Sprintf("%s%s", cmdOptPrefix, "paths"), []string{}, "paths to feature files")
pflag.Parse()
opts.Paths = pflag.Args()

status := godog.TestSuite{
Name: "godogs",
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/godog/steps/custom_model_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func (m *Model) waitForModelReady(ctx context.Context, model string) error {
if model.Status.IsReady() {
return nil
}
m.log.Debugf("got watch event: model %s is not ready, still waiting", model)
continue
}

if event.Type == watch.Deleted {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,44 @@ import (
"google.golang.org/grpc/metadata"
)

type inference struct {
ssl bool
host string
http *http.Client
grpc v2_dataplane.GRPCInferenceServiceClient
httpPort uint
lastHTTPResponse *http.Response
lastGRPCResponse lastGRPCResponse
}

func LoadInferenceSteps(scenario *godog.ScenarioContext, w *World) {
scenario.Step(`^send HTTP inference request with timeout "([^"]+)" to model "([^"]+)" with payload:$`, func(timeout, model string, payload *godog.DocString) error {
return withTimeoutCtx(timeout, func(ctx context.Context) error {
return w.infer.sendHTTPModelInferenceRequest(ctx, model, payload)
})
})
scenario.Step(`^send gRPC inference request with timeout "([^"]+)" to model "([^"]+)" with payload:$`, func(timeout, model string, payload *godog.DocString) error {
return withTimeoutCtx(timeout, func(ctx context.Context) error {
return w.infer.sendGRPCModelInferenceRequest(ctx, model, payload)
})
})
scenario.Step(`^(?:I )send a valid gRPC inference request with timeout "([^"]+)"`, func(timeout string) error {
return withTimeoutCtx(timeout, func(ctx context.Context) error {
return w.infer.sendGRPCModelInferenceRequestFromModel(ctx, w.currentModel)
})
})
scenario.Step(`^(?:I )send a valid HTTP inference request with timeout "([^"]+)"`, func(timeout string) error {
return withTimeoutCtx(timeout, func(ctx context.Context) error {
return w.infer.sendHTTPModelInferenceRequestFromModel(ctx, w.currentModel)
})
})

scenario.Step(`^expect http response status code "([^"]*)"$`, w.infer.httpRespCheckStatus)
scenario.Step(`^expect http response body to contain JSON:$`, w.infer.httpRespCheckBodyContainsJSON)
scenario.Step(`^expect gRPC response body to contain JSON:$`, w.infer.gRPCRespCheckBodyContainsJSON)
scenario.Step(`^expect gRPC response error to contain "([^"]+)"`, w.infer.gRPCRespContainsError)
}

func (i *inference) doHTTPModelInferenceRequest(ctx context.Context, modelName, body string) error {
url := fmt.Sprintf(
"%s://%s:%d/v2/models/%s/infer",
Expand Down
29 changes: 0 additions & 29 deletions tests/integration/godog/steps/model_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,35 +133,6 @@ func LoadCustomModelSteps(scenario *godog.ScenarioContext, w *World) {
})
}

func LoadInferenceSteps(scenario *godog.ScenarioContext, w *World) {
scenario.Step(`^send HTTP inference request with timeout "([^"]+)" to model "([^"]+)" with payload:$`, func(timeout, model string, payload *godog.DocString) error {
return withTimeoutCtx(timeout, func(ctx context.Context) error {
return w.infer.sendHTTPModelInferenceRequest(ctx, model, payload)
})
})
scenario.Step(`^send gRPC inference request with timeout "([^"]+)" to model "([^"]+)" with payload:$`, func(timeout, model string, payload *godog.DocString) error {
return withTimeoutCtx(timeout, func(ctx context.Context) error {
return w.infer.sendGRPCModelInferenceRequest(ctx, model, payload)
})
})
scenario.Step(`^(?:I )send a valid gRPC inference request with timeout "([^"]+)"`, func(timeout string) error {
return withTimeoutCtx(timeout, func(ctx context.Context) error {
return w.infer.sendGRPCModelInferenceRequestFromModel(ctx, w.currentModel)
})
})
scenario.Step(`^(?:I )send a valid HTTP inference request with timeout "([^"]+)"`, func(timeout string) error {
return withTimeoutCtx(timeout, func(ctx context.Context) error {
return w.infer.sendHTTPModelInferenceRequestFromModel(ctx, w.currentModel)
})
})

scenario.Step(`^expect http response status code "([^"]*)"$`, w.infer.httpRespCheckStatus)
scenario.Step(`^expect http response body to contain JSON:$`, w.infer.httpRespCheckBodyContainsJSON)
scenario.Step(`^expect gRPC response body to contain JSON:$`, w.infer.gRPCRespCheckBodyContainsJSON)
scenario.Step(`^expect gRPC response error to contain "([^"]+)"`, w.infer.gRPCRespContainsError)

}

func (m *Model) deployModelSpec(ctx context.Context, spec *godog.DocString) error {
modelSpec := &mlopsv1alpha1.Model{}
if err := yaml.Unmarshal([]byte(spec.Content), &modelSpec); err != nil {
Expand Down
225 changes: 225 additions & 0 deletions tests/integration/godog/steps/server_steps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
Copyright (c) 2024 Seldon Technologies Ltd.

Use of this software is governed BY
(1) the license included in the LICENSE file or
(2) if the license included in the LICENSE file is the Business Source License 1.1,
the Change License after the Change Date as each is defined in accordance with the LICENSE file.
*/

package steps

import (
"context"
"errors"
"fmt"
"maps"

"github.com/cucumber/godog"
mlopsv1alpha1 "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
"github.com/seldonio/seldon-core/operator/v2/pkg/generated/clientset/versioned"
"github.com/seldonio/seldon-core/tests/integration/godog/k8sclient"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"sigs.k8s.io/yaml"
)

type server struct {
label map[string]string
namespace string
seldonK8sClient versioned.Interface
k8sClient *k8sclient.K8sClient
currentServer *mlopsv1alpha1.Server
log logrus.FieldLogger
}

func newServer(label map[string]string, namespace string, seldonK8sClient versioned.Interface, log logrus.FieldLogger, k8sClient *k8sclient.K8sClient) *server {
return &server{
label: label,
namespace: namespace,
seldonK8sClient: seldonK8sClient,
k8sClient: k8sClient,
log: log,
}
}

func LoadServerSteps(scenario *godog.ScenarioContext, w *World) {
scenario.Step(`^I deploy server spec with timeout "([^"]+)":$`, func(timeout string, spec *godog.DocString) error {
return withTimeoutCtx(timeout, func(ctx context.Context) error {
return w.server.deployServerSpec(ctx, spec)
})
})
scenario.Step(`^the server should eventually become Ready with timeout "([^"]+)"$`, func(timeout string) error {
return withTimeoutCtx(timeout, func(ctx context.Context) error {
return w.server.requiresCurrentServer(func() error {
return w.server.waitForServerReady(ctx)
})
})
})
scenario.Step(`^ensure only "([^"]+)" pod\(s\) are deployed for server and they are Ready$`, func(replicaCount int32) error {
return withTimeoutCtx("10s", func(ctx context.Context) error {
return w.server.requiresCurrentServer(func() error {
return w.server.checkPodsAreReady(ctx, replicaCount)
})
})
})
scenario.Step(`^remove any other server deployments$`, func() error {
return withTimeoutCtx("10s", func(ctx context.Context) error {
return w.server.requiresCurrentServer(func() error {
return w.server.removeOtherServers(ctx)
})
})
})
scenario.Step(`^I delete server "([^"]+)" with timeout "([^"]+)"$`, func(server, timeout string) error {
return withTimeoutCtx(timeout, func(ctx context.Context) error {
return w.server.deleteServer(ctx, server)
})
})
}

func (s *server) requiresCurrentServer(callback func() error) error {
if s.currentServer == nil {
return errors.New("current server not set")
}
return callback()
}

func (s *server) deployServerSpec(ctx context.Context, spec *godog.DocString) error {
serverSpec := &mlopsv1alpha1.Server{}
if err := yaml.Unmarshal([]byte(spec.Content), &serverSpec); err != nil {
return fmt.Errorf("failed unmarshalling server spec: %w", err)
}
serverSpec.Namespace = s.namespace
s.currentServer = serverSpec
s.applyScenarioLabel()
if _, err := s.seldonK8sClient.MlopsV1alpha1().Servers(s.namespace).Create(ctx, s.currentServer, metav1.CreateOptions{}); err != nil {
if k8serrors.IsAlreadyExists(err) {
s.log.Debugf("server %s already exists, checking if equal", s.currentServer.Name)
deployerServer, err := s.seldonK8sClient.MlopsV1alpha1().Servers(s.namespace).Get(ctx, s.currentServer.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed getting server: %w", err)
}
if equality.Semantic.DeepEqual(serverSpec.Spec, deployerServer.Spec) {
s.log.Debugf("server %s deployed spec equals desired spec", s.currentServer.Name)
return nil
}
s.log.Debugf("server %s deployed spec needs updating to desired spec", s.currentServer.Name)
deployerServer.Spec = s.currentServer.Spec
if _, err := s.seldonK8sClient.MlopsV1alpha1().Servers(s.namespace).Update(ctx, deployerServer, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed updating server: %w", err)
}
return nil
}
return fmt.Errorf("failed creating server: %w", err)
}
return nil
}

func (s *server) applyScenarioLabel() {
if s.currentServer.Labels == nil {
s.currentServer.Labels = s.label
} else {
maps.Copy(s.currentServer.Labels, s.label)
}

// todo: change this approach
for k, v := range k8sclient.DefaultCRDLabelMap {
s.currentServer.Labels[k] = v
}
}

func (s *server) removeOtherServers(ctx context.Context) error {
servers, err := s.seldonK8sClient.MlopsV1alpha1().Servers(s.namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed listing servers: %w", err)
}
for _, server := range servers.Items {
if server.Name == s.currentServer.Name {
continue
}
if err := s.deleteServer(ctx, server.Name); err != nil {
return fmt.Errorf("failed deleting server: %w", err)
}
s.log.Infof("removed server %q", server.Name)
}

return nil
}

func (s *server) deleteServer(ctx context.Context, server string) error {
return s.seldonK8sClient.MlopsV1alpha1().Servers(s.namespace).Delete(ctx, server, metav1.DeleteOptions{})
}

func (s *server) checkPodsAreReady(ctx context.Context, replicaCount int32) error {
statefulSet := &v1.StatefulSet{}
if err := s.k8sClient.KubeClient.Get(ctx, types.NamespacedName{
Namespace: s.namespace,
Name: s.currentServer.Name,
}, statefulSet); err != nil {
return fmt.Errorf("failed getting statefulSet: %w", err)
}

if *statefulSet.Spec.Replicas != replicaCount {
return fmt.Errorf("expected %d replicas but got %d on statefulset spec", replicaCount, *statefulSet.Spec.Replicas)
}

if statefulSet.Status.ReadyReplicas == replicaCount {
return nil
}

return fmt.Errorf("ready replicas %d does not match %d", statefulSet.Status.ReadyReplicas, replicaCount)
}

func (s *server) waitForServerReady(ctx context.Context) error {
foundServer, err := s.seldonK8sClient.MlopsV1alpha1().Servers(s.namespace).Get(ctx, s.currentServer.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed getting server: %w", err)
}

if foundServer.Status.IsReady() {
return nil
}

watcher, err := s.seldonK8sClient.MlopsV1alpha1().Servers(s.namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", s.currentServer.Name),
ResourceVersion: foundServer.ResourceVersion,
Watch: true,
})
if err != nil {
return fmt.Errorf("failed subscribed to watch server: %w", err)
}
defer watcher.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case event, ok := <-watcher.ResultChan():
if !ok {
return fmt.Errorf("watch channel closed")
}

if event.Type == watch.Error {
return fmt.Errorf("watch error: %v", event.Object)
}

if event.Type == watch.Added || event.Type == watch.Modified {
srv := event.Object.(*mlopsv1alpha1.Server)
if srv.Status.IsReady() {
return nil
}
s.log.Debugf("got watch event: server %s is not ready, still waiting", s.currentServer.Name)
continue
}

if event.Type == watch.Deleted {
return fmt.Errorf("resource was deleted")
}
}
}
}
Loading
Loading