@@ -79,7 +79,7 @@ func (c *baseClient) connect(ctx context.Context) error {
7979 // TODO enhance the cloudevents SKD to avoid wrapping the error type to distinguish the net connection
8080 // errors
8181 if err != nil {
82- // failed to reconnect, try agin
82+ // failed to reconnect, try again
8383 runtime .HandleErrorWithContext (ctx , err , "the cloudevents client reconnect failed" )
8484 <- wait .RealTimer (DelayFn ()).C ()
8585 continue
@@ -89,14 +89,11 @@ func (c *baseClient) connect(ctx context.Context) error {
8989 metrics .IncreaseClientReconnectedCounter (c .clientID )
9090 c .setClientReady (true )
9191 c .sendReceiverSignal (restartReceiverSignal )
92- c .sendReconnectedSignal ()
9392 }
9493
9594 select {
9695 case <- ctx .Done ():
97- if c .receiverChan != nil {
98- close (c .receiverChan )
99- }
96+ c .closeChannels ()
10097 return
10198 case err , ok := <- c .transport .ErrorChan ():
10299 if ! ok {
@@ -164,22 +161,42 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
164161 return
165162 }
166163
164+ // send subscription request before starting to receive events
165+ if err := c .transport .Subscribe (ctx ); err != nil {
166+ runtime .HandleErrorWithContext (ctx , err , "failed to subscribe" )
167+ return
168+ }
169+
167170 c .receiverChan = make (chan int )
168171
169172 // start a go routine to handle cloudevents subscription
170173 go func () {
171- receiverCtx , receiverCancel := context .WithCancel (context . TODO () )
174+ receiverCtx , receiverCancel := context .WithCancel (ctx )
172175 startReceiving := true
176+ subscribed := true
173177
174178 for {
179+ if ! subscribed {
180+ // resubscribe before restarting the receiver
181+ if err := c .transport .Subscribe (ctx ); err != nil {
182+ runtime .HandleError (fmt .Errorf ("failed to resubscribe, %v" , err ))
183+ <- wait .RealTimer (DelayFn ()).C ()
184+ continue
185+ }
186+ subscribed = true
187+ // notify the client caller to resync the resources
188+ c .sendReconnectedSignal (ctx )
189+ }
190+
175191 if startReceiving {
176192 go func () {
177- if err := c .transport .Receive (receiverCtx , func (evt cloudevents.Event ) {
193+ if err := c .transport .Receive (receiverCtx , func (ctx context.Context , evt cloudevents.Event ) {
194+ logger := klog .FromContext (ctx )
178195 logger .V (2 ).Info ("Received event" , "event" , evt .Context )
179196 if logger .V (5 ).Enabled () {
180197 logger .V (5 ).Info ("Received event" , "event" , evt .String ())
181198 }
182- receive (receiverCtx , evt )
199+ receive (ctx , evt )
183200 }); err != nil {
184201 runtime .HandleError (fmt .Errorf ("failed to receive cloudevents, %v" , err ))
185202 }
@@ -191,7 +208,7 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
191208 case <- ctx .Done ():
192209 receiverCancel ()
193210 return
194- case signal , ok := <- c .receiverChan :
211+ case signal , ok := <- c .getReceiverChan () :
195212 if ! ok {
196213 // receiver channel is closed, stop the receiver
197214 receiverCancel ()
@@ -202,8 +219,9 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
202219 case restartReceiverSignal :
203220 logger .V (2 ).Info ("restart the cloudevents receiver" )
204221 // rebuild the receiver context and restart receiving
205- receiverCtx , receiverCancel = context .WithCancel (context . TODO () )
222+ receiverCtx , receiverCancel = context .WithCancel (ctx )
206223 startReceiving = true
224+ subscribed = false
207225 case stopReceiverSignal :
208226 logger .V (2 ).Info ("stop the cloudevents receiver" )
209227 receiverCancel ()
@@ -224,10 +242,32 @@ func (c *baseClient) sendReceiverSignal(signal int) {
224242 }
225243}
226244
227- func (c * baseClient ) sendReconnectedSignal () {
245+ func (c * baseClient ) closeChannels () {
246+ c .Lock ()
247+ defer c .Unlock ()
248+
249+ if c .receiverChan != nil {
250+ close (c .receiverChan )
251+ c .receiverChan = nil
252+ }
253+ if c .reconnectedChan != nil {
254+ close (c .reconnectedChan )
255+ c .reconnectedChan = nil
256+ }
257+ }
258+
259+ func (c * baseClient ) sendReconnectedSignal (ctx context.Context ) {
228260 c .RLock ()
229261 defer c .RUnlock ()
230- c .reconnectedChan <- struct {}{}
262+ if c .reconnectedChan != nil {
263+ select {
264+ case c .reconnectedChan <- struct {}{}:
265+ // Signal sent successfully
266+ default :
267+ // No receiver listening on reconnectedChan, that's okay - don't block
268+ klog .FromContext (ctx ).Info ("reconnected signal not sent, no receiver listening" )
269+ }
270+ }
231271}
232272
233273func (c * baseClient ) isClientReady () bool {
@@ -241,3 +281,9 @@ func (c *baseClient) setClientReady(ready bool) {
241281 defer c .Unlock ()
242282 c .clientReady = ready
243283}
284+
285+ func (c * baseClient ) getReceiverChan () chan int {
286+ c .RLock ()
287+ defer c .RUnlock ()
288+ return c .receiverChan
289+ }
0 commit comments