@@ -20,20 +20,21 @@ import (
2020 "context"
2121 "fmt"
2222 "net/http"
23+ "strconv"
2324 "time"
2425
2526 "github.com/emicklei/go-restful/v3"
2627 "github.com/pkg/errors"
27- "k8s.io/apimachinery/pkg/runtime"
2828 "k8s.io/apimachinery/pkg/runtime/schema"
2929 "k8s.io/apimachinery/pkg/watch"
30+ ctrl "sigs.k8s.io/controller-runtime"
3031 "sigs.k8s.io/controller-runtime/pkg/client"
3132)
3233
3334// Event records a lifecycle event for a Kubernetes object.
3435type Event struct {
3536 Type watch.EventType `json:"type,omitempty"`
36- Object runtime .Object `json:"object,omitempty"`
37+ Object client .Object `json:"object,omitempty"`
3738}
3839
3940// WatchEventDispatcher dispatches events for a single resourceGroup.
@@ -88,13 +89,15 @@ func (m *WatchEventDispatcher) OnGeneric(resourceGroup string, o client.Object)
8889
8990func (h * apiServerHandler ) watchForResource (req * restful.Request , resp * restful.Response , resourceGroup string , gvk schema.GroupVersionKind ) (reterr error ) {
9091 ctx := req .Request .Context ()
92+ log := h .log .WithValues ("resourceGroup" , resourceGroup , "gvk" , gvk .String ())
93+ ctx = ctrl .LoggerInto (ctx , log )
9194 queryTimeout := req .QueryParameter ("timeoutSeconds" )
95+ resourceVersion := req .QueryParameter ("resourceVersion" )
9296 c := h .manager .GetCache ()
9397 i , err := c .GetInformerForKind (ctx , gvk )
9498 if err != nil {
9599 return err
96100 }
97- h .log .Info (fmt .Sprintf ("Serving Watch for %v" , req .Request .URL ))
98101 // With an unbuffered event channel RemoveEventHandler could be blocked because it requires a lock on the informer.
99102 // When Run stops reading from the channel the informer could be blocked with an unbuffered chanel and then RemoveEventHandler never goes through.
100103 // 1000 is used to avoid deadlocks in clusters with a higher number of Machines/Nodes.
@@ -115,7 +118,12 @@ func (h *apiServerHandler) watchForResource(req *restful.Request, resp *restful.
115118 L:
116119 for {
117120 select {
118- case <- events :
121+ case event , ok := <- events :
122+ if ! ok {
123+ // End of results.
124+ break L
125+ }
126+ log .V (4 ).Info ("Missed event" , "eventType" , event .Type , "objectName" , event .Object .GetName (), "resourceVersion" , event .Object .GetResourceVersion ())
119127 default :
120128 break L
121129 }
@@ -124,11 +132,49 @@ func (h *apiServerHandler) watchForResource(req *restful.Request, resp *restful.
124132 // Note: After we removed the handler, no new events will be written to the events channel.
125133 }()
126134
127- return watcher .Run (ctx , queryTimeout , resp )
135+ // Get at client to the resource group and list all relevant objects.
136+ inmemoryClient := h .manager .GetResourceGroup (resourceGroup ).GetClient ()
137+ list , err := h .v1List (ctx , req , gvk , inmemoryClient )
138+ if err != nil {
139+ return err
140+ }
141+
142+ // If resourceVersion was set parse to uint64 which is the representation in the simulated apiserver.
143+ var parsedResourceVersion uint64
144+ if resourceVersion != "" {
145+ parsedResourceVersion , err = strconv .ParseUint (resourceVersion , 10 , 64 )
146+ if err != nil {
147+ return err
148+ }
149+ }
150+
151+ initialEvents := []Event {}
152+
153+ // Loop over all items and fill the list of events with objects which have a newer resourceVersion.
154+ for _ , obj := range list .Items {
155+ if resourceVersion != "" {
156+ objResourceVersion , err := strconv .ParseUint (obj .GetResourceVersion (), 10 , 64 )
157+ if err != nil {
158+ return err
159+ }
160+ if objResourceVersion <= parsedResourceVersion {
161+ continue
162+ }
163+ }
164+ eventType := watch .Modified
165+ // kube-apiserver emits all events as ADDED when no resourceVersion is given.
166+ if obj .GetGeneration () == 1 || resourceVersion == "" {
167+ eventType = watch .Added
168+ }
169+ initialEvents = append (initialEvents , Event {Type : eventType , Object : & obj })
170+ }
171+
172+ return watcher .Run (ctx , queryTimeout , initialEvents , list .GetResourceVersion (), resp )
128173}
129174
130175// Run serves a series of encoded events via HTTP with Transfer-Encoding: chunked.
131- func (m * WatchEventDispatcher ) Run (ctx context.Context , timeout string , w http.ResponseWriter ) error {
176+ func (m * WatchEventDispatcher ) Run (ctx context.Context , timeout string , initialEvents []Event , initialResourceVersion string , w http.ResponseWriter ) error {
177+ log := ctrl .LoggerFrom (ctx )
132178 flusher , ok := w .(http.Flusher )
133179 if ! ok {
134180 return errors .New ("can't start Watch: can't get http.Flusher" )
@@ -139,6 +185,16 @@ func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, w http.R
139185 }
140186 w .Header ().Set ("Transfer-Encoding" , "chunked" )
141187 w .WriteHeader (http .StatusOK )
188+
189+ // Write all initial events.
190+ for _ , event := range initialEvents {
191+ if err := resp .WriteEntity (event ); err != nil {
192+ log .Error (err , "Error writing initial event" , "eventType" , event .Type , "objectName" , event .Object .GetName (), "resourceVersion" , event .Object .GetResourceVersion ())
193+ _ = resp .WriteErrorString (http .StatusInternalServerError , err .Error ())
194+ } else {
195+ log .V (4 ).Info ("Wrote initial event" , "eventType" , event .Type , "objectName" , event .Object .GetName (), "resourceVersion" , event .Object .GetResourceVersion ())
196+ }
197+ }
142198 flusher .Flush ()
143199
144200 timeoutTimer , seconds , err := setTimer (timeout )
@@ -149,6 +205,15 @@ func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, w http.R
149205 ctx , cancel := context .WithTimeout (ctx , seconds )
150206 defer cancel ()
151207 defer timeoutTimer .Stop ()
208+
209+ // Use the resourceVersion of the list to filter out events from the channel
210+ // which are already written above.
211+ minResourceVersion , err := strconv .ParseUint (initialResourceVersion , 10 , 64 )
212+ if err != nil {
213+ return err
214+ }
215+
216+ var objResourceVersion uint64
152217 for {
153218 select {
154219 case <- ctx .Done ():
@@ -160,8 +225,25 @@ func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, w http.R
160225 // End of results.
161226 return nil
162227 }
228+
229+ // Parse and check if the object has a higher resource version than we allow.
230+ objResourceVersion , err = strconv .ParseUint (event .Object .GetResourceVersion (), 10 , 64 )
231+ if err != nil {
232+ log .Error (err , "Parsing object resource version" , "eventType" , event .Type , "objectName" , event .Object .GetName (), "resourceVersion" , event .Object .GetResourceVersion ())
233+ _ = resp .WriteErrorString (http .StatusInternalServerError , err .Error ())
234+ continue
235+ }
236+
237+ // Skip objects which were already written.
238+ if objResourceVersion <= minResourceVersion {
239+ continue
240+ }
241+
163242 if err := resp .WriteEntity (event ); err != nil {
243+ log .Error (err , "Error writing event" , "eventType" , event .Type , "objectName" , event .Object .GetName (), "resourceVersion" , event .Object .GetResourceVersion ())
164244 _ = resp .WriteErrorString (http .StatusInternalServerError , err .Error ())
245+ } else {
246+ log .V (4 ).Info ("Wrote event" , "eventType" , event .Type , "objectName" , event .Object .GetName (), "resourceVersion" , event .Object .GetResourceVersion ())
165247 }
166248 if len (m .events ) == 0 {
167249 flusher .Flush ()
0 commit comments