@@ -79,7 +79,7 @@ func (c *baseClient) connect(ctx context.Context) error {
7979 // TODO enhance the cloudevents SKD to avoid wrapping the error type to distinguish the net connection
8080 // errors
8181 if err != nil {
82- // failed to reconnect, try agin
82+ // failed to reconnect, try again
8383 runtime .HandleErrorWithContext (ctx , err , "the cloudevents client reconnect failed" )
8484 <- wait .RealTimer (DelayFn ()).C ()
8585 continue
@@ -89,14 +89,16 @@ func (c *baseClient) connect(ctx context.Context) error {
8989 metrics .IncreaseClientReconnectedCounter (c .clientID )
9090 c .setClientReady (true )
9191 c .sendReceiverSignal (restartReceiverSignal )
92- c .sendReconnectedSignal ()
9392 }
9493
9594 select {
9695 case <- ctx .Done ():
9796 if c .receiverChan != nil {
9897 close (c .receiverChan )
9998 }
99+ if c .reconnectedChan != nil {
100+ close (c .reconnectedChan )
101+ }
100102 return
101103 case err , ok := <- c .transport .ErrorChan ():
102104 if ! ok {
@@ -164,14 +166,33 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
164166 return
165167 }
166168
169+ // send subscription request before starting to receive events
170+ if err := c .transport .Subscribe (ctx ); err != nil {
171+ runtime .HandleErrorWithContext (ctx , err , "failed to subscribe" )
172+ return
173+ }
174+
167175 c .receiverChan = make (chan int )
168176
169177 // start a go routine to handle cloudevents subscription
170178 go func () {
171- receiverCtx , receiverCancel := context .WithCancel (context . TODO () )
179+ receiverCtx , receiverCancel := context .WithCancel (ctx )
172180 startReceiving := true
181+ subscribed := true
173182
174183 for {
184+ if ! subscribed {
185+ // resubscribe before restarting the receiver
186+ if err := c .transport .Subscribe (ctx ); err != nil {
187+ runtime .HandleError (fmt .Errorf ("failed to resubscribe, %v" , err ))
188+ <- wait .RealTimer (DelayFn ()).C ()
189+ continue
190+ }
191+ subscribed = true
192+ // notify the client caller to resync the resources
193+ c .sendReconnectedSignal (ctx )
194+ }
195+
175196 if startReceiving {
176197 go func () {
177198 if err := c .transport .Receive (receiverCtx , func (evt cloudevents.Event ) {
@@ -202,8 +223,9 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
202223 case restartReceiverSignal :
203224 logger .V (2 ).Info ("restart the cloudevents receiver" )
204225 // rebuild the receiver context and restart receiving
205- receiverCtx , receiverCancel = context .WithCancel (context . TODO () )
226+ receiverCtx , receiverCancel = context .WithCancel (ctx )
206227 startReceiving = true
228+ subscribed = false
207229 case stopReceiverSignal :
208230 logger .V (2 ).Info ("stop the cloudevents receiver" )
209231 receiverCancel ()
@@ -224,10 +246,16 @@ func (c *baseClient) sendReceiverSignal(signal int) {
224246 }
225247}
226248
227- func (c * baseClient ) sendReconnectedSignal () {
249+ func (c * baseClient ) sendReconnectedSignal (ctx context. Context ) {
228250 c .RLock ()
229251 defer c .RUnlock ()
230- c .reconnectedChan <- struct {}{}
252+ select {
253+ case c .reconnectedChan <- struct {}{}:
254+ // Signal sent successfully
255+ default :
256+ // No receiver listening on reconnectedChan, that's okay - don't block
257+ klog .FromContext (ctx ).Info ("reconnected signal not sent, no receiver listening" )
258+ }
231259}
232260
233261func (c * baseClient ) isClientReady () bool {
0 commit comments