Skip to content

Commit 7681f88

Browse files
committed
HDDS-14572. ReInit DeletedBlocksTransactionSummary after SCM leader transfer
1 parent d308641 commit 7681f88

File tree

3 files changed

+67
-19
lines changed

3 files changed

+67
-19
lines changed

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,14 @@ public void clear() {
424424
scmDeleteBlocksCommandStatusManager.clear();
425425
transactionToDNsCommitMap.clear();
426426
txSizeMap.clear();
427+
try {
428+
initDataDistributionData();
429+
} catch (IOException e) {
430+
LOG.warn("Failed to initialize Storage space distribution data. The feature will continue with current " +
431+
"totalTxCount {}, totalBlockCount {}, totalBlocksSize {} and totalReplicatedBlocksSize {}. " +
432+
"There is a high chance that the real data and current data has a fixed gap.",
433+
totalTxCount.get(), totalBlockCount.get(), totalBlocksSize.get(), totalReplicatedBlocksSize.get());
434+
}
427435
}
428436

429437
public void cleanAllTimeoutSCMCommand(long timeoutMs) {
@@ -672,8 +680,9 @@ private void initDataDistributionData() throws IOException {
672680
totalBlockCount.set(summary.getTotalBlockCount());
673681
totalBlocksSize.set(summary.getTotalBlockSize());
674682
totalReplicatedBlocksSize.set(summary.getTotalBlockReplicatedSize());
675-
LOG.info("Data distribution is enabled with totalBlockCount {} totalBlocksSize {}",
676-
totalBlockCount.get(), totalBlocksSize.get());
683+
LOG.info("Storage space distribution is initialized with totalTxCount {}, totalBlockCount {}, " +
684+
"totalBlocksSize {} and totalReplicatedBlocksSize {}", totalTxCount.get(),
685+
totalBlockCount.get(), totalBlocksSize.get(), totalReplicatedBlocksSize.get());
677686
}
678687
}
679688

@@ -688,8 +697,7 @@ private DeletedBlocksTransactionSummary loadDeletedBlocksSummary() throws IOExce
688697
}
689698
return DeletedBlocksTransactionSummary.parseFrom(byteString);
690699
} catch (IOException e) {
691-
LOG.error("Failed to get property {} for service {}. DataDistribution function will be disabled.",
692-
propertyName, SERVICE_NAME, e);
700+
LOG.error("Failed to get property {} for service {}.", propertyName, SERVICE_NAME, e);
693701
throw new IOException("Failed to get property " + propertyName, e);
694702
}
695703
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,12 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
296296
currentLeaderTerm.get());
297297
scm.getSequenceIdGen().invalidateBatch();
298298

