@@ -247,12 +247,6 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr
247247 }
248248 topic := config .Topic
249249
250- // Get the processor client
251- processor , err := aeh .getProcessorForTopic (subscribeCtx , topic )
252- if err != nil {
253- return fmt .Errorf ("error trying to establish a connection: %w" , err )
254- }
255-
256250 // This component has built-in retries because Event Hubs doesn't support N/ACK for messages
257251 retryHandler := func (ctx context.Context , events []* azeventhubs.ReceivedEventData ) ([]HandlerResponseItem , error ) {
258252 b := aeh .backOffConfig .NewBackOffWithContext (ctx )
@@ -282,51 +276,58 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr
282276
283277 subscriptionLoopFinished := make (chan bool , 1 )
284278
285- // Process all partition clients as they come in
286- subscriberLoop := func () {
287- for {
288- // This will block until a new partition client is available
289- // It returns nil if processor.Run terminates or if the context is canceled
290- partitionClient := processor .NextPartitionClient (subscribeCtx )
291- if partitionClient == nil {
292- subscriptionLoopFinished <- true
293- return
294- }
295- aeh .logger .Debugf ("Received client for partition %s" , partitionClient .PartitionID ())
296-
297- // Once we get a partition client, process the events in a separate goroutine
298- go func () {
299- processErr := aeh .processEvents (subscribeCtx , partitionClient , retryConfig )
300- // Do not log context.Canceled which happens at shutdown
301- if processErr != nil && ! errors .Is (processErr , context .Canceled ) {
302- aeh .logger .Errorf ("Error processing events from partition client: %v" , processErr )
303- }
304- }()
305- }
306- }
307-
308- // Start the processor
279+ // Start the subscribe + processor loop
309280 go func () {
310281 for {
311- go subscriberLoop ()
312- // This is a blocking call that runs until the context is canceled
313- err = processor .Run (subscribeCtx )
314- // Exit if the context is canceled
315- if err != nil && errors .Is (err , context .Canceled ) {
316- return
317- }
282+ // Get the processor client
283+ processor , err := aeh .getProcessorForTopic (subscribeCtx , topic )
318284 if err != nil {
319- aeh .logger .Errorf ("Error from event processor : %v " , err )
285+ aeh .logger .Errorf ("error trying to establish a connection : %w " , err )
320286 } else {
321- aeh .logger .Debugf ("Event processor terminated without error" )
322- }
323- // wait for subscription loop finished signal
324- select {
325- case <- subscribeCtx .Done ():
326- return
327- case <- subscriptionLoopFinished :
328- // noop
287+ // Process all partition clients as they come in
288+ subscriberLoop := func () {
289+ for {
290+ // This will block until a new partition client is available
291+ // It returns nil if processor.Run terminates or if the context is canceled
292+ partitionClient := processor .NextPartitionClient (subscribeCtx )
293+ if partitionClient == nil {
294+ subscriptionLoopFinished <- true
295+ return
296+ }
297+ aeh .logger .Debugf ("Received client for partition %s" , partitionClient .PartitionID ())
298+
299+ // Once we get a partition client, process the events in a separate goroutine
300+ go func () {
301+ processErr := aeh .processEvents (subscribeCtx , partitionClient , retryConfig )
302+ // Do not log context.Canceled which happens at shutdown
303+ if processErr != nil && ! errors .Is (processErr , context .Canceled ) {
304+ aeh .logger .Errorf ("Error processing events from partition client: %v" , processErr )
305+ }
306+ }()
307+ }
308+ }
309+
310+ go subscriberLoop ()
311+ // This is a blocking call that runs until the context is canceled or a non-recoverable error is returned.
312+ err = processor .Run (subscribeCtx )
313+ // Exit if the context is canceled
314+ if err != nil && errors .Is (err , context .Canceled ) {
315+ return
316+ }
317+ if err != nil {
318+ aeh .logger .Errorf ("Error from event processor: %v" , err )
319+ } else {
320+ aeh .logger .Debugf ("Event processor terminated without error" )
321+ }
322+ // wait for subscription loop finished signal
323+ select {
324+ case <- subscribeCtx .Done ():
325+ return
326+ case <- subscriptionLoopFinished :
327+ // noop
328+ }
329329 }
330+
330331 // Waiting here is not strictly necessary, however, we will wait for a short time to increase the likelihood of transient errors having disappeared
331332 select {
332333 case <- subscribeCtx .Done ():
0 commit comments