Skip to content

Commit 8fd9638

Browse files
committed
addressed comments:
* format issues * watcher store now saves keys with namespace/kind/name
1 parent 84ce397 commit 8fd9638

File tree

5 files changed

+55
-25
lines changed

5 files changed

+55
-25
lines changed

tests/integration/godog/go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ require (
99
github.com/sirupsen/logrus v1.9.3
1010
github.com/spf13/pflag v1.0.7
1111
google.golang.org/grpc v1.73.0
12-
gopkg.in/yaml.v3 v3.0.1
13-
k8s.io/api v0.34.2
12+
k8s.io/apiextensions-apiserver v0.34.1
1413
k8s.io/apimachinery v0.34.2
1514
k8s.io/client-go v0.34.2
1615
sigs.k8s.io/controller-runtime v0.22.4
16+
sigs.k8s.io/yaml v1.6.0
1717
)
1818

1919
replace (
@@ -71,13 +71,13 @@ require (
7171
google.golang.org/protobuf v1.36.6 // indirect
7272
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
7373
gopkg.in/inf.v0 v0.9.1 // indirect
74-
k8s.io/apiextensions-apiserver v0.34.1 // indirect
74+
gopkg.in/yaml.v3 v3.0.1 // indirect
75+
k8s.io/api v0.34.2 // indirect
7576
k8s.io/klog/v2 v2.130.1 // indirect
7677
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect
7778
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect
7879
knative.dev/pkg v0.0.0-20250128013458-efddeac3ec35 // indirect
7980
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect
8081
sigs.k8s.io/randfill v1.0.0 // indirect
8182
sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect
82-
sigs.k8s.io/yaml v1.6.0 // indirect
8383
)

tests/integration/godog/k8sclient/watcher_store.go

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,31 +14,40 @@ import (
1414
"fmt"
1515
"sync"
1616

17+
mlopsscheme "github.com/seldonio/seldon-core/operator/v2/pkg/generated/clientset/versioned/scheme"
1718
"github.com/seldonio/seldon-core/operator/v2/pkg/generated/clientset/versioned/typed/mlops/v1alpha1"
1819
log "github.com/sirupsen/logrus"
20+
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme"
1921
"k8s.io/apimachinery/pkg/api/meta"
2022
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2123
"k8s.io/apimachinery/pkg/runtime"
2224
"k8s.io/apimachinery/pkg/watch"
2325
)
2426

2527
type WatcherStorage interface {
26-
Put(runtime.Object)
27-
Get(runtime.Object) (runtime.Object, bool)
28-
WaitForObject(ctx context.Context, obj runtime.Object, cond ConditionFunc) error
29-
WaitForKey(ctx context.Context, key string, cond ConditionFunc) error
28+
WaitForObjectCondition(ctx context.Context, obj runtime.Object, cond ConditionFunc) error
29+
WaitForModelCondition(ctx context.Context, modelName string, cond ConditionFunc) error
30+
WaitForPipelineCondition(ctx context.Context, modelName string, cond ConditionFunc) error
3031
Clear()
3132
Start()
3233
Stop()
3334
}
3435

36+
type objectKind string
37+
38+
const (
39+
model objectKind = "Model"
40+
pipeline objectKind = "Pipeline"
41+
)
42+
3543
type WatcherStore struct {
3644
namespace string
3745
label string
3846
mlopsClient v1alpha1.MlopsV1alpha1Interface
3947
modelWatcher watch.Interface
4048
pipelineWatcher watch.Interface
4149
logger log.FieldLogger
50+
scheme *runtime.Scheme
4251

4352
mu sync.RWMutex
4453
store map[string]runtime.Object // key: "namespace/name"
@@ -71,6 +80,11 @@ func NewWatcherStore(namespace string, label string, mlopsClient v1alpha1.MlopsV
7180
return nil, fmt.Errorf("failed to create pipeline watcher: %w", err)
7281
}
7382

83+
// Base scheme + register your CRDs
84+
s := runtime.NewScheme()
85+
_ = scheme.AddToScheme(s) // core k8s types (optional but fine)
86+
_ = mlopsscheme.AddToScheme(s) // <-- this is the key line for your CRDs
87+
7488
return &WatcherStore{
7589
namespace: namespace,
7690
label: label,
@@ -80,6 +94,7 @@ func NewWatcherStore(namespace string, label string, mlopsClient v1alpha1.MlopsV
8094
logger: logger.WithField("client", "watcher_store"),
8195
store: make(map[string]runtime.Object),
8296
doneChan: make(chan struct{}),
97+
scheme: s,
8398
}, nil
8499
}
85100

@@ -107,7 +122,7 @@ func (s *WatcherStore) Start() {
107122

108123
switch event.Type {
109124
case watch.Added, watch.Modified:
110-
s.Put(event.Object)
125+
s.put(event.Object)
111126
case watch.Deleted:
112127
s.delete(event.Object)
113128
case watch.Error:
@@ -143,7 +158,7 @@ func (s *WatcherStore) Start() {
143158

144159
switch event.Type {
145160
case watch.Added, watch.Modified:
146-
s.Put(event.Object)
161+
s.put(event.Object)
147162
case watch.Deleted:
148163
s.delete(event.Object)
149164
case watch.Error:
@@ -177,14 +192,24 @@ func (s *WatcherStore) keyFor(obj runtime.Object) (string, error) {
177192

178193
ns := accessor.GetNamespace()
179194
if ns == "" {
180-
// fall back to store namespace if the object is cluster-scoped or unset
181-
ns = s.namespace
195+
ns = s.namespace // or "_cluster" if you prefer
196+
}
197+
198+
// Prefer scheme-based GVK for typed objects
199+
gvks, _, err := s.scheme.ObjectKinds(obj)
200+
if err != nil || len(gvks) == 0 {
201+
// fallback: TypeMeta if present
202+
if ta, taErr := meta.TypeAccessor(obj); taErr == nil && ta.GetKind() != "" {
203+
return fmt.Sprintf("%s/%s/%s", ns, ta.GetKind(), accessor.GetName()), nil
204+
}
205+
return "", fmt.Errorf("failed to determine kind for %T: %w", obj, err)
182206
}
183207

184-
return fmt.Sprintf("%s/%s", ns, accessor.GetName()), nil
208+
kind := gvks[0].Kind
209+
return fmt.Sprintf("%s/%s/%s", ns, kind, accessor.GetName()), nil
185210
}
186211

187-
func (s *WatcherStore) Put(obj runtime.Object) {
212+
func (s *WatcherStore) put(obj runtime.Object) {
188213
if obj == nil {
189214
return
190215
}
@@ -201,7 +226,7 @@ func (s *WatcherStore) Put(obj runtime.Object) {
201226
s.notifyWaiters(key, obj)
202227
}
203228

204-
func (s *WatcherStore) Get(obj runtime.Object) (runtime.Object, bool) {
229+
func (s *WatcherStore) get(obj runtime.Object) (runtime.Object, bool) {
205230
if obj == nil {
206231
return nil, false
207232
}
@@ -242,7 +267,7 @@ func (s *WatcherStore) delete(obj runtime.Object) {
242267
s.notifyWaiters(key, nil)
243268
}
244269

245-
func (s *WatcherStore) WaitForObject(ctx context.Context, obj runtime.Object, cond ConditionFunc) error {
270+
func (s *WatcherStore) WaitForObjectCondition(ctx context.Context, obj runtime.Object, cond ConditionFunc) error {
246271
key, err := s.keyFor(obj)
247272
if err != nil {
248273
return err
@@ -283,10 +308,17 @@ func (s *WatcherStore) WaitForObject(ctx context.Context, obj runtime.Object, co
283308
return err
284309
}
285310
}
311+
func (s *WatcherStore) WaitForModelCondition(ctx context.Context, modelName string, cond ConditionFunc) error {
312+
key := fmt.Sprintf("%s/%s/%s", s.namespace, model, modelName)
313+
return s.waitForKey(ctx, key, cond)
314+
}
315+
316+
func (s *WatcherStore) WaitForPipelineCondition(ctx context.Context, pipelineName string, cond ConditionFunc) error {
317+
key := fmt.Sprintf("%s/%s/%s", s.namespace, pipeline, pipelineName)
318+
return s.waitForKey(ctx, key, cond)
319+
}
286320

287-
func (s *WatcherStore) WaitForKey(ctx context.Context, key string, cond ConditionFunc) error {
288-
// build key
289-
key = fmt.Sprintf("%s/%s", s.namespace, key)
321+
func (s *WatcherStore) waitForKey(ctx context.Context, key string, cond ConditionFunc) error {
290322

291323
// Fast path: check current state
292324
s.mu.RLock()

tests/integration/godog/steps/custom_model_steps.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (m *Model) deleteModel(ctx context.Context, model string) error {
6868
}
6969

7070
func (m *Model) waitForModelNameReady(ctx context.Context, name string) error {
71-
return m.watcherStorage.WaitForKey(
71+
return m.watcherStorage.WaitForModelCondition(
7272
ctx,
7373
name,
7474
assertions.ModelReady)

tests/integration/godog/steps/model_steps.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ func (m *Model) ApplyModel(k *k8sclient.K8sClient) error {
268268
}
269269

270270
func (m *Model) waitForModelReady(ctx context.Context) error {
271-
return m.watcherStorage.WaitForObject(
271+
return m.watcherStorage.WaitForObjectCondition(
272272
ctx,
273273
m.model, // the k8s object being watched
274274
assertions.ModelReady, // predicate from steps/assertions

tests/integration/godog/steps/pipeline_steps.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,15 @@ func (p *Pipeline) applyScenarioLabel() {
8787
}
8888

8989
func (p *Pipeline) waitForPipelineNameReady(ctx context.Context, name string) error {
90-
91-
return p.watcherStorage.WaitForKey(
90+
return p.watcherStorage.WaitForPipelineCondition(
9291
ctx,
9392
name,
9493
assertions.PipelineReady,
9594
)
9695
}
9796

9897
func (p *Pipeline) waitForPipelineReady(ctx context.Context) error {
99-
100-
return p.watcherStorage.WaitForObject(
98+
return p.watcherStorage.WaitForObjectCondition(
10199
ctx,
102100
p.pipeline,
103101
assertions.PipelineReady,

0 commit comments

Comments
 (0)