Skip to content

Commit 3c9e154

Browse files
authored
Merge pull request #8851 from killianmuldoon/pr-in-memory-watch
🌱 Add watch to in-memory server multiplexer
2 parents 869676f + 69a2ced commit 3c9e154

File tree

10 files changed

+335
-12
lines changed

10 files changed

+335
-12
lines changed

test/infrastructure/inmemory/internal/cloud/runtime/cache/cache.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type Cache interface {
5656
// Informer forwards events to event handlers.
5757
type Informer interface {
5858
AddEventHandler(handler InformEventHandler) error
59+
RemoveEventHandler(handler InformEventHandler) error
5960
}
6061

6162
// InformEventHandler handle events originated by a source.

test/infrastructure/inmemory/internal/cloud/runtime/cache/client.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -409,11 +409,8 @@ func (c *cache) doTryDeleteLocked(resourceGroup string, tracker *resourceGroupTr
409409
delete(tracker.ownedObjects, ownReference{gvk: objGVK, key: objKey})
410410
}
411411

412-
// If the object still has finalizers, only set the deletion timestamp if not already set.
413-
if len(obj.GetFinalizers()) > 0 {
414-
if !obj.GetDeletionTimestamp().IsZero() {
415-
return false, nil
416-
}
412+
// Set the deletion timestamp if not already set.
413+
if obj.GetDeletionTimestamp().IsZero() {
417414
if err := c.beforeDelete(resourceGroup, obj); err != nil {
418415
return false, apierrors.NewBadRequest(err.Error())
419416
}
@@ -424,13 +421,18 @@ func (c *cache) doTryDeleteLocked(resourceGroup string, tracker *resourceGroupTr
424421
if err := c.beforeUpdate(resourceGroup, oldObj, obj); err != nil {
425422
return false, apierrors.NewBadRequest(err.Error())
426423
}
424+
425+
// TODO: (killianmuldoon) Understand if setting this twice is necessary.
427426
// Required to override default beforeUpdate behaviour
428427
// that prevent changes to automatically managed fields.
429428
obj.SetDeletionTimestamp(&now)
430429

431430
objects[objKey] = obj
432431
c.afterUpdate(resourceGroup, oldObj, obj)
432+
}
433433

434+
// If the object still has finalizers return early.
435+
if len(obj.GetFinalizers()) > 0 {
434436
return false, nil
435437
}
436438

test/infrastructure/inmemory/internal/cloud/runtime/cache/client_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,7 @@ func Test_cache_client(t *testing.T) {
629629
g.Expect(c.resourceGroups["foo"].objects).To(HaveKey(cloudv1.GroupVersion.WithKind(cloudv1.CloudMachineKind)), "gvk must exist in object tracker for foo")
630630
g.Expect(c.resourceGroups["foo"].objects[cloudv1.GroupVersion.WithKind(cloudv1.CloudMachineKind)]).ToNot(HaveKey(types.NamespacedName{Name: "bar"}), "Object bar must not exist in object tracker for foo")
631631

632-
g.Expect(h.Events()).ToNot(ContainElement("foo, CloudMachine=bar, Deleted"))
632+
g.Expect(h.Events()).To(ContainElement("foo, CloudMachine=bar, Deleted"))
633633
})
634634

635635
t.Run("delete with finalizers", func(t *testing.T) {
@@ -760,6 +760,11 @@ func (i *fakeInformer) AddEventHandler(handler InformEventHandler) error {
760760
return nil
761761
}
762762

763+
func (i *fakeInformer) RemoveEventHandler(_ InformEventHandler) error {
764+
i.handler = nil
765+
return nil
766+
}
767+
763768
func (i *fakeInformer) InformCreate(resourceGroup string, obj client.Object) {
764769
i.handler.OnCreate(resourceGroup, obj)
765770
}

test/infrastructure/inmemory/internal/cloud/runtime/cache/informer.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,17 @@ func (i *informer) AddEventHandler(handler InformEventHandler) error {
3838
return nil
3939
}
4040

41+
func (i *informer) RemoveEventHandler(handler InformEventHandler) error {
42+
i.lock.Lock()
43+
defer i.lock.Unlock()
44+
for j, h := range i.handlers {
45+
if h == handler {
46+
i.handlers = append(i.handlers[:j], i.handlers[j+1:]...)
47+
}
48+
}
49+
return nil
50+
}
51+
4152
func (c *cache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
4253
gvk, err := apiutil.GVKForObject(obj, c.scheme)
4354
if err != nil {

test/infrastructure/inmemory/internal/cloud/runtime/controller/controller_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ func (i *fakeInformer) AddEventHandler(handler ccache.InformEventHandler) error
148148
return nil
149149
}
150150

151+
func (i *fakeInformer) RemoveEventHandler(_ ccache.InformEventHandler) error {
152+
i.handler = nil
153+
return nil
154+
}
155+
151156
func (i *fakeInformer) InformCreate(resourceGroup string, obj client.Object) {
152157
i.handler.OnCreate(resourceGroup, obj)
153158
}

test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ func TestReconcileNormalEtcd(t *testing.T) {
318318
// NOTE: make sure to use ports different than other tests, so we can run tests in parallel
319319
MinPort: server.DefaultMinPort + 1000,
320320
MaxPort: server.DefaultMinPort + 1099,
321-
DebugPort: server.DefaultDebugPort,
321+
DebugPort: server.DefaultDebugPort + 10,
322322
})
323323
g.Expect(err).ToNot(HaveOccurred())
324324
_, err = wcmux.InitWorkloadClusterListener(klog.KObj(cluster).String())
@@ -445,8 +445,8 @@ func TestReconcileNormalApiServer(t *testing.T) {
445445
wcmux, err := server.NewWorkloadClustersMux(manager, host, server.CustomPorts{
446446
// NOTE: make sure to use ports different than other tests, so we can run tests in parallel
447447
MinPort: server.DefaultMinPort + 1100,
448-
MaxPort: server.DefaultMinPort + 1299,
449-
DebugPort: server.DefaultDebugPort,
448+
MaxPort: server.DefaultMinPort + 1199,
449+
DebugPort: server.DefaultDebugPort + 11,
450450
})
451451
g.Expect(err).ToNot(HaveOccurred())
452452
_, err = wcmux.InitWorkloadClusterListener(klog.KObj(cluster).String())

test/infrastructure/inmemory/internal/server/api/handler.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,16 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
230230
return
231231
}
232232

233+
// If the request is a Watch handle it using watchForResource.
234+
if isWatch(req) {
235+
err = h.watchForResource(req, resp, resourceGroup, *gvk)
236+
if err != nil {
237+
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
238+
return
239+
}
240+
return
241+
}
242+
233243
// Reads and returns the requested data.
234244
list := &unstructured.UnstructuredList{}
235245
list.SetAPIVersion(gvk.GroupVersion().String())
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package api
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"net/http"
23+
"time"
24+
25+
"github.com/emicklei/go-restful/v3"
26+
"github.com/pkg/errors"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/runtime/schema"
29+
"k8s.io/apimachinery/pkg/watch"
30+
"sigs.k8s.io/controller-runtime/pkg/client"
31+
)
32+
33+
// Event records a lifecycle event for a Kubernetes object.
34+
type Event struct {
35+
Type watch.EventType `json:"type,omitempty"`
36+
Object runtime.Object `json:"object,omitempty"`
37+
}
38+
39+
// WatchEventDispatcher dispatches events for a single resourceGroup.
40+
type WatchEventDispatcher struct {
41+
resourceGroup string
42+
events chan *Event
43+
}
44+
45+
// OnCreate dispatches Create events.
46+
func (m *WatchEventDispatcher) OnCreate(resourceGroup string, o client.Object) {
47+
if resourceGroup != m.resourceGroup {
48+
return
49+
}
50+
m.events <- &Event{
51+
Type: watch.Added,
52+
Object: o,
53+
}
54+
}
55+
56+
// OnUpdate dispatches Update events.
57+
func (m *WatchEventDispatcher) OnUpdate(resourceGroup string, _, o client.Object) {
58+
if resourceGroup != m.resourceGroup {
59+
return
60+
}
61+
m.events <- &Event{
62+
Type: watch.Modified,
63+
Object: o,
64+
}
65+
}
66+
67+
// OnDelete dispatches Delete events.
68+
func (m *WatchEventDispatcher) OnDelete(resourceGroup string, o client.Object) {
69+
if resourceGroup != m.resourceGroup {
70+
return
71+
}
72+
m.events <- &Event{
73+
Type: watch.Deleted,
74+
Object: o,
75+
}
76+
}
77+
78+
// OnGeneric dispatches Generic events.
79+
func (m *WatchEventDispatcher) OnGeneric(resourceGroup string, o client.Object) {
80+
if resourceGroup != m.resourceGroup {
81+
return
82+
}
83+
m.events <- &Event{
84+
Type: "GENERIC",
85+
Object: o,
86+
}
87+
}
88+
89+
// isWatch is true if the request contains `watch="true"` as a query parameter.
90+
func isWatch(req *restful.Request) bool {
91+
return req.QueryParameter("watch") == "true"
92+
}
93+
94+
func (h *apiServerHandler) watchForResource(req *restful.Request, resp *restful.Response, resourceGroup string, gvk schema.GroupVersionKind) (reterr error) {
95+
ctx := req.Request.Context()
96+
queryTimeout := req.QueryParameter("timeoutSeconds")
97+
c := h.manager.GetCache()
98+
i, err := c.GetInformerForKind(ctx, gvk)
99+
if err != nil {
100+
return err
101+
}
102+
h.log.Info(fmt.Sprintf("Serving Watch for %v", req.Request.URL))
103+
// With an unbuffered event channel RemoveEventHandler could be blocked because it requires a lock on the informer.
104+
// When Run stops reading from the channel the informer could be blocked with an unbuffered chanel and then RemoveEventHandler never goes through.
105+
events := make(chan *Event, 10)
106+
watcher := &WatchEventDispatcher{
107+
resourceGroup: resourceGroup,
108+
events: events,
109+
}
110+
111+
if err := i.AddEventHandler(watcher); err != nil {
112+
return err
113+
}
114+
115+
// Defer cleanup which removes the event handler and ensures the channel is empty of events.
116+
defer func() {
117+
reterr = i.RemoveEventHandler(watcher)
118+
// Doing this to ensure the channel is empty.
119+
L:
120+
for {
121+
select {
122+
case <-events:
123+
default:
124+
break L
125+
}
126+
}
127+
}()
128+
129+
if err = watcher.Run(ctx, queryTimeout, resp); err != nil {
130+
return err
131+
}
132+
return reterr
133+
}
134+
135+
// Run serves a series of encoded events via HTTP with Transfer-Encoding: chunked.
136+
func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, w http.ResponseWriter) error {
137+
flusher, ok := w.(http.Flusher)
138+
if !ok {
139+
return errors.New("can't start Watch: can't get http.Flusher")
140+
}
141+
resp, ok := w.(*restful.Response)
142+
if !ok {
143+
return errors.New("can't start Watch: can't get restful.Response")
144+
}
145+
w.Header().Set("Transfer-Encoding", "chunked")
146+
w.WriteHeader(http.StatusOK)
147+
flusher.Flush()
148+
149+
timeoutTimer, seconds, err := setTimer(timeout)
150+
if err != nil {
151+
return errors.Wrapf(err, "can't start Watch: could not set timeout")
152+
}
153+
154+
ctx, cancel := context.WithTimeout(ctx, seconds)
155+
defer cancel()
156+
defer timeoutTimer.Stop()
157+
for {
158+
select {
159+
case <-ctx.Done():
160+
return nil
161+
case <-timeoutTimer.C:
162+
return nil
163+
case event, ok := <-m.events:
164+
if !ok {
165+
// End of results.
166+
return nil
167+
}
168+
if err := resp.WriteEntity(event); err != nil {
169+
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
170+
}
171+
if len(m.events) == 0 {
172+
flusher.Flush()
173+
}
174+
}
175+
}
176+
}
177+
178+
// setTimer creates a time.Timer with the passed `timeout` or a default timeout of 120 seconds if `timeout` is empty.
179+
func setTimer(timeout string) (*time.Timer, time.Duration, error) {
180+
var defaultTimeout = 120 * time.Second
181+
if timeout == "" {
182+
t := time.NewTimer(defaultTimeout)
183+
return t, defaultTimeout, nil
184+
}
185+
seconds, err := time.ParseDuration(fmt.Sprintf("%ss", timeout))
186+
if err != nil {
187+
return nil, 0, errors.Wrapf(err, "Could not parse request timeout %s", timeout)
188+
}
189+
t := time.NewTimer(seconds)
190+
return t, seconds, nil
191+
}

test/infrastructure/inmemory/internal/server/listener.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func (s *WorkloadClusterListener) RESTConfig() (*rest.Config, error) {
114114
}
115115

116116
// GetClient returns a client for a WorkloadClusterListener.
117-
func (s *WorkloadClusterListener) GetClient() (client.Client, error) {
117+
func (s *WorkloadClusterListener) GetClient() (client.WithWatch, error) {
118118
restConfig, err := s.RESTConfig()
119119
if err != nil {
120120
return nil, err
@@ -130,7 +130,7 @@ func (s *WorkloadClusterListener) GetClient() (client.Client, error) {
130130
return nil, err
131131
}
132132

133-
c, err := client.New(restConfig, client.Options{Scheme: s.scheme, Mapper: mapper})
133+
c, err := client.NewWithWatch(restConfig, client.Options{Scheme: s.scheme, Mapper: mapper})
134134
if err != nil {
135135
return nil, err
136136
}

0 commit comments

Comments
 (0)