299+
try {
300+
transactionBuffer.flush();
301+
} catch (Exception ex) {
302+
ExitUtils.terminate(1, "Failed to flush transactionBuffer", ex, StateMachine.LOG);
303+
}
304+
299305
DeletedBlockLog deletedBlockLog = scm.getScmBlockManager()
300306
.getDeletedBlockLog();
301307
Preconditions.checkArgument(

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmDataDistributionFinalization.java

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818
package org.apache.hadoop.hdds.upgrade;
1919

2020
import static java.nio.charset.StandardCharsets.UTF_8;
21+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
22+
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
23+
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
24+
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
25+
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
2126
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
2227
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
2328
import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
@@ -109,12 +114,14 @@ public void init(OzoneConfiguration conf,
109114
configurator.setUpgradeFinalizationExecutor(executor);
110115

111116
conf.setInt(SCMStorageConfig.TESTING_INIT_LAYOUT_VERSION_KEY, HDDSLayoutFeature.HBASE_SUPPORT.layoutVersion());
112-
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
113-
TimeUnit.MILLISECONDS);
114-
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
115-
TimeUnit.MILLISECONDS);
116-
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
117-
100, TimeUnit.MILLISECONDS);
117+
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
118+
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
119+
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, TimeUnit.MILLISECONDS);
120+
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 100, TimeUnit.MILLISECONDS);
121+
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 100, MILLISECONDS);
122+
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 100, TimeUnit.MILLISECONDS);
123+
conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 100, TimeUnit.MILLISECONDS);
124+
118125
ScmConfig scmConfig = conf.getObject(ScmConfig.class);
119126
scmConfig.setBlockDeletionInterval(Duration.ofMillis(100));
120127
conf.setFromObject(scmConfig);
@@ -123,6 +130,7 @@ public void init(OzoneConfiguration conf,
123130
DatanodeConfiguration dnConf =
124131
conf.getObject(DatanodeConfiguration.class);
125132
dnConf.setBlockDeletionInterval(Duration.ofMillis(100));
133+
dnConf.setBlockDeleteCommandWorkerInterval(Duration.ofMillis(100));
126134
conf.setFromObject(dnConf);
127135

128136
MiniOzoneHAClusterImpl.Builder clusterBuilder = MiniOzoneCluster.newHABuilder(conf);
@@ -344,13 +352,39 @@ public void testFinalizationNonEmptyClusterDataDistribution() throws Exception {
344352
assertEquals(value.getBytes(UTF_8).length, summary.getTotalBlockSize());
345353
assertEquals(value.getBytes(UTF_8).length * 3, summary.getTotalBlockReplicatedSize());
346354

355+
// transfer SCM leader
356+
String newLeaderScmId = null;
357+
for (StorageContainerManager scm: cluster.getStorageContainerManagersList()) {
358+
if (scm != activeSCM) {
359+
newLeaderScmId = scm.getScmId();
360+
break;
361+
}
362+
}
363+
cluster.getStorageContainerLocationClient().transferLeadership(newLeaderScmId);
364+
StorageContainerManager newActiveSCM = cluster.getActiveSCM();
365+
deletedBlockLog = (DeletedBlockLogImpl) newActiveSCM.getScmBlockManager().getDeletedBlockLog();
366+
SCMDeletedBlockTransactionStatusManager newStatusManager =
367+
deletedBlockLog.getSCMDeletedBlockTransactionStatusManager();
368+
// new leader SCM should have the right deletion tx summary
369+
summary = newStatusManager.getTransactionSummary();
370+
assertEquals(1, summary.getTotalTransactionCount());
371+
assertEquals(1, summary.getTotalBlockCount());
372+
assertEquals(value.getBytes(UTF_8).length, summary.getTotalBlockSize());
373+
assertEquals(value.getBytes(UTF_8).length * 3, summary.getTotalBlockReplicatedSize());
374+
375+
// flush buffer and start SCMBlockDeletingService
376+
for (StorageContainerManager scm: cluster.getStorageContainerManagersList()) {
377+
flushDBTransactionBuffer(scm);
378+
scm.getScmBlockManager().getSCMBlockDeletingService().start();
379+
}
380+
347381
// force close the container so that block can be deleted
348-
activeSCM.getClientProtocolServer().closeContainer(
382+
newActiveSCM.getClientProtocolServer().closeContainer(
349383
keyDetails.getOzoneKeyLocations().get(0).getContainerID());
350384
// wait for container to be closed
351385
GenericTestUtils.waitFor(() -> {
352386
try {
353-
return activeSCM.getClientProtocolServer().getContainer(
387+
return newActiveSCM.getClientProtocolServer().getContainer(
354388
keyDetails.getOzoneKeyLocations().get(0).getContainerID())
355389
.getState() == HddsProtos.LifeCycleState.CLOSED;
356390
} catch (IOException e) {
@@ -359,15 +393,15 @@ public void testFinalizationNonEmptyClusterDataDistribution() throws Exception {
359393
}
360394
}, 100, 5000);
361395

362-
// flush buffer and start SCMBlockDeletingService
363-
for (StorageContainerManager scm: cluster.getStorageContainerManagersList()) {
364-
flushDBTransactionBuffer(scm);
365-
scm.getScmBlockManager().getSCMBlockDeletingService().start();
366-
}
367-
368396
// wait for block deletion transactions to be confirmed by DN
369397
GenericTestUtils.waitFor(
370-
() -> statusManager.getTransactionSummary().getTotalTransactionCount() == 0, 100, 30000);
398+
() -> newStatusManager.getTransactionSummary().getTotalTransactionCount() == 0, 100, 30000);
399+
400+
// transfer leader back to old SCM and verify
401+
cluster.getStorageContainerLocationClient().transferLeadership(activeSCM.getScmId());
402+
deletedBlockLog = (DeletedBlockLogImpl) activeSCM.getScmBlockManager().getDeletedBlockLog();
403+
summary = deletedBlockLog.getSCMDeletedBlockTransactionStatusManager().getTransactionSummary();
404+
assertEquals(EMPTY_SUMMARY, summary);
371405
}
372406

373407
private Map<Long, List<DeletedBlock>> generateDeletedBlocks(int dataSize, boolean withSize) {

0 commit comments

Comments
 (0)