@@ -79,7 +79,7 @@ func (c *baseClient) connect(ctx context.Context) error {
7979 // TODO enhance the cloudevents SKD to avoid wrapping the error type to distinguish the net connection
8080 // errors
8181 if err != nil {
82- // failed to reconnect, try agin
82+ // failed to reconnect, try again
8383 runtime .HandleErrorWithContext (ctx , err , "the cloudevents client reconnect failed" )
8484 <- wait .RealTimer (DelayFn ()).C ()
8585 continue
@@ -89,14 +89,11 @@ func (c *baseClient) connect(ctx context.Context) error {
8989 metrics .IncreaseClientReconnectedCounter (c .clientID )
9090 c .setClientReady (true )
9191 c .sendReceiverSignal (restartReceiverSignal )
92- c .sendReconnectedSignal ()
9392 }
9493
9594 select {
9695 case <- ctx .Done ():
97- if c .receiverChan != nil {
98- close (c .receiverChan )
99- }
96+ c .closeChannels ()
10097 return
10198 case err , ok := <- c .transport .ErrorChan ():
10299 if ! ok {
@@ -164,14 +161,33 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
164161 return
165162 }
166163
164+ // send subscription request before starting to receive events
165+ if err := c .transport .Subscribe (ctx ); err != nil {
166+ runtime .HandleErrorWithContext (ctx , err , "failed to subscribe" )
167+ return
168+ }
169+
167170 c .receiverChan = make (chan int )
168171
169172 // start a go routine to handle cloudevents subscription
170173 go func () {
171- receiverCtx , receiverCancel := context .WithCancel (context . TODO () )
174+ receiverCtx , receiverCancel := context .WithCancel (ctx )
172175 startReceiving := true
176+ subscribed := true
173177
174178 for {
179+ if ! subscribed {
180+ // resubscribe before restarting the receiver
181+ if err := c .transport .Subscribe (ctx ); err != nil {
182+ runtime .HandleError (fmt .Errorf ("failed to resubscribe, %v" , err ))
183+ <- wait .RealTimer (DelayFn ()).C ()
184+ continue
185+ }
186+ subscribed = true
187+ // notify the client caller to resync the resources
188+ c .sendReconnectedSignal (ctx )
189+ }
190+
175191 if startReceiving {
176192 go func () {
177193 if err := c .transport .Receive (receiverCtx , func (evt cloudevents.Event ) {
@@ -191,7 +207,7 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
191207 case <- ctx .Done ():
192208 receiverCancel ()
193209 return
194- case signal , ok := <- c .receiverChan :
210+ case signal , ok := <- c .getReceiverChan () :
195211 if ! ok {
196212 // receiver channel is closed, stop the receiver
197213 receiverCancel ()
@@ -202,8 +218,9 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
202218 case restartReceiverSignal :
203219 logger .V (2 ).Info ("restart the cloudevents receiver" )
204220 // rebuild the receiver context and restart receiving
205- receiverCtx , receiverCancel = context .WithCancel (context . TODO () )
221+ receiverCtx , receiverCancel = context .WithCancel (ctx )
206222 startReceiving = true
223+ subscribed = false
207224 case stopReceiverSignal :
208225 logger .V (2 ).Info ("stop the cloudevents receiver" )
209226 receiverCancel ()
@@ -224,10 +241,32 @@ func (c *baseClient) sendReceiverSignal(signal int) {
224241 }
225242}
226243
227- func (c * baseClient ) sendReconnectedSignal () {
244+ func (c * baseClient ) closeChannels () {
245+ c .Lock ()
246+ defer c .Unlock ()
247+
248+ if c .receiverChan != nil {
249+ close (c .receiverChan )
250+ c .receiverChan = nil
251+ }
252+ if c .reconnectedChan != nil {
253+ close (c .reconnectedChan )
254+ c .reconnectedChan = nil
255+ }
256+ }
257+
258+ func (c * baseClient ) sendReconnectedSignal (ctx context.Context ) {
228259 c .RLock ()
229260 defer c .RUnlock ()
230- c .reconnectedChan <- struct {}{}
261+ if c .reconnectedChan != nil {
262+ select {
263+ case c .reconnectedChan <- struct {}{}:
264+ // Signal sent successfully
265+ default :
266+ // No receiver listening on reconnectedChan, that's okay - don't block
267+ klog .FromContext (ctx ).Info ("reconnected signal not sent, no receiver listening" )
268+ }
269+ }
231270}
232271
233272func (c * baseClient ) isClientReady () bool {
@@ -241,3 +280,9 @@ func (c *baseClient) setClientReady(ready bool) {
241280 defer c .Unlock ()
242281 c .clientReady = ready
243282}
283+
284+ func (c * baseClient ) getReceiverChan () chan int {
285+ c .RLock ()
286+ defer c .RUnlock ()
287+ return c .receiverChan
288+ }
0 commit comments