@@ -2271,30 +2271,43 @@ void ensembleChangeLoop(List<BookieId> origEnsemble, Map<Integer, BookieId> fail
22712271
22722272 List <BookieId > newEnsemble = null ;
22732273 Set <Integer > replaced = null ;
2274+ synchronized (metadataLock ) {
2275+ if (delayedWriteFailedBookies .isEmpty ()) {
2276+ newEnsemble = getCurrentEnsemble ();
2277+ replaced = EnsembleUtils .diffEnsemble (origEnsemble , newEnsemble );
2278+ LOG .info ("New Ensemble: {} for ledger: {}" , newEnsemble , ledgerId );
2279+ }
2280+ }
2281+
2282+ if (newEnsemble != null ) {
2283+ // Resend write requests while changingEnsemble is true (unsetSuccess outside the lock).
2284+ // sendAddSuccessCallbacks() is skipped in this state,
2285+ // preventing pending adds from being marked complete prematurely.
2286+ unsetSuccessAndSendWriteRequest (newEnsemble , replaced );
2287+ }
22742288
22752289 Map <Integer , BookieId > toReplace = null ;
2290+ List <BookieId > nextOrigEnsemble = null ;
2291+ boolean triggerCallbacks = false ;
22762292 synchronized (metadataLock ) {
22772293 if (!delayedWriteFailedBookies .isEmpty ()) {
22782294 toReplace = new HashMap <>(delayedWriteFailedBookies );
22792295 delayedWriteFailedBookies .clear ();
2296+ nextOrigEnsemble = newEnsemble != null ? newEnsemble : origEnsemble ;
22802297 } else {
2281- newEnsemble = getCurrentEnsemble ();
2282- replaced = EnsembleUtils .diffEnsemble (origEnsemble , newEnsemble );
2283- LOG .info ("New Ensemble: {} for ledger: {}" , newEnsemble , ledgerId );
2284-
2285- // Since changingEnsemble is true, processing in #sendAddSuccessCallbacks() is skipped.
2286- unsetSuccessAndSendWriteRequest (newEnsemble , replaced );
22872298 changingEnsemble = false ;
2299+ triggerCallbacks = true ;
22882300 }
22892301 }
22902302
2291- if (toReplace != null && !toReplace .isEmpty ()) {
2292- ensembleChangeLoop (origEnsemble , toReplace );
2303+ if (triggerCallbacks ) {
2304+ // Trigger sendAddSuccessCallbacks() after changingEnsemble is set to false,
2305+ // so that pending adds can complete once the ensemble change is finished.
2306+ sendAddSuccessCallbacks ();
22932307 }
22942308
2295- if (newEnsemble != null ) {
2296- // After changingEnsemble is changed to false, call #sendAddSuccessCallbacks().
2297- sendAddSuccessCallbacks ();
2309+ if (toReplace != null && !toReplace .isEmpty ()) {
2310+ ensembleChangeLoop (nextOrigEnsemble , toReplace );
22982311 }
22992312 }
23002313 }, clientCtx .getMainWorkerPool ().chooseThread (ledgerId ));
0 commit comments