@@ -163,7 +163,7 @@ private enum HandleState {
163163 public static final long INVALID_LEDGER_ID = -0xABCDABCDL ;
164164
165165 final Object metadataLock = new Object ();
166- boolean changingEnsemble = false ;
166+ volatile boolean changingEnsemble = false ;
167167 final AtomicInteger numEnsembleChanges = new AtomicInteger (0 );
168168 Queue <PendingAddOp > pendingAddOps ;
169169 ExplicitLacFlushPolicy explicitLacFlushPolicy ;
@@ -172,6 +172,7 @@ private enum HandleState {
172172 final Counter lacUpdateHitsCounter ;
173173 final Counter lacUpdateMissesCounter ;
174174 private final OpStatsLogger clientChannelWriteWaitStats ;
175+ private final AtomicBoolean sendAddSuccessCallbacksInProgress = new AtomicBoolean (false );
175176
176177 LedgerHandle (ClientContext clientCtx ,
177178 long ledgerId , Versioned <LedgerMetadata > versionedMetadata ,
@@ -557,7 +558,7 @@ void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc
557558 pendingAdds = drainPendingAddsAndAdjustLength ();
558559
559560 // taking the length must occur after draining, as draining changes the length
560- lastEntry = lastAddPushed = LedgerHandle .this .lastAddConfirmed ;
561+ lastEntry = lastAddPushed = pendingAddsSequenceHead = LedgerHandle .this .lastAddConfirmed ;
561562 finalLength = LedgerHandle .this .length ;
562563 handleState = HandleState .CLOSED ;
563564 }
@@ -1791,13 +1792,17 @@ void errorOutPendingAdds(int rc) {
17911792 }
17921793
17931794 synchronized List <PendingAddOp > drainPendingAddsAndAdjustLength () {
1794- PendingAddOp pendingAddOp ;
1795- List <PendingAddOp > opsDrained = new ArrayList <PendingAddOp >(pendingAddOps .size ());
1796- while ((pendingAddOp = pendingAddOps .poll ()) != null ) {
1797- addToLength (-pendingAddOp .entryLength );
1798- opsDrained .add (pendingAddOp );
1795+ // synchronize on pendingAddOps to ensure that sendAddSuccessCallbacks isn't concurrently
1796+ // modifying pendingAddOps
1797+ synchronized (pendingAddOps ) {
1798+ PendingAddOp pendingAddOp ;
1799+ List <PendingAddOp > opsDrained = new ArrayList <PendingAddOp >(pendingAddOps .size ());
1800+ while ((pendingAddOp = pendingAddOps .poll ()) != null ) {
1801+ addToLength (-pendingAddOp .entryLength );
1802+ opsDrained .add (pendingAddOp );
1803+ }
1804+ return opsDrained ;
17991805 }
1800- return opsDrained ;
18011806 }
18021807
18031808 void errorOutPendingAdds (int rc , List <PendingAddOp > ops ) {
@@ -1806,38 +1811,43 @@ void errorOutPendingAdds(int rc, List<PendingAddOp> ops) {
18061811 }
18071812 }
18081813
1814+
18091815 void sendAddSuccessCallbacks () {
1810- // Start from the head of the queue and proceed while there are
1811- // entries that have had all their responses come back
1812- PendingAddOp pendingAddOp ;
1813-
1814- while ((pendingAddOp = pendingAddOps .peek ()) != null
1815- && !changingEnsemble ) {
1816- if (!pendingAddOp .completed ) {
1817- if (LOG .isDebugEnabled ()) {
1818- LOG .debug ("pending add not completed: {}" , pendingAddOp );
1819- }
1820- return ;
1821- }
1822- // Check if it is the next entry in the sequence.
1823- if (pendingAddOp .entryId != 0 && pendingAddOp .entryId != pendingAddsSequenceHead + 1 ) {
1824- if (LOG .isDebugEnabled ()) {
1825- LOG .debug ("Head of the queue entryId: {} is not the expected value: {}" , pendingAddOp .entryId ,
1826- pendingAddsSequenceHead + 1 );
1827- }
1828- return ;
1829- }
1816+ if (!sendAddSuccessCallbacksInProgress .compareAndSet (false , true )) {
1817+ // another thread is already sending the callbacks
1818+ return ;
1819+ }
1820+ try {
1821+ // Start from the head of the queue and proceed while there are
1822+ // entries that have had all their responses come back
18301823
1831- pendingAddOps .remove ();
1832- explicitLacFlushPolicy .updatePiggyBackedLac (lastAddConfirmed );
1833- pendingAddsSequenceHead = pendingAddOp .entryId ;
1834- if (!writeFlags .contains (WriteFlag .DEFERRED_SYNC )) {
1835- this .lastAddConfirmed = pendingAddsSequenceHead ;
1836- }
1824+ // synchronize on pendingAddOps to ensure that drainPendingAddsAndAdjustLength isn't concurrently
1825+ // modifying pendingAddOps
1826+ synchronized (pendingAddOps ) {
1827+ PendingAddOp pendingAddOp ;
18371828
1838- pendingAddOp .submitCallback (BKException .Code .OK );
1839- }
1829+ while ((pendingAddOp = pendingAddOps .peek ()) != null
1830+ && !changingEnsemble ) {
1831+ if (!pendingAddOp .completed ) {
1832+ if (LOG .isDebugEnabled ()) {
1833+ LOG .debug ("pending add not completed: {}" , pendingAddOp );
1834+ }
1835+ return ;
1836+ }
18401837
1838+ pendingAddOps .remove ();
1839+ explicitLacFlushPolicy .updatePiggyBackedLac (lastAddConfirmed );
1840+ pendingAddsSequenceHead = pendingAddOp .entryId ;
1841+ if (!writeFlags .contains (WriteFlag .DEFERRED_SYNC )) {
1842+ this .lastAddConfirmed = pendingAddsSequenceHead ;
1843+ }
1844+
1845+ pendingAddOp .submitCallback (BKException .Code .OK );
1846+ }
1847+ }
1848+ } finally {
1849+ sendAddSuccessCallbacksInProgress .set (false );
1850+ }
18411851 }
18421852
18431853 @ VisibleForTesting
0 commit comments