|
22 | 22 | import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_LEFT; |
23 | 23 | import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER; |
24 | 24 | import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE; |
| 25 | +import static org.apache.bookkeeper.client.BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK; |
25 | 26 | import static org.apache.bookkeeper.client.BookKeeperClientStats.READ_REQUESTS_REORDERED; |
26 | 27 | import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.UNKNOWN_REGION; |
27 | 28 |
|
|
67 | 68 | import org.apache.bookkeeper.net.ScriptBasedMapping; |
68 | 69 | import org.apache.bookkeeper.net.StabilizeNetworkTopology; |
69 | 70 | import org.apache.bookkeeper.stats.Counter; |
| 71 | +import org.apache.bookkeeper.stats.Gauge; |
70 | 72 | import org.apache.bookkeeper.stats.OpStatsLogger; |
71 | 73 | import org.apache.bookkeeper.stats.StatsLogger; |
72 | 74 | import org.apache.bookkeeper.stats.annotations.StatsDoc; |
@@ -114,11 +116,6 @@ static class DefaultResolver implements DNSToSwitchMapping { |
114 | 116 |
|
115 | 117 | final Supplier<String> defaultRackSupplier; |
116 | 118 |
|
117 | | - // for backwards compat |
118 | | - public DefaultResolver() { |
119 | | - this(() -> NetworkTopology.DEFAULT_REGION_AND_RACK); |
120 | | - } |
121 | | - |
122 | 119 | public DefaultResolver(Supplier<String> defaultRackSupplier) { |
123 | 120 | checkNotNull(defaultRackSupplier, "defaultRackSupplier should not be null"); |
124 | 121 | this.defaultRackSupplier = defaultRackSupplier; |
@@ -240,7 +237,16 @@ public void reloadCachedMappings() { |
240 | 237 | help = "The distribution of number of bookies reordered on each read request" |
241 | 238 | ) |
242 | 239 | protected OpStatsLogger readReorderedCounter = null; |
| 240 | + @StatsDoc( |
| 241 | + name = FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER, |
| 242 | + help = "Counter for number of times DNSResolverDecorator failed to resolve Network Location" |
| 243 | + ) |
243 | 244 | protected Counter failedToResolveNetworkLocationCounter = null; |
| 245 | + @StatsDoc( |
| 246 | + name = NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK, |
| 247 | + help = "Gauge for the number of writable Bookies in default rack" |
| 248 | + ) |
| 249 | + protected Gauge<Integer> numWritableBookiesInDefaultRack; |
244 | 250 |
|
245 | 251 | private String defaultRack = NetworkTopology.DEFAULT_RACK; |
246 | 252 |
|
@@ -282,8 +288,24 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns |
282 | 288 | this.bookiesJoinedCounter = statsLogger.getOpStatsLogger(BOOKIES_JOINED); |
283 | 289 | this.bookiesLeftCounter = statsLogger.getOpStatsLogger(BOOKIES_LEFT); |
284 | 290 | this.readReorderedCounter = statsLogger.getOpStatsLogger(READ_REQUESTS_REORDERED); |
285 | | - this.failedToResolveNetworkLocationCounter = statsLogger |
286 | | - .getCounter(FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER); |
| 291 | + this.failedToResolveNetworkLocationCounter = statsLogger.getCounter(FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER); |
| 292 | + this.numWritableBookiesInDefaultRack = new Gauge<Integer>() { |
| 293 | + @Override |
| 294 | + public Integer getDefaultValue() { |
| 295 | + return 0; |
| 296 | + } |
| 297 | + |
| 298 | + @Override |
| 299 | + public Integer getSample() { |
| 300 | + rwLock.readLock().lock(); |
| 301 | + try { |
| 302 | + return topology.countNumOfAvailableNodes(getDefaultRack(), Collections.emptySet()); |
| 303 | + } finally { |
| 304 | + rwLock.readLock().unlock(); |
| 305 | + } |
| 306 | + } |
| 307 | + }; |
| 308 | + this.statsLogger.registerGauge(NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK, numWritableBookiesInDefaultRack); |
287 | 309 | this.reorderReadsRandom = reorderReadsRandom; |
288 | 310 | this.stabilizePeriodSeconds = stabilizePeriodSeconds; |
289 | 311 | this.reorderThresholdPendingRequests = reorderThresholdPendingRequests; |
@@ -417,7 +439,9 @@ public void onBookieRackChange(List<BookieSocketAddress> bookieAddressList) { |
417 | 439 | if (node != null) { |
418 | 440 | // refresh the rack info if its a known bookie |
419 | 441 | topology.remove(node); |
420 | | - topology.add(createBookieNode(bookieAddress)); |
| 442 | + BookieNode newNode = createBookieNode(bookieAddress); |
| 443 | + topology.add(newNode); |
| 444 | + knownBookies.put(bookieAddress, newNode); |
421 | 445 | } |
422 | 446 | } |
423 | 447 | } finally { |
@@ -455,6 +479,9 @@ public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writab |
455 | 479 | } |
456 | 480 | } |
457 | 481 |
|
| 482 | + /* |
| 483 | + * this method should be called in writelock scope of 'rwLock' |
| 484 | + */ |
458 | 485 | @Override |
459 | 486 | public void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies) { |
460 | 487 | for (BookieSocketAddress addr : leftBookies) { |
@@ -483,6 +510,9 @@ public void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies) { |
483 | 510 | } |
484 | 511 | } |
485 | 512 |
|
| 513 | + /* |
| 514 | + * this method should be called in writelock scope of 'rwLock' |
| 515 | + */ |
486 | 516 | @Override |
487 | 517 | public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) { |
488 | 518 | // node joined |
@@ -531,26 +561,53 @@ private static Set<String> getNetworkLocations(Set<Node> bookieNodes) { |
531 | 561 | return networkLocs; |
532 | 562 | } |
533 | 563 |
|
| 564 | + /* |
| 565 | + * this method should be called in readlock scope of 'rwLock' |
| 566 | + */ |
| 567 | + protected Set<BookieSocketAddress> addDefaultRackBookiesIfMinNumRacksIsEnforced( |
| 568 | + Set<BookieSocketAddress> excludeBookies) { |
| 569 | + Set<BookieSocketAddress> comprehensiveExclusionBookiesSet; |
| 570 | + if (enforceMinNumRacksPerWriteQuorum) { |
| 571 | + Set<BookieSocketAddress> bookiesInDefaultRack = null; |
| 572 | + Set<Node> defaultRackLeaves = topology.getLeaves(getDefaultRack()); |
| 573 | + for (Node node : defaultRackLeaves) { |
| 574 | + if (node instanceof BookieNode) { |
| 575 | + if (bookiesInDefaultRack == null) { |
| 576 | + bookiesInDefaultRack = new HashSet<BookieSocketAddress>(excludeBookies); |
| 577 | + } |
| 578 | + bookiesInDefaultRack.add(((BookieNode) node).getAddr()); |
| 579 | + } else { |
| 580 | + LOG.error("found non-BookieNode: {} as leaf of defaultrack: {}", node, getDefaultRack()); |
| 581 | + } |
| 582 | + } |
| 583 | + if ((bookiesInDefaultRack == null) || bookiesInDefaultRack.isEmpty()) { |
| 584 | + comprehensiveExclusionBookiesSet = excludeBookies; |
| 585 | + } else { |
| 586 | + comprehensiveExclusionBookiesSet = new HashSet<BookieSocketAddress>(excludeBookies); |
| 587 | + comprehensiveExclusionBookiesSet.addAll(bookiesInDefaultRack); |
| 588 | + LOG.info("enforceMinNumRacksPerWriteQuorum is enabled, so Excluding bookies of defaultRack: {}", |
| 589 | + bookiesInDefaultRack); |
| 590 | + } |
| 591 | + } else { |
| 592 | + comprehensiveExclusionBookiesSet = excludeBookies; |
| 593 | + } |
| 594 | + return comprehensiveExclusionBookiesSet; |
| 595 | + } |
| 596 | + |
534 | 597 | @Override |
535 | 598 | public PlacementResult<List<BookieSocketAddress>> newEnsemble(int ensembleSize, int writeQuorumSize, |
536 | 599 | int ackQuorumSize, Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies) |
537 | 600 | throws BKNotEnoughBookiesException { |
538 | | - return newEnsembleInternal(ensembleSize, writeQuorumSize, excludeBookies, null, null); |
539 | | - } |
540 | | - |
541 | | - protected PlacementResult<List<BookieSocketAddress>> newEnsembleInternal(int ensembleSize, |
542 | | - int writeQuorumSize, |
543 | | - Set<BookieSocketAddress> excludeBookies, |
544 | | - Ensemble<BookieNode> parentEnsemble, |
545 | | - Predicate<BookieNode> parentPredicate) |
546 | | - throws BKNotEnoughBookiesException { |
547 | | - return newEnsembleInternal( |
548 | | - ensembleSize, |
549 | | - writeQuorumSize, |
550 | | - writeQuorumSize, |
551 | | - excludeBookies, |
552 | | - parentEnsemble, |
553 | | - parentPredicate); |
| 601 | + rwLock.readLock().lock(); |
| 602 | + try { |
| 603 | + Set<BookieSocketAddress> comprehensiveExclusionBookiesSet = addDefaultRackBookiesIfMinNumRacksIsEnforced( |
| 604 | + excludeBookies); |
| 605 | + PlacementResult<List<BookieSocketAddress>> newEnsembleResult = newEnsembleInternal(ensembleSize, |
| 606 | + writeQuorumSize, ackQuorumSize, comprehensiveExclusionBookiesSet, null, null); |
| 607 | + return newEnsembleResult; |
| 608 | + } finally { |
| 609 | + rwLock.readLock().unlock(); |
| 610 | + } |
554 | 611 | } |
555 | 612 |
|
556 | 613 | @Override |
@@ -643,6 +700,7 @@ public PlacementResult<BookieSocketAddress> replaceBookie(int ensembleSize, int |
643 | 700 | throws BKNotEnoughBookiesException { |
644 | 701 | rwLock.readLock().lock(); |
645 | 702 | try { |
| 703 | + excludeBookies = addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies); |
646 | 704 | excludeBookies.addAll(currentEnsemble); |
647 | 705 | BookieNode bn = knownBookies.get(bookieToReplace); |
648 | 706 | if (null == bn) { |
@@ -1253,28 +1311,31 @@ static void shuffleWithMask(DistributionSchedule.WriteSet writeSet, |
1253 | 1311 | } |
1254 | 1312 | } |
1255 | 1313 |
|
| 1314 | + // this method should be called in readlock scope of 'rwlock' |
1256 | 1315 | @Override |
1257 | 1316 | public boolean isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int writeQuorumSize, |
1258 | 1317 | int ackQuorumSize) { |
1259 | 1318 | int ensembleSize = ensembleList.size(); |
1260 | 1319 | int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum); |
1261 | | - HashSet<String> racksOrRegionsInQuorum = new HashSet<String>(); |
| 1320 | + HashSet<String> racksInQuorum = new HashSet<String>(); |
1262 | 1321 | BookieSocketAddress bookie; |
1263 | 1322 | for (int i = 0; i < ensembleList.size(); i++) { |
1264 | | - racksOrRegionsInQuorum.clear(); |
| 1323 | + racksInQuorum.clear(); |
1265 | 1324 | for (int j = 0; j < writeQuorumSize; j++) { |
1266 | 1325 | bookie = ensembleList.get((i + j) % ensembleSize); |
1267 | 1326 | try { |
1268 | | - racksOrRegionsInQuorum.add(knownBookies.get(bookie).getNetworkLocation()); |
| 1327 | + racksInQuorum.add(knownBookies.get(bookie).getNetworkLocation()); |
1269 | 1328 | } catch (Exception e) { |
1270 | 1329 | /* |
1271 | | - * any issue/exception in analyzing whether ensemble is strictly adhering to |
1272 | | - * placement policy should be swallowed. |
| 1330 | + * any issue/exception in analyzing whether ensemble is |
| 1331 | + * strictly adhering to placement policy should be |
| 1332 | + * swallowed. |
1273 | 1333 | */ |
1274 | 1334 | LOG.warn("Received exception while trying to get network location of bookie: {}", bookie, e); |
1275 | 1335 | } |
1276 | 1336 | } |
1277 | | - if (racksOrRegionsInQuorum.size() < minNumRacksPerWriteQuorumForThisEnsemble) { |
| 1337 | + if ((racksInQuorum.size() < minNumRacksPerWriteQuorumForThisEnsemble) |
| 1338 | + || (enforceMinNumRacksPerWriteQuorum && racksInQuorum.contains(getDefaultRack()))) { |
1278 | 1339 | return false; |
1279 | 1340 | } |
1280 | 1341 | } |
|
0 commit comments