30
30
31
31
import static com .azure .core .amqp .implementation .ClientConstants .ENTITY_PATH_KEY ;
32
32
import static com .azure .core .amqp .implementation .ClientConstants .INTERVAL_KEY ;
33
+ import static com .azure .core .amqp .implementation .ClientConstants .SUBSCRIBER_ID_KEY ;
33
34
34
35
public class AmqpChannelProcessor <T > extends Mono <T > implements Processor <T , T >, CoreSubscriber <T >, Disposable {
35
36
@ SuppressWarnings ("rawtypes" )
@@ -66,7 +67,6 @@ public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath,
66
67
this .endpointStatesFunction = Objects .requireNonNull (endpointStatesFunction ,
67
68
"'endpointStates' cannot be null." );
68
69
this .retryPolicy = Objects .requireNonNull (retryPolicy , "'retryPolicy' cannot be null." );
69
-
70
70
Map <String , Object > loggingContext = new HashMap <>(1 );
71
71
loggingContext .put (ENTITY_PATH_KEY , Objects .requireNonNull (entityPath , "'entityPath' cannot be null." ));
72
72
this .logger = new ClientLogger (getClass (), loggingContext );
@@ -77,6 +77,7 @@ public AmqpChannelProcessor(String fullyQualifiedNamespace, Function<T, Flux<Amq
77
77
this .endpointStatesFunction = Objects .requireNonNull (endpointStatesFunction ,
78
78
"'endpointStates' cannot be null." );
79
79
this .retryPolicy = Objects .requireNonNull (retryPolicy , "'retryPolicy' cannot be null." );
80
+
80
81
this .logger = new ClientLogger (getClass (), Objects .requireNonNull (loggingContext , "'loggingContext' cannot be null." ));
81
82
this .errorContext = new AmqpErrorContext (fullyQualifiedNamespace );
82
83
}
@@ -107,8 +108,6 @@ public void onNext(T amqpChannel) {
107
108
currentChannel = amqpChannel ;
108
109
109
110
final ConcurrentLinkedDeque <ChannelSubscriber <T >> currentSubscribers = subscribers ;
110
- logger .info ("Next AMQP channel received, updating {} current subscribers" , subscribers .size ());
111
-
112
111
currentSubscribers .forEach (subscription -> subscription .onNext (amqpChannel ));
113
112
114
113
connectionSubscription = endpointStatesFunction .apply (amqpChannel ).subscribe (
@@ -227,8 +226,6 @@ public void onError(Throwable throwable) {
227
226
synchronized (lock ) {
228
227
final ConcurrentLinkedDeque <ChannelSubscriber <T >> currentSubscribers = subscribers ;
229
228
subscribers = new ConcurrentLinkedDeque <>();
230
- logger .info ("Error in AMQP channel processor. Notifying {} subscribers." , currentSubscribers .size ());
231
-
232
229
currentSubscribers .forEach (subscriber -> subscriber .onError (throwable ));
233
230
}
234
231
}
@@ -242,7 +239,6 @@ public void onComplete() {
242
239
synchronized (lock ) {
243
240
final ConcurrentLinkedDeque <ChannelSubscriber <T >> currentSubscribers = subscribers ;
244
241
subscribers = new ConcurrentLinkedDeque <>();
245
- logger .info ("AMQP channel processor completed. Notifying {} subscribers." , currentSubscribers .size ());
246
242
currentSubscribers .forEach (subscriber -> subscriber .onComplete ());
247
243
}
248
244
}
@@ -270,8 +266,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
270
266
}
271
267
}
272
268
269
+ subscriber .onAdd ();
273
270
subscribers .add (subscriber );
274
- logger .atVerbose ().addKeyValue ("subscribers" , subscribers .size ()).log ("Added a subscriber." );
275
271
276
272
if (!isRetryPending .get ()) {
277
273
requestUpstream ();
@@ -370,17 +366,37 @@ private void close(T channel) {
370
366
* which removes itself from the tracking list, then propagates the notification to the wrapped subscriber.
371
367
*/
372
368
private static final class ChannelSubscriber <T > extends Operators .MonoSubscriber <T , T > {
369
+
373
370
private final AmqpChannelProcessor <T > processor ;
371
+ // subscriberId is only needed for logging and not every subscriber is logged, so let's make it lazy.
372
+ private String subscriberId = null ;
374
373
375
374
private ChannelSubscriber (CoreSubscriber <? super T > actual , AmqpChannelProcessor <T > processor ) {
376
375
super (actual );
377
376
this .processor = processor ;
378
377
}
379
378
379
+ void onAdd () {
380
+ Object subscriberIdObj = actual .currentContext ().getOrDefault (SUBSCRIBER_ID_KEY , null );
381
+ if (subscriberIdObj != null ) {
382
+ subscriberId = subscriberIdObj .toString ();
383
+ } else {
384
+ subscriberId = StringUtil .getRandomString ("un" );
385
+ }
386
+
387
+ // most subscribers never get here and will be completed immediately after they are created.
388
+ processor .logger .atVerbose ()
389
+ .addKeyValue (SUBSCRIBER_ID_KEY , subscriberId )
390
+ .log ("Added subscriber." );
391
+ }
392
+
380
393
@ Override
381
394
public void cancel () {
382
395
processor .subscribers .remove (this );
383
396
super .cancel ();
397
+ processor .logger .atVerbose ()
398
+ .addKeyValue (SUBSCRIBER_ID_KEY , subscriberId )
399
+ .log ("Canceled subscriber" );
384
400
}
385
401
386
402
@ Override
@@ -389,6 +405,9 @@ public void onComplete() {
389
405
// first untrack before calling into external code.
390
406
processor .subscribers .remove (this );
391
407
actual .onComplete ();
408
+ processor .logger .atInfo ()
409
+ .addKeyValue (SUBSCRIBER_ID_KEY , subscriberId )
410
+ .log ("AMQP channel processor completed." );
392
411
}
393
412
}
394
413
@@ -397,6 +416,10 @@ public void onNext(T channel) {
397
416
if (!isCancelled ()) {
398
417
processor .subscribers .remove (this );
399
418
super .complete (channel );
419
+
420
+ processor .logger .atInfo ()
421
+ .addKeyValue (SUBSCRIBER_ID_KEY , subscriberId )
422
+ .log ("Next AMQP channel received." );
400
423
}
401
424
}
402
425
@@ -405,6 +428,9 @@ public void onError(Throwable throwable) {
405
428
if (!isCancelled ()) {
406
429
processor .subscribers .remove (this );
407
430
actual .onError (throwable );
431
+ processor .logger .atInfo ()
432
+ .addKeyValue (SUBSCRIBER_ID_KEY , subscriberId )
433
+ .log ("Error in AMQP channel processor." );
408
434
} else {
409
435
Operators .onErrorDropped (throwable , currentContext ());
410
436
}
0 commit comments