@@ -410,6 +410,7 @@ private static class ProducerTracker implements AgentTracker {
410410 private volatile byte publisherId ;
411411 private volatile ClientProducersManager clientProducersManager ;
412412 private final AtomicBoolean recovering = new AtomicBoolean (false );
413+ private final Lock trackerLock = new ReentrantLock ();
413414
414415 private ProducerTracker (
415416 long uniqueId , String reference , String stream , StreamProducer producer ) {
@@ -421,10 +422,12 @@ private ProducerTracker(
421422
422423 @ Override
423424 public void assign (byte producerId , Client client , ClientProducersManager manager ) {
424- synchronized (ProducerTracker .this ) {
425- this .publisherId = producerId ;
426- this .clientProducersManager = manager ;
427- }
425+ lock (
426+ this .trackerLock ,
427+ () -> {
428+ this .publisherId = producerId ;
429+ this .clientProducersManager = manager ;
430+ });
428431 this .producer .setPublisherId (producerId );
429432 this .producer .setClient (client );
430433 }
@@ -451,9 +454,7 @@ public String stream() {
451454
452455 @ Override
453456 public void unavailable () {
454- synchronized (ProducerTracker .this ) {
455- this .clientProducersManager = null ;
456- }
457+ lock (this .trackerLock , () -> this .clientProducersManager = null );
457458 this .producer .unavailable ();
458459 }
459460
@@ -464,10 +465,14 @@ public void running() {
464465 }
465466
466467 @ Override
467- public synchronized void cancel () {
468- if (this .clientProducersManager != null ) {
469- this .clientProducersManager .unregister (this );
470- }
468+ public void cancel () {
469+ lock (
470+ this .trackerLock ,
471+ () -> {
472+ if (this .clientProducersManager != null ) {
473+ this .clientProducersManager .unregister (this );
474+ }
475+ });
471476 }
472477
473478 @ Override
@@ -503,6 +508,7 @@ private static class TrackingConsumerTracker implements AgentTracker {
503508 private final StreamConsumer consumer ;
504509 private volatile ClientProducersManager clientProducersManager ;
505510 private final AtomicBoolean recovering = new AtomicBoolean (false );
511+ private final Lock trackerLock = new ReentrantLock ();
506512
507513 private TrackingConsumerTracker (long uniqueId , String stream , StreamConsumer consumer ) {
508514 this .uniqueId = uniqueId ;
@@ -512,9 +518,7 @@ private TrackingConsumerTracker(long uniqueId, String stream, StreamConsumer con
512518
513519 @ Override
514520 public void assign (byte producerId , Client client , ClientProducersManager manager ) {
515- synchronized (TrackingConsumerTracker .this ) {
516- this .clientProducersManager = manager ;
517- }
521+ lock (this .trackerLock , () -> this .clientProducersManager = manager );
518522 this .consumer .setTrackingClient (client );
519523 }
520524
@@ -540,9 +544,7 @@ public String stream() {
540544
541545 @ Override
542546 public void unavailable () {
543- synchronized (TrackingConsumerTracker .this ) {
544- this .clientProducersManager = null ;
545- }
547+ lock (this .trackerLock , () -> this .clientProducersManager = null );
546548 this .consumer .unavailable ();
547549 }
548550
@@ -553,10 +555,8 @@ public void running() {
553555 }
554556
555557 @ Override
556- public synchronized void cancel () {
557- if (this .clientProducersManager != null ) {
558- this .clientProducersManager .unregister (this );
559- }
558+ public void cancel () {
559+ lock (this .trackerLock , () -> this .clientProducersManager .unregister (this ));
560560 }
561561
562562 @ Override
@@ -599,6 +599,7 @@ private class ClientProducersManager implements Comparable<ClientProducersManage
599599 private final Map <String , Set <AgentTracker >> streamToTrackers = new ConcurrentHashMap <>();
600600 private final Client client ;
601601 private final AtomicBoolean closed = new AtomicBoolean (false );
602+ private final Lock managerLock = new ReentrantLock ();
602603
603604 private ClientProducersManager (
604605 Broker targetNode ,
@@ -670,7 +671,8 @@ private ClientProducersManager(
670671 "Received metadata notification for '{}', stream is likely to have become unavailable" ,
671672 stream );
672673 Set <AgentTracker > affectedTrackers ;
673- synchronized (ClientProducersManager .this ) {
674+ this .managerLock .lock ();
675+ try {
674676 affectedTrackers = streamToTrackers .remove (stream );
675677 LOGGER .debug (
676678 "Affected publishers and consumer trackers after metadata update: {}" ,
@@ -686,6 +688,8 @@ private ClientProducersManager(
686688 }
687689 });
688690 }
691+ } finally {
692+ this .managerLock .unlock ();
689693 }
690694 if (affectedTrackers != null && !affectedTrackers .isEmpty ()) {
691695 environment
@@ -840,79 +844,97 @@ private void recoverAgent(Broker node, List<BrokerWrapper> candidates, AgentTrac
840844 }
841845 }
842846
843- private synchronized void register (AgentTracker tracker ) {
844- if (this .isFullFor (tracker )) {
845- throw new IllegalStateException ("Cannot add subscription tracker, the manager is full" );
846- }
847- if (this .isClosed ()) {
848- throw new IllegalStateException ("Cannot add subscription tracker, the manager is closed" );
849- }
850- checkNotClosed ();
851- if (tracker .identifiable ()) {
852- ProducerTracker producerTracker = (ProducerTracker ) tracker ;
853- int index = pickSlot (this .producers , producerTracker , this .producerIndexSequence );
854- this .checkNotClosed ();
855- Response response =
856- callAndMaybeRetry (
857- () ->
858- this .client .declarePublisher (
859- (byte ) index , tracker .reference (), tracker .stream ()),
860- RETRY_ON_TIMEOUT ,
861- "Declare publisher request for publisher %d on stream '%s'" ,
862- producerTracker .uniqueId (),
863- producerTracker .stream ());
864- if (response .isOk ()) {
865- tracker .assign ((byte ) index , this .client , this );
866- } else {
867- String message =
868- "Error while declaring publisher: "
869- + formatConstant (response .getResponseCode ())
870- + ". Could not assign producer to client." ;
871- LOGGER .info (message );
872- throw new StreamException (message , response .getResponseCode ());
873- }
874- producers .put (tracker .id (), producerTracker );
875- } else {
876- tracker .assign ((byte ) 0 , this .client , this );
877- trackingConsumerTrackers .add (tracker );
878- }
879- streamToTrackers
880- .computeIfAbsent (tracker .stream (), s -> ConcurrentHashMap .newKeySet ())
881- .add (tracker );
847+ private void register (AgentTracker tracker ) {
848+ lock (
849+ this .managerLock ,
850+ () -> {
851+ if (this .isFullFor (tracker )) {
852+ throw new IllegalStateException (
853+ "Cannot add subscription tracker, the manager is full" );
854+ }
855+ if (this .isClosed ()) {
856+ throw new IllegalStateException (
857+ "Cannot add subscription tracker, the manager is closed" );
858+ }
859+ checkNotClosed ();
860+ if (tracker .identifiable ()) {
861+ ProducerTracker producerTracker = (ProducerTracker ) tracker ;
862+ int index = pickSlot (this .producers , producerTracker , this .producerIndexSequence );
863+ this .checkNotClosed ();
864+ Response response =
865+ callAndMaybeRetry (
866+ () ->
867+ this .client .declarePublisher (
868+ (byte ) index , tracker .reference (), tracker .stream ()),
869+ RETRY_ON_TIMEOUT ,
870+ "Declare publisher request for publisher %d on stream '%s'" ,
871+ producerTracker .uniqueId (),
872+ producerTracker .stream ());
873+ if (response .isOk ()) {
874+ tracker .assign ((byte ) index , this .client , this );
875+ } else {
876+ String message =
877+ "Error while declaring publisher: "
878+ + formatConstant (response .getResponseCode ())
879+ + ". Could not assign producer to client." ;
880+ LOGGER .info (message );
881+ throw new StreamException (message , response .getResponseCode ());
882+ }
883+ producers .put (tracker .id (), producerTracker );
884+ } else {
885+ tracker .assign ((byte ) 0 , this .client , this );
886+ trackingConsumerTrackers .add (tracker );
887+ }
888+ streamToTrackers
889+ .computeIfAbsent (tracker .stream (), s -> ConcurrentHashMap .newKeySet ())
890+ .add (tracker );
891+ });
882892 }
883893
884- private synchronized void unregister (AgentTracker tracker ) {
885- LOGGER .debug (
886- "Unregistering {} {} from manager on {}" , tracker .type (), tracker .uniqueId (), this .name );
887- if (tracker .identifiable ()) {
888- producers .remove (tracker .id ());
889- } else {
890- trackingConsumerTrackers .remove (tracker );
891- }
892- streamToTrackers .compute (
893- tracker .stream (),
894- (s , trackersForThisStream ) -> {
895- if (s == null || trackersForThisStream == null ) {
896- // should not happen
897- return null ;
894+ private void unregister (AgentTracker tracker ) {
895+ lock (
896+ this .managerLock ,
897+ () -> {
898+ LOGGER .debug (
899+ "Unregistering {} {} from manager on {}" ,
900+ tracker .type (),
901+ tracker .uniqueId (),
902+ this .name );
903+ if (tracker .identifiable ()) {
904+ producers .remove (tracker .id ());
898905 } else {
899- trackersForThisStream .remove (tracker );
900- return trackersForThisStream .isEmpty () ? null : trackersForThisStream ;
906+ trackingConsumerTrackers .remove (tracker );
901907 }
908+ streamToTrackers .compute (
909+ tracker .stream (),
910+ (s , trackersForThisStream ) -> {
911+ if (s == null || trackersForThisStream == null ) {
912+ // should not happen
913+ return null ;
914+ } else {
915+ trackersForThisStream .remove (tracker );
916+ return trackersForThisStream .isEmpty () ? null : trackersForThisStream ;
917+ }
918+ });
919+ closeIfEmpty ();
902920 });
903- closeIfEmpty ();
904921 }
905922
906- synchronized boolean isFullFor (AgentTracker tracker ) {
907- if (tracker .identifiable ()) {
908- return producers .size () == maxProducersByClient ;
909- } else {
910- return trackingConsumerTrackers .size () == maxTrackingConsumersByClient ;
911- }
923+ boolean isFullFor (AgentTracker tracker ) {
924+ return lock (
925+ this .managerLock ,
926+ () -> {
927+ if (tracker .identifiable ()) {
928+ return producers .size () == maxProducersByClient ;
929+ } else {
930+ return trackingConsumerTrackers .size () == maxTrackingConsumersByClient ;
931+ }
932+ });
912933 }
913934
914- synchronized boolean isEmpty () {
915- return producers .isEmpty () && trackingConsumerTrackers .isEmpty ();
935+ boolean isEmpty () {
936+ return lock (
937+ this .managerLock , () -> producers .isEmpty () && trackingConsumerTrackers .isEmpty ());
916938 }
917939
918940 private void checkNotClosed () {
@@ -930,13 +952,15 @@ boolean isClosed() {
930952
931953 private void closeIfEmpty () {
932954 if (!closed .get ()) {
933- synchronized (this ) {
934- if (this .isEmpty ()) {
935- this .close ();
936- } else {
937- LOGGER .debug ("Not closing producer manager {} because it is not empty" , this .id );
938- }
939- }
955+ lock (
956+ this .managerLock ,
957+ () -> {
958+ if (this .isEmpty ()) {
959+ this .close ();
960+ } else {
961+ LOGGER .debug ("Not closing producer manager {} because it is not empty" , this .id );
962+ }
963+ });
940964 }
941965 }
942966
0 commit comments