@@ -79,7 +79,7 @@ func (c *baseClient) connect(ctx context.Context) error {
7979 // TODO enhance the cloudevents SKD to avoid wrapping the error type to distinguish the net connection
8080 // errors
8181 if err != nil {
82- // failed to reconnect, try agin
82+ // failed to reconnect, try again
8383 runtime .HandleErrorWithContext (ctx , err , "the cloudevents client reconnect failed" )
8484 <- wait .RealTimer (DelayFn ()).C ()
8585 continue
@@ -89,14 +89,16 @@ func (c *baseClient) connect(ctx context.Context) error {
8989 metrics .IncreaseClientReconnectedCounter (c .clientID )
9090 c .setClientReady (true )
9191 c .sendReceiverSignal (restartReceiverSignal )
92- c .sendReconnectedSignal ()
9392 }
9493
9594 select {
9695 case <- ctx .Done ():
9796 if c .receiverChan != nil {
9897 close (c .receiverChan )
9998 }
99+ if c .reconnectedChan != nil {
100+ close (c .reconnectedChan )
101+ }
100102 return
101103 case err , ok := <- c .transport .ErrorChan ():
102104 if ! ok {
@@ -164,14 +166,32 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
164166 return
165167 }
166168
169+ // send subscription request before starting to receive events
170+ if err := c .transport .Subscribe (ctx ); err != nil {
171+ runtime .HandleErrorWithContext (ctx , err , "failed to subscribe" )
172+ return
173+ }
174+
167175 c .receiverChan = make (chan int )
168176
169177 // start a go routine to handle cloudevents subscription
170178 go func () {
171- receiverCtx , receiverCancel := context .WithCancel (context . TODO () )
179+ receiverCtx , receiverCancel := context .WithCancel (ctx )
172180 startReceiving := true
181+ subscribed := true
173182
174183 for {
184+ if ! subscribed {
185+ // resubscribe before restarting the receiver
186+ if err := c .transport .Subscribe (ctx ); err != nil {
187+ runtime .HandleError (fmt .Errorf ("failed to resubscribe, %v" , err ))
188+ continue
189+ }
190+ subscribed = true
191+ // notify the client caller to resync the resources
192+ c .sendReconnectedSignal (ctx )
193+ }
194+
175195 if startReceiving {
176196 go func () {
177197 if err := c .transport .Receive (receiverCtx , func (evt cloudevents.Event ) {
@@ -202,8 +222,9 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
202222 case restartReceiverSignal :
203223 logger .V (2 ).Info ("restart the cloudevents receiver" )
204224 // rebuild the receiver context and restart receiving
205- receiverCtx , receiverCancel = context .WithCancel (context . TODO () )
225+ receiverCtx , receiverCancel = context .WithCancel (ctx )
206226 startReceiving = true
227+ subscribed = false
207228 case stopReceiverSignal :
208229 logger .V (2 ).Info ("stop the cloudevents receiver" )
209230 receiverCancel ()
@@ -224,10 +245,16 @@ func (c *baseClient) sendReceiverSignal(signal int) {
224245 }
225246}
226247
227- func (c * baseClient ) sendReconnectedSignal () {
248+ func (c * baseClient ) sendReconnectedSignal (ctx context. Context ) {
228249 c .RLock ()
229250 defer c .RUnlock ()
230- c .reconnectedChan <- struct {}{}
251+ select {
252+ case c .reconnectedChan <- struct {}{}:
253+ // Signal sent successfully
254+ default :
255+ // No receiver listening on reconnectedChan, that's okay - don't block
256+ klog .FromContext (ctx ).Info ("reconnected signal not sent, no receiver listening" )
257+ }
231258}
232259
233260func (c * baseClient ) isClientReady () bool {
0 commit comments