Skip to content

Commit 0a67eac

Browse files
committed
Fix router controllers to handle delete tombstones correctly
Router controllers were using cache.MetaNamespaceKeyFunc which fails on cache.DeletedFinalStateUnknown (tombstone) objects delivered during informer watch reconnection. This caused delete events to be silently dropped, leaving stale routes in the datastore. Changed to cache.DeletionHandlingMetaNamespaceKeyFunc in: - ModelRouteController.enqueueModelRoute - ModelServerController.enqueueModelServer - ModelServerController.enqueuePod - GatewayController.enqueueGateway - HTTPRouteController.enqueueHTTPRoute - InferencePoolController.enqueueInferencePool Added table-driven tests covering both normal objects and tombstones. Signed-off-by: WHOIM1205 <[email protected]>
1 parent a8c9193 commit 0a67eac

10 files changed

+475
-6
lines changed

pkg/kthena-router/controller/gateway_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func (c *GatewayController) syncHandler(key string) error {
164164
}
165165

166166
func (c *GatewayController) enqueueGateway(obj interface{}) {
167-
key, err := cache.MetaNamespaceKeyFunc(obj)
167+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
168168
if err != nil {
169169
utilruntime.HandleError(err)
170170
return
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
Copyright The Volcano Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"testing"
21+
22+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
"k8s.io/client-go/tools/cache"
24+
"k8s.io/client-go/util/workqueue"
25+
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
26+
27+
"github.com/volcano-sh/kthena/pkg/kthena-router/datastore"
28+
)
29+
30+
func TestEnqueueGateway(t *testing.T) {
31+
tests := []struct {
32+
name string
33+
obj interface{}
34+
expectedKey string
35+
}{
36+
{
37+
name: "normal Gateway object",
38+
obj: &gatewayv1.Gateway{
39+
ObjectMeta: metav1.ObjectMeta{
40+
Name: "test-gateway",
41+
Namespace: "default",
42+
},
43+
},
44+
expectedKey: "default/test-gateway",
45+
},
46+
{
47+
name: "tombstone with DeletedFinalStateUnknown",
48+
obj: cache.DeletedFinalStateUnknown{
49+
Key: "default/deleted-gateway",
50+
Obj: &gatewayv1.Gateway{
51+
ObjectMeta: metav1.ObjectMeta{
52+
Name: "deleted-gateway",
53+
Namespace: "default",
54+
},
55+
},
56+
},
57+
expectedKey: "default/deleted-gateway",
58+
},
59+
}
60+
61+
for _, tt := range tests {
62+
t.Run(tt.name, func(t *testing.T) {
63+
queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
64+
defer queue.ShutDown()
65+
66+
c := &GatewayController{
67+
workqueue: queue,
68+
store: datastore.New(),
69+
}
70+
71+
c.enqueueGateway(tt.obj)
72+
73+
if queue.Len() != 1 {
74+
t.Fatalf("expected 1 item in queue, got %d", queue.Len())
75+
}
76+
77+
item, shutdown := queue.Get()
78+
if shutdown {
79+
t.Fatal("unexpected queue shutdown")
80+
}
81+
if item != tt.expectedKey {
82+
t.Errorf("expected key %q, got %q", tt.expectedKey, item)
83+
}
84+
})
85+
}
86+
}

pkg/kthena-router/controller/httproute_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func (c *HTTPRouteController) syncHandler(key string) error {
169169
}
170170

171171
func (c *HTTPRouteController) enqueueHTTPRoute(obj interface{}) {
172-
key, err := cache.MetaNamespaceKeyFunc(obj)
172+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
173173
if err != nil {
174174
utilruntime.HandleError(err)
175175
return
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
Copyright The Volcano Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"testing"
21+
22+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
"k8s.io/client-go/tools/cache"
24+
"k8s.io/client-go/util/workqueue"
25+
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
26+
27+
"github.com/volcano-sh/kthena/pkg/kthena-router/datastore"
28+
)
29+
30+
func TestEnqueueHTTPRoute(t *testing.T) {
31+
tests := []struct {
32+
name string
33+
obj interface{}
34+
expectedKey string
35+
}{
36+
{
37+
name: "normal HTTPRoute object",
38+
obj: &gatewayv1.HTTPRoute{
39+
ObjectMeta: metav1.ObjectMeta{
40+
Name: "test-httproute",
41+
Namespace: "default",
42+
},
43+
},
44+
expectedKey: "default/test-httproute",
45+
},
46+
{
47+
name: "tombstone with DeletedFinalStateUnknown",
48+
obj: cache.DeletedFinalStateUnknown{
49+
Key: "default/deleted-httproute",
50+
Obj: &gatewayv1.HTTPRoute{
51+
ObjectMeta: metav1.ObjectMeta{
52+
Name: "deleted-httproute",
53+
Namespace: "default",
54+
},
55+
},
56+
},
57+
expectedKey: "default/deleted-httproute",
58+
},
59+
}
60+
61+
for _, tt := range tests {
62+
t.Run(tt.name, func(t *testing.T) {
63+
queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
64+
defer queue.ShutDown()
65+
66+
c := &HTTPRouteController{
67+
workqueue: queue,
68+
store: datastore.New(),
69+
}
70+
71+
c.enqueueHTTPRoute(tt.obj)
72+
73+
if queue.Len() != 1 {
74+
t.Fatalf("expected 1 item in queue, got %d", queue.Len())
75+
}
76+
77+
item, shutdown := queue.Get()
78+
if shutdown {
79+
t.Fatal("unexpected queue shutdown")
80+
}
81+
if item != tt.expectedKey {
82+
t.Errorf("expected key %q, got %q", tt.expectedKey, item)
83+
}
84+
})
85+
}
86+
}

pkg/kthena-router/controller/inferencepool_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func (c *InferencePoolController) syncHandler(key string) error {
155155
}
156156

157157
func (c *InferencePoolController) enqueueInferencePool(obj interface{}) {
158-
key, err := cache.MetaNamespaceKeyFunc(obj)
158+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
159159
if err != nil {
160160
utilruntime.HandleError(err)
161161
return
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
Copyright The Volcano Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"testing"
21+
22+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
"k8s.io/client-go/tools/cache"
24+
"k8s.io/client-go/util/workqueue"
25+
inferencev1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
26+
27+
"github.com/volcano-sh/kthena/pkg/kthena-router/datastore"
28+
)
29+
30+
func TestEnqueueInferencePool(t *testing.T) {
31+
tests := []struct {
32+
name string
33+
obj interface{}
34+
expectedKey string
35+
}{
36+
{
37+
name: "normal InferencePool object",
38+
obj: &inferencev1.InferencePool{
39+
ObjectMeta: metav1.ObjectMeta{
40+
Name: "test-pool",
41+
Namespace: "default",
42+
},
43+
},
44+
expectedKey: "default/test-pool",
45+
},
46+
{
47+
name: "tombstone with DeletedFinalStateUnknown",
48+
obj: cache.DeletedFinalStateUnknown{
49+
Key: "default/deleted-pool",
50+
Obj: &inferencev1.InferencePool{
51+
ObjectMeta: metav1.ObjectMeta{
52+
Name: "deleted-pool",
53+
Namespace: "default",
54+
},
55+
},
56+
},
57+
expectedKey: "default/deleted-pool",
58+
},
59+
}
60+
61+
for _, tt := range tests {
62+
t.Run(tt.name, func(t *testing.T) {
63+
queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
64+
defer queue.ShutDown()
65+
66+
c := &InferencePoolController{
67+
workqueue: queue,
68+
store: datastore.New(),
69+
}
70+
71+
c.enqueueInferencePool(tt.obj)
72+
73+
if queue.Len() != 1 {
74+
t.Fatalf("expected 1 item in queue, got %d", queue.Len())
75+
}
76+
77+
item, shutdown := queue.Get()
78+
if shutdown {
79+
t.Fatal("unexpected queue shutdown")
80+
}
81+
if item != tt.expectedKey {
82+
t.Errorf("expected key %q, got %q", tt.expectedKey, item)
83+
}
84+
})
85+
}
86+
}

pkg/kthena-router/controller/modelroute_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (c *ModelRouteController) syncHandler(key string) error {
151151
}
152152

153153
func (c *ModelRouteController) enqueueModelRoute(obj interface{}) {
154-
key, err := cache.MetaNamespaceKeyFunc(obj)
154+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
155155
if err != nil {
156156
utilruntime.HandleError(err)
157157
return
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
Copyright The Volcano Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"testing"
21+
22+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
"k8s.io/client-go/tools/cache"
24+
"k8s.io/client-go/util/workqueue"
25+
26+
aiv1alpha1 "github.com/volcano-sh/kthena/pkg/apis/networking/v1alpha1"
27+
"github.com/volcano-sh/kthena/pkg/kthena-router/datastore"
28+
)
29+
30+
func TestEnqueueModelRoute(t *testing.T) {
31+
tests := []struct {
32+
name string
33+
obj interface{}
34+
expectedKey string
35+
}{
36+
{
37+
name: "normal ModelRoute object",
38+
obj: &aiv1alpha1.ModelRoute{
39+
ObjectMeta: metav1.ObjectMeta{
40+
Name: "test-route",
41+
Namespace: "default",
42+
},
43+
},
44+
expectedKey: "default/test-route",
45+
},
46+
{
47+
name: "tombstone with DeletedFinalStateUnknown",
48+
obj: cache.DeletedFinalStateUnknown{
49+
Key: "default/deleted-route",
50+
Obj: &aiv1alpha1.ModelRoute{
51+
ObjectMeta: metav1.ObjectMeta{
52+
Name: "deleted-route",
53+
Namespace: "default",
54+
},
55+
},
56+
},
57+
expectedKey: "default/deleted-route",
58+
},
59+
}
60+
61+
for _, tt := range tests {
62+
t.Run(tt.name, func(t *testing.T) {
63+
queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
64+
defer queue.ShutDown()
65+
66+
c := &ModelRouteController{
67+
workqueue: queue,
68+
store: datastore.New(),
69+
}
70+
71+
c.enqueueModelRoute(tt.obj)
72+
73+
if queue.Len() != 1 {
74+
t.Fatalf("expected 1 item in queue, got %d", queue.Len())
75+
}
76+
77+
item, shutdown := queue.Get()
78+
if shutdown {
79+
t.Fatal("unexpected queue shutdown")
80+
}
81+
if item != tt.expectedKey {
82+
t.Errorf("expected key %q, got %q", tt.expectedKey, item)
83+
}
84+
})
85+
}
86+
}

0 commit comments

Comments
 (0)