Skip to content

Commit 940b031

Browse files
committed
fix: Sync voting completion to record manager (#24566)
Signed-off-by: Matt Hess <matt.hess@swirldslabs.com>
1 parent b9fdac7 commit 940b031

File tree

5 files changed

+106
-69
lines changed

5 files changed

+106
-69
lines changed

hedera-node/hedera-app/src/main/java/com/hedera/node/app/records/BlockRecordInjectionModule.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import com.hedera.node.app.quiescence.QuiescenceController;
66
import com.hedera.node.app.records.impl.BlockRecordManagerImpl;
77
import com.hedera.node.app.records.impl.BlockRecordStreamProducer;
8-
import com.hedera.node.app.records.impl.WrappedRecordBlockHashMigration;
98
import com.hedera.node.app.records.impl.WrappedRecordFileBlockHashesDiskWriter;
109
import com.hedera.node.app.records.impl.producers.BlockRecordFormat;
1110
import com.hedera.node.app.records.impl.producers.BlockRecordWriterFactory;
@@ -72,8 +71,7 @@ public static BlockRecordManager provideBlockRecordManager(
7271
@NonNull final QuiescenceController quiescenceController,
7372
@NonNull final QuiescedHeartbeat quiescedHeartbeat,
7473
@NonNull final Platform platform,
75-
@NonNull final WrappedRecordFileBlockHashesDiskWriter wrappedRecordHashesDiskWriter,
76-
@NonNull final WrappedRecordBlockHashMigration wrappedRecordBlockHashMigration) {
74+
@NonNull final WrappedRecordFileBlockHashesDiskWriter wrappedRecordHashesDiskWriter) {
7775
final var merkleState = state.getState();
7876
if (merkleState == null) {
7977
throw new IllegalStateException("Merkle state is null");
@@ -86,8 +84,7 @@ public static BlockRecordManager provideBlockRecordManager(
8684
quiescedHeartbeat,
8785
platform,
8886
wrappedRecordHashesDiskWriter,
89-
initTrigger,
90-
wrappedRecordBlockHashMigration.result());
87+
initTrigger);
9188
}
9289

9390
@Provides

hedera-node/hedera-app/src/main/java/com/hedera/node/app/records/impl/BlockRecordManagerImpl.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,6 @@ public final class BlockRecordManagerImpl implements BlockRecordManager {
125125
* The most recent wrapped record block root hash.
126126
*/
127127
private Bytes previousWrappedRecordBlockRootHash;
128-
/**
129-
* Tracks whether in-memory wrapped hash tracking has been refreshed from finalized BlockInfo
130-
* for the current migration voting cycle.
131-
*/
132-
private boolean wrappedHashStateSyncedFromFinalizedVote;
133128

134129
/**
135130
* Construct BlockRecordManager
@@ -138,8 +133,6 @@ public final class BlockRecordManagerImpl implements BlockRecordManager {
138133
* @param state The current hedera state
139134
* @param streamFileProducer The stream file producer
140135
* @param initTrigger The init trigger
141-
* @param migrationResult The pre-computed migration result from before platform.build(), or null
142-
* if the migration was not run or was skipped
143136
*/
144137
public BlockRecordManagerImpl(
145138
@NonNull final ConfigProvider configProvider,
@@ -149,8 +142,7 @@ public BlockRecordManagerImpl(
149142
@NonNull final QuiescedHeartbeat quiescedHeartbeat,
150143
@NonNull final Platform platform,
151144
@NonNull final WrappedRecordFileBlockHashesDiskWriter wrappedRecordHashesDiskWriter,
152-
@NonNull final InitTrigger initTrigger,
153-
@Nullable final WrappedRecordBlockHashMigration.Result migrationResult) {
145+
@NonNull final InitTrigger initTrigger) {
154146
this.platform = platform;
155147
requireNonNull(state);
156148
this.quiescenceController = requireNonNull(quiescenceController);
@@ -196,14 +188,10 @@ public BlockRecordManagerImpl(
196188
intermediateHashes,
197189
this.lastBlockInfo.wrappedIntermediateBlockRootsLeafCount());
198190
this.previousWrappedRecordBlockRootHash = this.lastBlockInfo.previousWrappedRecordBlockRootHash();
199-
this.wrappedHashStateSyncedFromFinalizedVote = votingCompleteAtStartup;
200191
} else if (initTrigger == InitTrigger.GENESIS) {
201192
// Initialize with empty defaults at genesis
202193
this.prevWrappedRecordBlockHashes = new IncrementalStreamingHasher(sha384DigestOrThrow(), List.of(), 0);
203194
this.previousWrappedRecordBlockRootHash = HASH_OF_ZERO;
204-
this.wrappedHashStateSyncedFromFinalizedVote = false;
205-
} else {
206-
this.wrappedHashStateSyncedFromFinalizedVote = false;
207195
}
208196

209197
// Initialize the stream file producer. NOTE, if the producer cannot be initialized, and a random exception is
@@ -955,8 +943,8 @@ public void syncFinalizedMigrationHashes(
955943
.previousWrappedRecordBlockRootHash(prevWrappedRecordBlockRootHash)
956944
.wrappedIntermediatePreviousBlockRootHashes(intermediateHashes)
957945
.wrappedIntermediateBlockRootsLeafCount(leafCount)
946+
.votingComplete(true)
958947
.build();
959-
this.wrappedHashStateSyncedFromFinalizedVote = true;
960948
logger.info(
961949
"Synced in-memory wrapped hash state from finalized vote: prevHash={}, leafCount={}",
962950
prevWrappedRecordBlockRootHash.toHex(),

hedera-node/hedera-app/src/test/java/com/hedera/node/app/records/impl/BlockOpeningTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,6 @@ private void setupBlockInfo(@NonNull final Instant firstConsTimeOfCurrentBlock)
152152
quiescedHeartbeat,
153153
platform,
154154
wrappedRecordHashesDiskWriter,
155-
InitTrigger.RESTART,
156-
null);
155+
InitTrigger.RESTART);
157156
}
158157
}

hedera-node/hedera-app/src/test/java/com/hedera/node/app/records/impl/BlockRecordManagerImplWrappedRecordFileBlockHashesTest.java

Lines changed: 96 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,7 @@ void doesNotAppendWhenFeatureFlagDisabled() {
120120
heartbeat,
121121
app.platform(),
122122
diskWriter,
123-
InitTrigger.RECONNECT,
124-
null)) {
123+
InitTrigger.RECONNECT)) {
125124
final var t0 = InstantUtils.instant(10, 1);
126125
mgr.startUserTransaction(t0, state);
127126
mgr.endUserTransaction(Stream.of(sampleTxnRecord(t0, List.of())), state);
@@ -183,8 +182,7 @@ void appendsExpectedHashesWhenFeatureFlagEnabled() throws Exception {
183182
heartbeat,
184183
app.platform(),
185184
diskWriter,
186-
InitTrigger.RECONNECT,
187-
null)) {
185+
InitTrigger.RECONNECT)) {
188186

189187
final var creationTime = new Timestamp(10, 1);
190188
final var t0 = InstantUtils.instant(creationTime.seconds(), creationTime.nanos());
@@ -335,8 +333,7 @@ void doesNotCrashOrAppendIfNoCapturedItemsOnRestart() {
335333
heartbeat,
336334
app.platform(),
337335
diskWriter,
338-
InitTrigger.RECONNECT,
339-
null)) {
336+
InitTrigger.RECONNECT)) {
340337
// Trigger a block boundary immediately; since no endUserTransaction was called, there are no captured
341338
// items.
342339
final var t1 = InstantUtils.instant(13, 1);
@@ -393,8 +390,7 @@ void liveOnlyModeDoesNotCallDiskWriter() {
393390
heartbeat,
394391
app.platform(),
395392
diskWriter,
396-
InitTrigger.RECONNECT,
397-
null)) {
393+
InitTrigger.RECONNECT)) {
398394
final var t0 = InstantUtils.instant(10, 1);
399395
mgr.startUserTransaction(t0, state);
400396
mgr.endUserTransaction(Stream.of(sampleTxnRecord(t0, List.of())), state);
@@ -456,8 +452,7 @@ void liveModeQueuesWrappedHashesWhileVotingPending() {
456452
heartbeat,
457453
app.platform(),
458454
diskWriter,
459-
InitTrigger.RECONNECT,
460-
null)) {
455+
InitTrigger.RECONNECT)) {
461456
final var t0 = InstantUtils.instant(10, 1);
462457
mgr.startUserTransaction(t0, state);
463458
mgr.endUserTransaction(Stream.of(sampleTxnRecord(t0, List.of())), state);
@@ -521,8 +516,7 @@ void liveModeDoesNotQueueWrappedHashesAfterVotingDeadlineReached() {
521516
heartbeat,
522517
app.platform(),
523518
diskWriter,
524-
InitTrigger.RECONNECT,
525-
null)) {
519+
InitTrigger.RECONNECT)) {
526520
final var t0 = InstantUtils.instant(10, 1);
527521
mgr.startUserTransaction(t0, state);
528522
mgr.endUserTransaction(Stream.of(sampleTxnRecord(t0, List.of())), state);
@@ -592,8 +586,7 @@ void liveModeDoesNotQueueWrappedHashesAfterVotingComplete() {
592586
heartbeat,
593587
app.platform(),
594588
diskWriter,
595-
InitTrigger.RECONNECT,
596-
null)) {
589+
InitTrigger.RECONNECT)) {
597590
final var t0 = InstantUtils.instant(10, 1);
598591
mgr.startUserTransaction(t0, state);
599592
mgr.endUserTransaction(Stream.of(sampleTxnRecord(t0, List.of())), state);
@@ -657,8 +650,7 @@ void liveAndDiskModeCallsDiskWriter() {
657650
heartbeat,
658651
app.platform(),
659652
diskWriter,
660-
InitTrigger.RECONNECT,
661-
null)) {
653+
InitTrigger.RECONNECT)) {
662654
final var t0 = InstantUtils.instant(10, 1);
663655
mgr.startUserTransaction(t0, state);
664656
mgr.endUserTransaction(Stream.of(sampleTxnRecord(t0, List.of())), state);
@@ -718,8 +710,7 @@ void liveModeLeavesBlockInfoEmptyWhenNoItems() {
718710
heartbeat,
719711
app.platform(),
720712
diskWriter,
721-
InitTrigger.RECONNECT,
722-
null)) {
713+
InitTrigger.RECONNECT)) {
723714
// Trigger a block boundary without any endUserTransaction calls (empty items)
724715
final var t1 = InstantUtils.instant(13, 1);
725716
mgr.startUserTransaction(t1, state);
@@ -801,8 +792,7 @@ void constructorSeedsFromBlockInfoEvenWithMigrationResult() throws Exception {
801792
heartbeat,
802793
app.platform(),
803794
diskWriter,
804-
InitTrigger.RESTART,
805-
migrationResult)) {
795+
InitTrigger.RESTART)) {
806796
// Drive a block boundary: start block 0 (EPOCH path), add items, cross period
807797
final var t0 = InstantUtils.instant(10, 1);
808798
mgr.startUserTransaction(t0, state);
@@ -889,8 +879,7 @@ void constructorSeedsFromBlockInfoState() throws Exception {
889879
heartbeat,
890880
app.platform(),
891881
diskWriter,
892-
InitTrigger.RESTART,
893-
null)) {
882+
InitTrigger.RESTART)) {
894883
// First boundary: freeze-restart with null currentBlockStartRunningHash (preserves state)
895884
final var t0 = InstantUtils.instant(200, 0);
896885
mgr.startUserTransaction(t0, state);
@@ -958,8 +947,7 @@ void freezeBlockPersistsWrappedHashStateToBlockInfo() {
958947
heartbeat,
959948
app.platform(),
960949
diskWriter,
961-
InitTrigger.RECONNECT,
962-
null)) {
950+
InitTrigger.RECONNECT)) {
963951
// Open block 0 via EPOCH path
964952
final var t0 = InstantUtils.instant(10, 1);
965953
mgr.startUserTransaction(t0, state);
@@ -1025,8 +1013,7 @@ void freezeBlockQueuesWrappedHashesWhileVotingPending() {
10251013
heartbeat,
10261014
app.platform(),
10271015
diskWriter,
1028-
InitTrigger.RECONNECT,
1029-
null)) {
1016+
InitTrigger.RECONNECT)) {
10301017
final var t0 = InstantUtils.instant(10, 1);
10311018
mgr.startUserTransaction(t0, state);
10321019
mgr.endUserTransaction(Stream.of(sampleTxnRecord(t0, List.of())), state);
@@ -1088,8 +1075,7 @@ void freezeBlockToDiskReturnsWhenDiskFlagDisabled() {
10881075
heartbeat,
10891076
app.platform(),
10901077
diskWriter,
1091-
InitTrigger.RECONNECT,
1092-
null)) {
1078+
InitTrigger.RECONNECT)) {
10931079
mgr.writeFreezeBlockWrappedRecordFileBlockHashesToDisk(state);
10941080
}
10951081

