Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 13 additions & 21 deletions tests/integration/godog/k8sclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,16 @@ import (
"fmt"

mlopsv1alpha1 "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
log "github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

type Client interface {
ApplyModel(model *mlopsv1alpha1.Model) error
GetModel(model string) (*mlopsv1alpha1.Model, error)
ApplyPipeline(pipeline *mlopsv1alpha1.Pipeline) error
GetPipeline(pipeline string) (*mlopsv1alpha1.Pipeline, error)
}

type K8sClient struct {
logger log.FieldLogger
namespace string
KubeClient client.WithWatch
}
Expand All @@ -42,7 +37,11 @@ const (
)

// New todo: separate k8s client init and pass to new
func New(namespace string) (*K8sClient, error) {
func New(namespace string, logger *log.Logger) (*K8sClient, error) {
if logger == nil {
logger = log.New()
}

k8sScheme := runtime.NewScheme()

if err := scheme.AddToScheme(k8sScheme); err != nil {
Expand All @@ -64,6 +63,7 @@ func New(namespace string) (*K8sClient, error) {
}

return &K8sClient{
logger: logger.WithField("client", "k8sClient"),
namespace: namespace,
KubeClient: cl,
}, nil
Expand Down Expand Up @@ -109,23 +109,15 @@ func (k8s *K8sClient) ApplyModel(model *mlopsv1alpha1.Model) error {
}

func (k8s *K8sClient) DeleteScenarioResources(ctx context.Context, labels client.MatchingLabels) error {

list := &mlopsv1alpha1.ModelList{}
err := k8s.KubeClient.List(ctx, list,
// 1) Delete Models
if err := k8s.KubeClient.DeleteAllOf(
ctx,
&mlopsv1alpha1.Model{},
client.InNamespace(k8s.namespace),
labels,
)
if err != nil {
); err != nil {
return err
}

for _, m := range list.Items {
// Copy because Delete expects a pointer
model := m
if err := k8s.KubeClient.Delete(ctx, &model); err != nil {
return err
}
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"

"github.com/seldonio/seldon-core/operator/v2/pkg/generated/clientset/versioned/typed/mlops/v1alpha1"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -35,6 +36,7 @@ type WatcherStore struct {
label string
mlopsClient v1alpha1.MlopsV1alpha1Interface
modelWatcher watch.Interface
logger log.FieldLogger

mu sync.RWMutex
store map[string]runtime.Object // key: "namespace/name"
Expand All @@ -52,7 +54,11 @@ type waiter struct {
type ConditionFunc func(obj runtime.Object) (done bool, err error)

// NewWatcherStore receives events that match on a particular object list and creates a database store to query crd state
func NewWatcherStore(namespace string, label string, mlopsClient v1alpha1.MlopsV1alpha1Interface) (*WatcherStore, error) {
func NewWatcherStore(namespace string, label string, mlopsClient v1alpha1.MlopsV1alpha1Interface, logger *log.Logger) (*WatcherStore, error) {
if logger == nil {
logger = log.New()
}

modelWatcher, err := mlopsClient.Models(namespace).Watch(context.Background(), v1.ListOptions{LabelSelector: "test-suite=godog"})
if err != nil {
return nil, fmt.Errorf("failed to create model watcher: %w", err)
Expand All @@ -63,6 +69,7 @@ func NewWatcherStore(namespace string, label string, mlopsClient v1alpha1.MlopsV
label: label,
mlopsClient: mlopsClient,
modelWatcher: modelWatcher,
logger: logger.WithField("client", "watcher_store"),
store: make(map[string]runtime.Object),
doneChan: make(chan struct{}),
}, nil
Expand All @@ -79,7 +86,12 @@ func (s *WatcherStore) Start() {
return
}

fmt.Printf("model watch event: %v\n", event)
accessor, err := meta.Accessor(event.Object)
if err != nil {
s.logger.WithError(err).Error("failed to access model watcher")
} else {
s.logger.Debugf("new model watch event with name: %s on namespace: %s", accessor.GetName(), accessor.GetNamespace())
}

if event.Object == nil {
continue
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/godog/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

"github.com/cucumber/godog"
"github.com/cucumber/godog/colors"
"github.com/seldonio/seldon-core/tests/integration/godog/scenario"
"github.com/seldonio/seldon-core/tests/integration/godog/suite"
"github.com/spf13/pflag" // godog v0.11.0 and later
)

Expand All @@ -34,8 +34,8 @@ func TestMain(m *testing.M) {

status := godog.TestSuite{
Name: "godogs",
TestSuiteInitializer: scenario.InitializeTestSuite,
ScenarioInitializer: scenario.InitializeScenario,
TestSuiteInitializer: suite.InitializeTestSuite,
ScenarioInitializer: suite.InitializeScenario,
Options: &opts,
}.Run()

Expand Down
4 changes: 2 additions & 2 deletions tests/integration/godog/steps/explicit_model_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import "context"
func (m *Model) waitForModelReady(ctx context.Context, model string) error {
// TODO: uncomment when auto-gen k8s client merged

//foundModel, err := w.k8sClient.MlopsV1alpha1().Models(w.namespace).Get(ctx, model, metav1.GetOptions{})
//foundModel, err := w.corek8sClient.MlopsV1alpha1().Models(w.namespace).Get(ctx, model, metav1.GetOptions{})
//if err != nil {
// return fmt.Errorf("failed getting model: %w", err)
//}
Expand All @@ -23,7 +23,7 @@ func (m *Model) waitForModelReady(ctx context.Context, model string) error {
// return nil
//}
//
//watcher, err := w.k8sClient.MlopsV1alpha1().Models(w.namespace).Watch(ctx, metav1.ListOptions{
//watcher, err := w.corek8sClient.MlopsV1alpha1().Models(w.namespace).Watch(ctx, metav1.ListOptions{
// FieldSelector: fmt.Sprintf("metadata.name=%s", model),
// ResourceVersion: foundModel.ResourceVersion,
// Watch: true,
Expand Down
35 changes: 14 additions & 21 deletions tests/integration/godog/steps/model_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/cucumber/godog"
mlopsv1alpha1 "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
"github.com/seldonio/seldon-core/tests/integration/godog/k8sclient"
"github.com/seldonio/seldon-core/tests/integration/godog/scenario/assertions"
"github.com/seldonio/seldon-core/tests/integration/godog/steps/assertions"
"gopkg.in/yaml.v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -50,29 +50,29 @@ var testModels = map[string]TestModelConfig{
func LoadModelSteps(scenario *godog.ScenarioContext, w *World) {
// Model Operations
scenario.Step(`^I have an? "([^"]+)" model$`, func(modelName string) error {
return w.CurrentModel.IHaveAModel(modelName, w.Label)
return w.currentModel.IHaveAModel(modelName, w.Label)
})
scenario.Step(`^the model has "(\d+)" min replicas$`, w.CurrentModel.SetMinReplicas)
scenario.Step(`^the model has "(\d+)" max replicas$`, w.CurrentModel.SetMaxReplicas)
scenario.Step(`^the model has "(\d+)" replicas$`, w.CurrentModel.SetReplicas)
scenario.Step(`^the model has "(\d+)" min replicas$`, w.currentModel.SetMinReplicas)
scenario.Step(`^the model has "(\d+)" max replicas$`, w.currentModel.SetMaxReplicas)
scenario.Step(`^the model has "(\d+)" replicas$`, w.currentModel.SetReplicas)
// Model Deployments
scenario.Step(`^the model is applied$`, func() error {
return w.CurrentModel.ApplyModel(w.KubeClient)
return w.currentModel.ApplyModel(w.kubeClient)
})
// Model Assertions
scenario.Step(`^the model (?:should )?eventually become(?:s)? Ready$`, func() error {
return w.CurrentModel.ModelReady(w.WatcherStorage)
return w.currentModel.ModelReady(w.watcherStorage)
})
scenario.Step(`^the model status message should be "([^"]+)"$`, w.CurrentModel.AssertModelStatus)
scenario.Step(`^the model status message should be "([^"]+)"$`, w.currentModel.AssertModelStatus)
}

func LoadExplicitModelSteps(scenario *godog.ScenarioContext, w *World) {
scenario.Step(`^I deploy model spec:$`, func(spec *godog.DocString) error {
return w.CurrentModel.deployModelSpec(spec, w.namespace, w.KubeClient)
return w.currentModel.deployModelSpec(spec, w.namespace, w.kubeClient)
})
scenario.Step(`^the model "([^"]+)" should eventually become Ready with timeout "([^"]+)"$`, func(model, timeout string) error {
return withTimeoutCtx(timeout, func(ctx context.Context) error {
return w.CurrentModel.waitForModelReady(ctx, model)
return w.currentModel.waitForModelReady(ctx, model)
})
})
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func (m *Model) deployModelSpec(spec *godog.DocString, namespace string, _ *k8sc
}
modelSpec.Namespace = namespace
// TODO: uncomment when auto-gen k8s client merged
//if _, err := w.k8sClient.MlopsV1alpha1().Models(w.namespace).Create(context.TODO(), modelSpec, metav1.CreateOptions{}); err != nil {
//if _, err := w.corek8sClient.MlopsV1alpha1().Models(w.namespace).Create(context.TODO(), modelSpec, metav1.CreateOptions{}); err != nil {
// return fmt.Errorf("failed creating model: %w", err)
//}
return nil
Expand All @@ -112,7 +112,7 @@ func (m *Model) IHaveAModel(model string, label map[string]string) error {
return fmt.Errorf("model %s not found", model)
}

modelName := fmt.Sprintf("%s-%s", testModel.Name, randomSuffix(3))
modelName := fmt.Sprintf("%s-%s", testModel.Name, randomString(3))

m.model = &mlopsv1alpha1.Model{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -165,10 +165,6 @@ func NewModel() *Model {
return &Model{model: &mlopsv1alpha1.Model{}}
}

func (m *Model) Reset(world *World) {
world.CurrentModel.model = &mlopsv1alpha1.Model{}
}

func (m *Model) SetMinReplicas(replicas int) {

}
Expand All @@ -179,11 +175,8 @@ func (m *Model) SetReplicas(replicas int) {}

// ApplyModel model is aware of namespace and testsuite config and it might add extra information to the cr that the step hasn't added like namespace
func (m *Model) ApplyModel(k *k8sclient.K8sClient) error {

// retrieve current model and apply in k8s
err := k.ApplyModel(m.model)

if err != nil {
if err := k.ApplyModel(m.model); err != nil {
return err
}

Expand All @@ -194,7 +187,7 @@ func (m *Model) ModelReady(w k8sclient.WatcherStorage) error {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

// m.world.CurrentModel.model is assumed to be *mlopsv1alpha1.Model
// m.world.currentModel.model is assumed to be *mlopsv1alpha1.Model
// which implements runtime.Object, so no cast needed.
return w.WaitFor(
ctx,
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/godog/steps/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"strings"
)

// RandomSuffix returns a lowercase, short, k8s-safe random string.
func randomSuffix(n int) string {
// randomString returns a lowercase, short, k8s-safe random string.
func randomString(n int) string {
b := make([]byte, n)
_, err := rand.Read(b)
if err != nil {
Expand Down
46 changes: 13 additions & 33 deletions tests/integration/godog/steps/world.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,34 @@ the Change License after the Change Date as each is defined in accordance with t
package steps

import (
"crypto/tls"
"fmt"
"net/http"

"github.com/seldonio/seldon-core/apis/go/v2/mlops/v2_dataplane"
v "github.com/seldonio/seldon-core/operator/v2/pkg/generated/clientset/versioned"
"github.com/seldonio/seldon-core/tests/integration/godog/k8sclient"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
log "github.com/sirupsen/logrus"
)

type World struct {
namespace string
KubeClient *k8sclient.K8sClient
k8sClient v.Interface
WatcherStorage k8sclient.WatcherStorage
kubeClient *k8sclient.K8sClient
corek8sClient v.Interface
watcherStorage k8sclient.WatcherStorage
StartingClusterState string //todo: this will be a combination of starting state awareness of core 2 such as the
//todo: server config,seldon config and seldon runtime to be able to reconcile to starting state should we change
//todo: the state such as reducing replicas to 0 of scheduler to test unavailability
CurrentModel *Model
currentModel *Model
infer inference
logger *logrus.Entry
logger log.FieldLogger
Label map[string]string
}

type Config struct {
Namespace string
Logger *logrus.Entry
Logger log.FieldLogger
KubeClient *k8sclient.K8sClient
WatcherStorage k8sclient.WatcherStorage
GRPC v2_dataplane.GRPCInferenceServiceClient
IngressHost string
HTTPPort uint
GRPCPort uint
Expand All @@ -59,36 +55,20 @@ type inference struct {
}

func NewWorld(c Config) (*World, error) {
creds := insecure.NewCredentials()
if c.SSL {
creds = credentials.NewTLS(&tls.Config{
InsecureSkipVerify: true,
})
}

opts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
}

conn, err := grpc.NewClient(fmt.Sprintf("%s:%d", c.IngressHost, c.GRPCPort), opts...)
if err != nil {
return nil, fmt.Errorf("could not create grpc client: %w", err)
}
grpcClient := v2_dataplane.NewGRPCInferenceServiceClient(conn)

label := map[string]string{
"scenario": randomSuffix(6),
"scenario": randomString(6),
}

w := &World{
namespace: c.Namespace,
KubeClient: c.KubeClient,
WatcherStorage: c.WatcherStorage,
kubeClient: c.KubeClient,
watcherStorage: c.WatcherStorage,
currentModel: NewModel(),
infer: inference{
host: c.IngressHost,
http: &http.Client{},
grpc: c.GRPC,
httpPort: c.HTTPPort,
grpc: grpcClient,
ssl: c.SSL},
Label: label,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Use of this software is governed BY
the Change License after the Change Date as each is defined in accordance with the LICENSE file.
*/

package scenario
package suite

import (
"encoding/json"
Expand Down
Loading
Loading