diff --git a/pkg/kthena-router/controller/gateway_controller.go b/pkg/kthena-router/controller/gateway_controller.go index ba33ff722..902053a0d 100644 --- a/pkg/kthena-router/controller/gateway_controller.go +++ b/pkg/kthena-router/controller/gateway_controller.go @@ -164,7 +164,7 @@ func (c *GatewayController) syncHandler(key string) error { } func (c *GatewayController) enqueueGateway(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { utilruntime.HandleError(err) return diff --git a/pkg/kthena-router/controller/gateway_controller_test.go b/pkg/kthena-router/controller/gateway_controller_test.go new file mode 100644 index 000000000..d8132be0b --- /dev/null +++ b/pkg/kthena-router/controller/gateway_controller_test.go @@ -0,0 +1,86 @@ +/* +Copyright The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + + "github.com/volcano-sh/kthena/pkg/kthena-router/datastore" +) + +func TestEnqueueGateway(t *testing.T) { + tests := []struct { + name string + obj interface{} + expectedKey string + }{ + { + name: "normal Gateway object", + obj: &gatewayv1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-gateway", + Namespace: "default", + }, + }, + expectedKey: "default/test-gateway", + }, + { + name: "tombstone with DeletedFinalStateUnknown", + obj: cache.DeletedFinalStateUnknown{ + Key: "default/deleted-gateway", + Obj: &gatewayv1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "deleted-gateway", + Namespace: "default", + }, + }, + }, + expectedKey: "default/deleted-gateway", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()) + defer queue.ShutDown() + + c := &GatewayController{ + workqueue: queue, + store: datastore.New(), + } + + c.enqueueGateway(tt.obj) + + if queue.Len() != 1 { + t.Fatalf("expected 1 item in queue, got %d", queue.Len()) + } + + item, shutdown := queue.Get() + if shutdown { + t.Fatal("unexpected queue shutdown") + } + if item != tt.expectedKey { + t.Errorf("expected key %q, got %q", tt.expectedKey, item) + } + }) + } +} diff --git a/pkg/kthena-router/controller/httproute_controller.go b/pkg/kthena-router/controller/httproute_controller.go index f57e76506..6bbde2f79 100644 --- a/pkg/kthena-router/controller/httproute_controller.go +++ b/pkg/kthena-router/controller/httproute_controller.go @@ -169,7 +169,7 @@ func (c *HTTPRouteController) syncHandler(key string) error { } func (c *HTTPRouteController) enqueueHTTPRoute(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { utilruntime.HandleError(err) return diff --git a/pkg/kthena-router/controller/httproute_controller_test.go b/pkg/kthena-router/controller/httproute_controller_test.go new file mode 100644 index 000000000..93c61fc7e --- /dev/null +++ b/pkg/kthena-router/controller/httproute_controller_test.go @@ -0,0 +1,86 @@ +/* +Copyright The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + + "github.com/volcano-sh/kthena/pkg/kthena-router/datastore" +) + +func TestEnqueueHTTPRoute(t *testing.T) { + tests := []struct { + name string + obj interface{} + expectedKey string + }{ + { + name: "normal HTTPRoute object", + obj: &gatewayv1.HTTPRoute{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-httproute", + Namespace: "default", + }, + }, + expectedKey: "default/test-httproute", + }, + { + name: "tombstone with DeletedFinalStateUnknown", + obj: cache.DeletedFinalStateUnknown{ + Key: "default/deleted-httproute", + Obj: &gatewayv1.HTTPRoute{ + ObjectMeta: metav1.ObjectMeta{ + Name: "deleted-httproute", + Namespace: "default", + }, + }, + }, + expectedKey: "default/deleted-httproute", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()) + defer queue.ShutDown() + + c := &HTTPRouteController{ + workqueue: queue, + store: datastore.New(), + } + + c.enqueueHTTPRoute(tt.obj) + + if queue.Len() != 1 { + t.Fatalf("expected 1 item in queue, got %d", queue.Len()) + } + + item, shutdown := queue.Get() + if shutdown { + t.Fatal("unexpected queue shutdown") + } + if item != tt.expectedKey { + t.Errorf("expected key %q, got %q", tt.expectedKey, item) + } + }) + } +} diff --git a/pkg/kthena-router/controller/inferencepool_controller.go b/pkg/kthena-router/controller/inferencepool_controller.go index 9cfad955f..fd4da4572 100644 --- a/pkg/kthena-router/controller/inferencepool_controller.go +++ b/pkg/kthena-router/controller/inferencepool_controller.go @@ -155,7 +155,7 @@ func (c *InferencePoolController) syncHandler(key string) error { } func (c *InferencePoolController) enqueueInferencePool(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { utilruntime.HandleError(err) return diff --git a/pkg/kthena-router/controller/inferencepool_controller_test.go b/pkg/kthena-router/controller/inferencepool_controller_test.go new file mode 100644 index 000000000..864efee77 --- /dev/null +++ b/pkg/kthena-router/controller/inferencepool_controller_test.go @@ -0,0 +1,86 @@ +/* +Copyright The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + inferencev1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + + "github.com/volcano-sh/kthena/pkg/kthena-router/datastore" +) + +func TestEnqueueInferencePool(t *testing.T) { + tests := []struct { + name string + obj interface{} + expectedKey string + }{ + { + name: "normal InferencePool object", + obj: &inferencev1.InferencePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pool", + Namespace: "default", + }, + }, + expectedKey: "default/test-pool", + }, + { + name: "tombstone with DeletedFinalStateUnknown", + obj: cache.DeletedFinalStateUnknown{ + Key: "default/deleted-pool", + Obj: &inferencev1.InferencePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "deleted-pool", + Namespace: "default", + }, + }, + }, + expectedKey: "default/deleted-pool", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()) + defer queue.ShutDown() + + c := &InferencePoolController{ + workqueue: queue, + store: datastore.New(), + } + + c.enqueueInferencePool(tt.obj) + + if queue.Len() != 1 { + t.Fatalf("expected 1 item in queue, got %d", queue.Len()) + } + + item, shutdown := queue.Get() + if shutdown { + t.Fatal("unexpected queue shutdown") + } + if item != tt.expectedKey { + t.Errorf("expected key %q, got %q", tt.expectedKey, item) + } + }) + } +} diff --git a/pkg/kthena-router/controller/modelroute_controller.go b/pkg/kthena-router/controller/modelroute_controller.go index 37f3cda10..6f83e72cd 100644 --- a/pkg/kthena-router/controller/modelroute_controller.go +++ b/pkg/kthena-router/controller/modelroute_controller.go @@ -151,7 +151,7 @@ func (c *ModelRouteController) syncHandler(key string) error { } func (c *ModelRouteController) enqueueModelRoute(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { utilruntime.HandleError(err) return diff --git a/pkg/kthena-router/controller/modelroute_controller_test.go b/pkg/kthena-router/controller/modelroute_controller_test.go new file mode 100644 index 000000000..ee07c2cca --- /dev/null +++ b/pkg/kthena-router/controller/modelroute_controller_test.go @@ -0,0 +1,86 @@ +/* +Copyright The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + aiv1alpha1 "github.com/volcano-sh/kthena/pkg/apis/networking/v1alpha1" + "github.com/volcano-sh/kthena/pkg/kthena-router/datastore" +) + +func TestEnqueueModelRoute(t *testing.T) { + tests := []struct { + name string + obj interface{} + expectedKey string + }{ + { + name: "normal ModelRoute object", + obj: &aiv1alpha1.ModelRoute{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-route", + Namespace: "default", + }, + }, + expectedKey: "default/test-route", + }, + { + name: "tombstone with DeletedFinalStateUnknown", + obj: cache.DeletedFinalStateUnknown{ + Key: "default/deleted-route", + Obj: &aiv1alpha1.ModelRoute{ + ObjectMeta: metav1.ObjectMeta{ + Name: "deleted-route", + Namespace: "default", + }, + }, + }, + expectedKey: "default/deleted-route", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()) + defer queue.ShutDown() + + c := &ModelRouteController{ + workqueue: queue, + store: datastore.New(), + } + + c.enqueueModelRoute(tt.obj) + + if queue.Len() != 1 { + t.Fatalf("expected 1 item in queue, got %d", queue.Len()) + } + + item, shutdown := queue.Get() + if shutdown { + t.Fatal("unexpected queue shutdown") + } + if item != tt.expectedKey { + t.Errorf("expected key %q, got %q", tt.expectedKey, item) + } + }) + } +} diff --git a/pkg/kthena-router/controller/modelserver_controller.go b/pkg/kthena-router/controller/modelserver_controller.go index a0e8901f4..647b9b2ab 100644 --- a/pkg/kthena-router/controller/modelserver_controller.go +++ b/pkg/kthena-router/controller/modelserver_controller.go @@ -302,7 +302,7 @@ func (c *ModelServerController) addOrUpdatePod(pod *corev1.Pod) error { func (c *ModelServerController) enqueueModelServer(obj interface{}) { var key string var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil { utilruntime.HandleError(err) return } @@ -315,7 +315,7 @@ func (c *ModelServerController) enqueueModelServer(obj interface{}) { func (c *ModelServerController) enqueuePod(obj interface{}) { var key string var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil { utilruntime.HandleError(err) return } diff --git a/pkg/kthena-router/controller/modelserver_controller_test.go b/pkg/kthena-router/controller/modelserver_controller_test.go index 99bbf8c61..431384544 100644 --- a/pkg/kthena-router/controller/modelserver_controller_test.go +++ b/pkg/kthena-router/controller/modelserver_controller_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/client-go/informers" kubefake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" kthenafake "github.com/volcano-sh/kthena/client-go/clientset/versioned/fake" informersv1alpha1 "github.com/volcano-sh/kthena/client-go/informers/externalversions" @@ -1137,3 +1138,127 @@ func setupMockBackend() *gomonkey.Patches { }) return patch } + +// TestEnqueueModelServer tests tombstone handling in enqueueModelServer +func TestEnqueueModelServer(t *testing.T) { + tests := []struct { + name string + obj interface{} + expectedKey string + }{ + { + name: "normal ModelServer object", + obj: &aiv1alpha1.ModelServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-server", + Namespace: "default", + }, + }, + expectedKey: "default/test-server", + }, + { + name: "tombstone with DeletedFinalStateUnknown", + obj: cache.DeletedFinalStateUnknown{ + Key: "default/deleted-server", + Obj: &aiv1alpha1.ModelServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "deleted-server", + Namespace: "default", + }, + }, + }, + expectedKey: "default/deleted-server", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[QueueItem]()) + defer queue.ShutDown() + + c := &ModelServerController{ + workqueue: queue, + store: datastore.New(), + } + + c.enqueueModelServer(tt.obj) + + if queue.Len() != 1 { + t.Fatalf("expected 1 item in queue, got %d", queue.Len()) + } + + item, shutdown := queue.Get() + if shutdown { + t.Fatal("unexpected queue shutdown") + } + if item.Key != tt.expectedKey { + t.Errorf("expected key %q, got %q", tt.expectedKey, item.Key) + } + if item.ResourceType != ResourceTypeModelServer { + t.Errorf("expected ResourceType %q, got %q", ResourceTypeModelServer, item.ResourceType) + } + }) + } +} + +// TestEnqueuePod tests tombstone handling in enqueuePod +func TestEnqueuePod(t *testing.T) { + tests := []struct { + name string + obj interface{} + expectedKey string + }{ + { + name: "normal Pod object", + obj: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + }, + }, + expectedKey: "default/test-pod", + }, + { + name: "tombstone with DeletedFinalStateUnknown", + obj: cache.DeletedFinalStateUnknown{ + Key: "default/deleted-pod", + Obj: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "deleted-pod", + Namespace: "default", + }, + }, + }, + expectedKey: "default/deleted-pod", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[QueueItem]()) + defer queue.ShutDown() + + c := &ModelServerController{ + workqueue: queue, + store: datastore.New(), + } + + c.enqueuePod(tt.obj) + + if queue.Len() != 1 { + t.Fatalf("expected 1 item in queue, got %d", queue.Len()) + } + + item, shutdown := queue.Get() + if shutdown { + t.Fatal("unexpected queue shutdown") + } + if item.Key != tt.expectedKey { + t.Errorf("expected key %q, got %q", tt.expectedKey, item.Key) + } + if item.ResourceType != ResourceTypePod { + t.Errorf("expected ResourceType %q, got %q", ResourceTypePod, item.ResourceType) + } + }) + } +}