@@ -1143,8 +1129,7 @@ void freezeBlockToDiskAppendsWhenDiskFlagEnabled() {
11431129
heartbeat,
11441130
app.platform(),
11451131
diskWriter,
1146-
InitTrigger.RECONNECT,
1147-
null)) {
1132+
InitTrigger.RECONNECT)) {
11481133
final var t0 = InstantUtils.instant(10, 1);
11491134
mgr.startUserTransaction(t0, state);
11501135
mgr.writeFreezeBlockWrappedRecordFileBlockHashesToDisk(state);
@@ -1206,8 +1191,7 @@ void syncFinalizedMigrationHashesSeedsFreezePersistenceWhenLiveWriteEnabled() {
12061191
heartbeat,
12071192
app.platform(),
12081193
diskWriter,
1209-
InitTrigger.RESTART,
1210-
null)) {
1194+
InitTrigger.RESTART)) {
12111195
mgr.syncFinalizedMigrationHashes(syncedPrevHash, syncedIntermediate, 1);
12121196
// Freeze persistence should use the synced in-memory wrapped hash state.
12131197
mgr.writeFreezeBlockWrappedRecordFileBlockHashesToState(state);
@@ -1221,6 +1205,83 @@ void syncFinalizedMigrationHashesSeedsFreezePersistenceWhenLiveWriteEnabled() {
12211205
assertEquals(1, blockInfo.wrappedIntermediateBlockRootsLeafCount());
12221206
}
12231207

1208+
@Test
1209+
void syncFinalizedMigrationHashesPropagatesVotingCompleteAcrossBlockBoundary() {
1210+
final var app = appBuilder()
1211+
.withService(new BlockRecordService())
1212+
.withService(new PlatformStateService())
1213+
.withConfigValue("hedera.recordStream.liveWritePrevWrappedRecordHashes", true)
1214+
.build();
1215+
1216+
// State begins with votingComplete = false, i.e. prior to vote finalization.
1217+
final var initialTs = new Timestamp(100, 0);
1218+
app.stateMutator(BlockRecordService.NAME)
1219+
.withSingletonState(
1220+
BLOCKS_STATE_ID,
1221+
BlockInfo.newBuilder()
1222+
.lastBlockNumber(5)
1223+
.firstConsTimeOfLastBlock(new Timestamp(98, 0))
1224+
.blockHashes(Bytes.wrap(new byte[BlockRecordInfoUtils.HASH_SIZE]))
1225+
.consTimeOfLastHandledTxn(initialTs)
1226+
.migrationRecordsStreamed(true)
1227+
.firstConsTimeOfCurrentBlock(initialTs)
1228+
.lastUsedConsTime(initialTs)
1229+
.lastIntervalProcessTime(initialTs)
1230+
.votingComplete(false)
1231+
.build())
1232+
.withSingletonState(
1233+
RUNNING_HASHES_STATE_ID,
1234+
RunningHashes.newBuilder()
1235+
.runningHash(Bytes.wrap(new byte[48]))
1236+
.build())
1237+
.commit();
1238+
app.stateMutator(PlatformStateService.NAME)
1239+
.withSingletonState(V0540PlatformStateSchema.PLATFORM_STATE_STATE_ID, PlatformState.DEFAULT)
1240+
.commit();
1241+
1242+
final var state = requireNonNullState(app.workingStateAccessor().getState());
1243+
final var producer = new FakeStreamProducer();
1244+
final var controller = new QuiescenceController(
1245+
new QuiescenceConfig(false, Duration.ofSeconds(5)), InstantSource.system(), () -> 0);
1246+
final var heartbeat = new QuiescedHeartbeat(controller, app.platform());
1247+
final var diskWriter = mock(WrappedRecordFileBlockHashesDiskWriter.class);
1248+
final var syncedPrevHash = Bytes.wrap(new byte[] {
1249+
7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7,
1250+
7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7
1251+
});
1252+
final var syncedIntermediate = List.of(Bytes.wrap(new byte[48]));
1253+
try (final var mgr = new BlockRecordManagerImpl(
1254+
app.configProvider(),
1255+
state,
1256+
producer,
1257+
controller,
1258+
heartbeat,
1259+
app.platform(),
1260+
diskWriter,
1261+
InitTrigger.RESTART)) {
1262+
// Open first block
1263+
final var t0 = InstantUtils.instant(200, 1);
1264+
mgr.startUserTransaction(t0, state);
1265+
1266+
// Simulate vote finalization
1267+
mgr.syncFinalizedMigrationHashes(syncedPrevHash, syncedIntermediate, 1);
1268+
1269+
// Add items and cross current block boundary, causing latest block info write to state
1270+
mgr.endUserTransaction(Stream.of(sampleTxnRecord(t0, List.of())), state);
1271+
final var t1 = InstantUtils.instant(204, 1);
1272+
mgr.startUserTransaction(t1, state);
1273+
1274+
// Simulate freeze
1275+
mgr.writeFreezeBlockWrappedRecordFileBlockHashesToState(state);
1276+
}
1277+
1278+
final var blockInfo = state.getWritableStates(BlockRecordService.NAME)
1279+
.<BlockInfo>getSingleton(BLOCKS_STATE_ID)
1280+
.get();
1281+
// Verify voting completion was recorded
1282+
assertTrue(requireNonNull(blockInfo).votingComplete());
1283+
}
1284+
12241285
@Test
12251286
void syncFinalizedMigrationHashesIsNoopWhenLiveWriteDisabled() {
12261287
final var app = appBuilder()
@@ -1273,8 +1334,7 @@ void syncFinalizedMigrationHashesIsNoopWhenLiveWriteDisabled() {
12731334
heartbeat,
12741335
app.platform(),
12751336
diskWriter,
1276-
InitTrigger.RESTART,
1277-
null)) {
1337+
InitTrigger.RESTART)) {
12781338
mgr.syncFinalizedMigrationHashes(syncedPrevHash, List.of(Bytes.wrap(new byte[48])), 1);
12791339
mgr.writeFreezeBlockWrappedRecordFileBlockHashesToState(state);
12801340
}
@@ -1355,8 +1415,7 @@ void restartFirstBoundaryPreservesRestoredHasherState() throws Exception {
13551415
heartbeat,
13561416
app.platform(),
13571417
diskWriter,
1358-
InitTrigger.RESTART,
1359-
null)) {
1418+
InitTrigger.RESTART)) {
13601419
// First boundary after restart: freeze-restart with null currentBlockStartRunningHash
13611420
final var t0 = InstantUtils.instant(200, 0);
13621421
mgr.startUserTransaction(t0, state);
@@ -1419,8 +1478,7 @@ void liveModeBlockBoundaryWritesNonDefaultWrappedHashToBlockInfo() {
14191478
heartbeat,
14201479
app.platform(),
14211480
diskWriter,
1422-
InitTrigger.RECONNECT,
1423-
null)) {
1481+
InitTrigger.RECONNECT)) {
14241482
// Open block 0 (EPOCH path), add items, cross period
14251483
final var t0 = InstantUtils.instant(10, 1);
14261484
mgr.startUserTransaction(t0, state);

0 commit comments

Comments
 (0)