5252import java .util .concurrent .TimeUnit ;
5353import java .util .concurrent .atomic .AtomicBoolean ;
5454import java .util .concurrent .atomic .AtomicInteger ;
55+ import java .util .concurrent .atomic .AtomicLong ;
5556import org .apache .bookkeeper .client .AsyncCallback .AddCallback ;
5657import org .apache .bookkeeper .client .AsyncCallback .AddCallbackWithLatency ;
5758import org .apache .bookkeeper .client .AsyncCallback .CloseCallback ;
@@ -137,7 +138,7 @@ private enum HandleState {
137138 */
138139 private int stickyBookieIndex ;
139140
140- long length ;
141+ final AtomicLong length ;
141142 final DigestManager macManager ;
142143 final DistributionSchedule distributionSchedule ;
143144 final RateLimiter throttler ;
@@ -188,10 +189,10 @@ private enum HandleState {
188189 LedgerMetadata metadata = versionedMetadata .getValue ();
189190 if (metadata .isClosed ()) {
190191 lastAddConfirmed = lastAddPushed = metadata .getLastEntryId ();
191- length = metadata .getLength ();
192+ length = new AtomicLong ( metadata .getLength () );
192193 } else {
193194 lastAddConfirmed = lastAddPushed = INVALID_ENTRY_ID ;
194- length = 0 ;
195+ length = new AtomicLong () ;
195196 }
196197
197198 this .pendingAddsSequenceHead = lastAddConfirmed ;
@@ -365,7 +366,7 @@ boolean setLedgerMetadata(Versioned<LedgerMetadata> expected, Versioned<LedgerMe
365366 LedgerMetadata metadata = versionedMetadata .getValue ();
366367 if (metadata .isClosed ()) {
367368 lastAddConfirmed = lastAddPushed = metadata .getLastEntryId ();
368- length = metadata .getLength ();
369+ length . set ( metadata .getLength () );
369370 }
370371 return true ;
371372 } else {
@@ -422,9 +423,8 @@ DigestManager getDigestManager() {
422423 * @param delta
423424 * @return the length of the ledger after the addition
424425 */
425- synchronized long addToLength (long delta ) {
426- this .length += delta ;
427- return this .length ;
426+ long addToLength (long delta ) {
427+ return length .addAndGet (delta );
428428 }
429429
430430 /**
@@ -433,8 +433,8 @@ synchronized long addToLength(long delta) {
433433 * @return the length of the ledger in bytes
434434 */
435435 @ Override
436- public synchronized long getLength () {
437- return this .length ;
436+ public long getLength () {
437+ return this .length . get () ;
438438 }
439439
440440 /**
@@ -559,7 +559,7 @@ void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc
559559
560560 // taking the length must occur after draining, as draining changes the length
561561 lastEntry = lastAddPushed = LedgerHandle .this .lastAddConfirmed ;
562- finalLength = LedgerHandle .this .length ;
562+ finalLength = LedgerHandle .this .length . get () ;
563563 handleState = HandleState .CLOSED ;
564564 }
565565
@@ -1649,7 +1649,7 @@ synchronized void updateLastConfirmed(long lac, long len) {
16491649 lacUpdateMissesCounter .inc ();
16501650 }
16511651 lastAddPushed = Math .max (lastAddPushed , lac );
1652- length = Math .max (length , len );
1652+ length . accumulateAndGet ( len , ( current , value ) -> Math .max (current , value ) );
16531653 }
16541654
16551655 /**
@@ -1985,7 +1985,7 @@ public void asyncReadExplicitLastConfirmed(final ReadLastConfirmedCallback cb, f
19851985 isClosed = metadata .isClosed ();
19861986 if (isClosed ) {
19871987 lastAddConfirmed = metadata .getLastEntryId ();
1988- length = metadata .getLength ();
1988+ length . set ( metadata .getLength () );
19891989 }
19901990 }
19911991 if (isClosed ) {
0 commit comments