2020import com .beust .jcommander .Parameter ;
2121import com .beust .jcommander .converters .CommaParameterSplitter ;
2222import com .google .common .annotations .VisibleForTesting ;
23- import com .google .common .base .Functions ;
2423import com .google .common .util .concurrent .UncheckedExecutionException ;
2524import java .io .IOException ;
2625import java .net .URI ;
27- import java .util .Collections ;
28- import java .util .HashMap ;
2926import java .util .List ;
30- import java .util .Map ;
3127import java .util .NavigableSet ;
3228import java .util .TreeSet ;
3329import java .util .concurrent .ConcurrentSkipListSet ;
3430import java .util .concurrent .CountDownLatch ;
3531import java .util .stream .Collectors ;
36- import java .util .stream .IntStream ;
3732import lombok .Cleanup ;
3833import lombok .Setter ;
3934import lombok .experimental .Accessors ;
4035import org .apache .bookkeeper .bookie .BookieException ;
4136import org .apache .bookkeeper .client .BKException ;
4237import org .apache .bookkeeper .client .BookKeeper ;
4338import org .apache .bookkeeper .client .BookKeeperAdmin ;
44- import org .apache .bookkeeper .client .EnsemblePlacementPolicy ;
45- import org .apache .bookkeeper .client .LedgerFragment ;
46- import org .apache .bookkeeper .client .api .LedgerMetadata ;
4739import org .apache .bookkeeper .conf .ClientConfiguration ;
4840import org .apache .bookkeeper .conf .ServerConfiguration ;
4941import org .apache .bookkeeper .meta .LedgerManagerFactory ;
5042import org .apache .bookkeeper .meta .LedgerUnderreplicationManager ;
5143import org .apache .bookkeeper .meta .MetadataBookieDriver ;
5244import org .apache .bookkeeper .meta .MetadataDrivers ;
5345import org .apache .bookkeeper .meta .exceptions .MetadataException ;
54- import org .apache .bookkeeper .net .BookieId ;
5546import org .apache .bookkeeper .replication .ReplicationException ;
5647import org .apache .bookkeeper .stats .NullStatsLogger ;
5748import org .apache .bookkeeper .tools .cli .helpers .BookieCommand ;
@@ -133,7 +124,7 @@ public boolean apply(ServerConfiguration conf, CorrectEnsemblePlacementFlags fla
133124
134125 final ClientConfiguration clientConf = new ClientConfiguration (conf );
135126 final BookKeeper bookKeeper = new BookKeeper (clientConf );
136- final BookKeeperAdmin admin = new BookKeeperAdmin (clientConf );
127+ final BookKeeperAdmin admin = new BookKeeperAdmin (bookKeeper );
137128 return relocate (conf , flags , bookKeeper , admin );
138129 } catch (Exception e ) {
139130 throw new UncheckedExecutionException (e .getMessage (), e );
@@ -143,8 +134,6 @@ public boolean apply(ServerConfiguration conf, CorrectEnsemblePlacementFlags fla
143134 @ VisibleForTesting
144135 public boolean relocate (ServerConfiguration conf , CorrectEnsemblePlacementFlags flags ,
145136 BookKeeper bookKeeper , BookKeeperAdmin admin ) throws Exception {
146- final EnsemblePlacementPolicy placementPolicy = bookKeeper .getPlacementPolicy ();
147-
148137 @ Cleanup
149138 final MetadataBookieDriver metadataDriver = instantiateMetadataDriver (conf );
150139 @ Cleanup
@@ -153,7 +142,7 @@ public boolean relocate(ServerConfiguration conf, CorrectEnsemblePlacementFlags
153142 final LedgerUnderreplicationManager lum = lmf .newLedgerUnderreplicationManager ();
154143
155144 final List <Long > targetLedgers =
156- flags .ledgerIds .stream ().parallel (). distinct ().filter (ledgerId -> {
145+ flags .ledgerIds .stream ().distinct ().filter (ledgerId -> {
157146 try {
158147 return (!flags .skipOpenLedgers || bookKeeper .isClosed (ledgerId ))
159148 && !lum .isLedgerBeingReplicated (ledgerId );
@@ -172,6 +161,9 @@ public boolean relocate(ServerConfiguration conf, CorrectEnsemblePlacementFlags
172161 final NavigableSet <Pair <Long , Long >> failedTargets = new ConcurrentSkipListSet <>();
173162 final CountDownLatch latch = new CountDownLatch (targetLedgers .size ());
174163 for (long ledgerId : targetLedgers ) {
164+ if (LOG .isDebugEnabled ()) {
165+ LOG .debug ("Start relocation of the ledger {}." , ledgerId );
166+ }
175167 if (!flags .dryRun ) {
176168 try {
177169 lum .acquireUnderreplicatedLedger (ledgerId );
@@ -188,103 +180,18 @@ public boolean relocate(ServerConfiguration conf, CorrectEnsemblePlacementFlags
188180 LOG .warn ("Failed to open ledger {}" , ledgerId );
189181 return ;
190182 }
191-
192- final LedgerMetadata ledgerMeta = lh .getLedgerMetadata ();
193- final Map <Long , Long > ledgerFragmentsRange = new HashMap <>();
194- Long curEntryId = null ;
195- for (Map .Entry <Long , ? extends List <BookieId >> entry :
196- ledgerMeta .getAllEnsembles ().entrySet ()) {
197- if (curEntryId != null ) {
198- ledgerFragmentsRange .put (curEntryId , entry .getKey () - 1 );
199- }
200- curEntryId = entry .getKey ();
201- }
202- if (curEntryId != null ) {
203- ledgerFragmentsRange .put (curEntryId , lh .getLastAddConfirmed ());
204- }
205-
206- for (Map .Entry <Long , ? extends List <BookieId >> entry : ledgerMeta .getAllEnsembles ().entrySet ()) {
207- if (placementPolicy .isEnsembleAdheringToPlacementPolicy (entry .getValue (),
208- ledgerMeta .getWriteQuorumSize (), ledgerMeta .getAckQuorumSize ())
209- == EnsemblePlacementPolicy .PlacementPolicyAdherence .FAIL ) {
210- try {
211- final List <BookieId > currentEnsemble = entry .getValue ();
212- // Currently, don't consider quarantinedBookies
213- final EnsemblePlacementPolicy .PlacementResult <List <BookieId >> placementResult =
214- placementPolicy .replaceToAdherePlacementPolicy (
215- ledgerMeta .getEnsembleSize (),
216- ledgerMeta .getWriteQuorumSize (),
217- ledgerMeta .getAckQuorumSize (),
218- Collections .emptySet (),
219- currentEnsemble );
220-
221- if (placementResult .isAdheringToPolicy ()
222- == EnsemblePlacementPolicy .PlacementPolicyAdherence .FAIL ) {
223- LOG .warn ("Failed to relocate the ensemble. So, skip the operation."
224- + " ledgerId: {}, fragmentIndex: {}" ,
225- ledgerId , entry .getKey ());
226- failedTargets .add (Pair .of (ledgerId , entry .getKey ()));
227- } else {
228- final List <BookieId > newEnsemble = placementResult .getResult ();
229- final Map <Integer , BookieId > replaceBookiesMap = IntStream
230- .range (0 , ledgerMeta .getEnsembleSize ()).boxed ()
231- .filter (i -> !newEnsemble .get (i ).equals (currentEnsemble .get (i )))
232- .collect (Collectors .toMap (Functions .identity (), newEnsemble ::get ));
233- if (replaceBookiesMap .isEmpty ()) {
234- LOG .warn ("Failed to get bookies to replace. So, skip the operation."
235- + " ledgerId: {}, fragmentIndex: {}" ,
236- ledgerId , entry .getKey ());
237- failedTargets .add (Pair .of (ledgerId , entry .getKey ()));
238- } else if (flags .dryRun ) {
239- LOG .info ("Would replace the ensemble. ledgerId: {}, fragmentIndex: {},"
240- + " currentEnsemble: {} replaceBookiesMap {}" ,
241- ledgerId , entry .getKey (),
242- currentEnsemble , replaceBookiesMap );
243- } else {
244- if (LOG .isDebugEnabled ()) {
245- LOG .debug ("Try to replace the ensemble. ledgerId: {}, fragmentIndex: {},"
246- + " replaceBookiesMap {}" ,
247- ledgerId , entry .getKey (), replaceBookiesMap );
248- }
249- final LedgerFragment fragment = new LedgerFragment (lh , entry .getKey (),
250- ledgerFragmentsRange .get (entry .getKey ()), replaceBookiesMap .keySet ());
251-
252- try {
253- admin .replicateLedgerFragment (lh , fragment , replaceBookiesMap ,
254- (lId , eId ) -> {
255- // This consumer is already accepted before the method returns
256- // void. Therefore, use failedTargets in this consumer.
257- LOG .warn ("Failed to read entry {}:{}" , lId , eId );
258- failedTargets .add (Pair .of (ledgerId , entry .getKey ()));
259- });
260- LOG .info ("Operation finished in the ensemble. ledgerId: {},"
261- + " fragmentIndex: {}, replaceBookiesMap {}" ,
262- ledgerId , entry .getKey (), replaceBookiesMap );
263- } catch (BKException | InterruptedException e ) {
264- LOG .warn ("Failed to replicate ledger fragment." , e );
265- failedTargets .add (Pair .of (ledgerId , entry .getKey ()));
266- }
267- }
268- }
269- } catch (UnsupportedOperationException e ) {
270- LOG .warn ("UnsupportedOperationException caught. The placement policy might not support"
271- + " replaceToAdherePlacementPolicy method." , e );
272- failedTargets .add (Pair .of (ledgerId , entry .getKey ()));
273- }
274- } else {
275- if (LOG .isDebugEnabled ()) {
276- LOG .debug ("The fragment is adhering to placement policy. So, skip the operation."
277- + " ledgerId: {}, fragmentIndex: {}" , ledgerId , entry .getKey ());
278- }
279- }
280- }
183+ admin .relocateLedgerToAdherePlacementPolicy (lh , flags .dryRun )
184+ .forEach (e -> failedTargets .add (Pair .of (ledgerId , e )));
185+ } catch (UnsupportedOperationException e ) {
186+ LOG .warn ("UnsupportedOperationException caught. The placement policy might not support"
187+ + " replaceToAdherePlacementPolicy method." , e );
281188 } finally {
282189 try {
283190 if (!flags .dryRun ) {
284191 lum .releaseUnderreplicatedLedger (ledgerId );
285192 }
286193 } catch (ReplicationException e ) {
287- LOG .warn ("Failed to release under replicated ledger {}." , ledgerId , e );
194+ LOG .error ("Failed to release under replicated ledger {}." , ledgerId , e );
288195 } finally {
289196 ((CountDownLatch ) ctx ).countDown ();
290197 }
0 commit comments