Skip to content

Commit 336daf4

Browse files
committed
sync lastQuiescenceCommand when heartbeat breaks quiescence
Signed-off-by: Alex Kehayov <aleks.kehayov@limechain.tech>
1 parent c44696b commit 336daf4

File tree

6 files changed: 69 additions and 28 deletions.

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/BlockStreamManagerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,8 @@ public boolean endRound(@NonNull final State state, final long roundNum) {
636636
blockStreamConfig.maxConsecutiveScheduleSecondsToProbe(),
637637
config.getConfigData(StakingConfig.class)
638638
.periodMins(),
639-
state));
639+
state),
640+
lastQuiescenceCommand);
640641
}
641642
}
642643
}

hedera-node/hedera-app/src/main/java/com/hedera/node/app/quiescence/QuiescedHeartbeat.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.concurrent.ScheduledExecutorService;
1616
import java.util.concurrent.ScheduledFuture;
1717
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.atomic.AtomicReference;
1819
import javax.inject.Inject;
1920
import javax.inject.Singleton;
2021
import org.apache.logging.log4j.LogManager;
@@ -69,8 +70,16 @@ public QuiescedHeartbeat(@NonNull final QuiescenceController controller, Platfor
6970
/**
7071
* Schedules a heartbeat at the given interval that will last until the {@link QuiescenceController} reports a
7172
* status other than {@link QuiescenceCommand#QUIESCE}.
73+
*
74+
* @param heartbeatInterval the interval between heartbeats
75+
* @param probe the TCT probe to use
76+
* @param lastQuiescenceCommand if non-null, set to the command sent to the platform whenever the heartbeat ends
77+
* quiescence (including on failure), so the caller ({@link BlockStreamManagerImpl} or {@code BlockRecordManagerImpl}) stays in sync
7278
*/
73-
public void start(@NonNull final Duration heartbeatInterval, @NonNull final TctProbe probe) {
79+
public void start(
80+
@NonNull final Duration heartbeatInterval,
81+
@NonNull final TctProbe probe,
82+
@Nullable final AtomicReference<QuiescenceCommand> lastQuiescenceCommand) {
7483
requireNonNull(heartbeatInterval);
7584
requireNonNull(probe);
7685

@@ -81,7 +90,7 @@ public void start(@NonNull final Duration heartbeatInterval, @NonNull final TctP
8190
heartbeatFuture = scheduler.scheduleAtFixedRate(
8291
() -> {
8392
try {
84-
heartbeat(probe);
93+
heartbeat(probe, lastQuiescenceCommand);
8594
} catch (Exception e) {
8695
log.warn("Unhandled exception in quiesced heartbeat", e);
8796
}
@@ -114,7 +123,8 @@ public void shutdown() {
114123
/**
115124
* The heartbeat task that probes for the TCT and updates the controller.
116125
*/
117-
private void heartbeat(@NonNull final TctProbe probe) {
126+
private void heartbeat(
127+
@NonNull final TctProbe probe, @Nullable final AtomicReference<QuiescenceCommand> lastQuiescenceCommand) {
118128
try {
119129
// Probe for the TCT
120130
final var tct = probe.findTct();
@@ -126,11 +136,17 @@ private void heartbeat(@NonNull final TctProbe probe) {
126136
// Check if we should continue running
127137
if (commandNow != QUIESCE) {
128138
log.info("Stopping quiescence heartbeat ({})", commandNow);
139+
if (lastQuiescenceCommand != null) {
140+
lastQuiescenceCommand.set(commandNow);
141+
}
129142
platform.quiescenceCommand(commandNow);
130143
stop();
131144
}
132145
} catch (final Exception e) {
133146
// End quiescence and stop the heartbeat to avoid log spam from repeated failures
147+
if (lastQuiescenceCommand != null) {
148+
lastQuiescenceCommand.set(DONT_QUIESCE);
149+
}
134150
platform.quiescenceCommand(DONT_QUIESCE);
135151
stop();
136152
throw e;

hedera-node/hedera-app/src/main/java/com/hedera/node/app/records/impl/BlockRecordManagerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,8 @@ public void maybeQuiesce(@NonNull final State state) {
547547
new TctProbe(
548548
blockStreamConfig.maxConsecutiveScheduleSecondsToProbe(),
549549
config.getConfigData(StakingConfig.class).periodMins(),
550-
state));
550+
state),
551+
lastQuiescenceCommand);
551552
}
552553
}
553554
}

hedera-node/hedera-app/src/test/java/com/hedera/node/app/quiescence/QuiescedHeartbeatTest.java

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static org.hiero.consensus.model.quiescence.QuiescenceCommand.DONT_QUIESCE;
66
import static org.hiero.consensus.model.quiescence.QuiescenceCommand.QUIESCE;
77
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
8+
import static org.junit.jupiter.api.Assertions.assertEquals;
89
import static org.mockito.ArgumentMatchers.any;
910
import static org.mockito.ArgumentMatchers.anyLong;
1011
import static org.mockito.ArgumentMatchers.eq;
@@ -21,6 +22,7 @@
2122
import java.util.concurrent.ScheduledExecutorService;
2223
import java.util.concurrent.ScheduledFuture;
2324
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicReference;
2426
import org.junit.jupiter.api.BeforeEach;
2527
import org.junit.jupiter.api.Test;
2628
import org.junit.jupiter.api.extension.ExtendWith;
@@ -69,7 +71,7 @@ void startSchedulesHeartbeatAtFixedRate() {
6971
.willReturn(scheduledFuture);
7072

7173
// When
72-
subject.start(interval, probe);
74+
subject.start(interval, probe, null);
7375

7476
// Then
7577
verify(scheduler).scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(5000L), eq(TimeUnit.MILLISECONDS));
@@ -83,10 +85,10 @@ void startCancelsExistingHeartbeatBeforeSchedulingNew() {
8385
.willReturn(scheduledFuture);
8486

8587
// Start first heartbeat
86-
subject.start(interval, probe);
88+
subject.start(interval, probe, null);
8789

8890
// When - start second heartbeat
89-
subject.start(interval, probe);
91+
subject.start(interval, probe, null);
9092

9193
// Then - should have cancelled the first one
9294
verify(scheduledFuture).cancel(false);
@@ -99,7 +101,7 @@ void stopCancelsHeartbeatWhenRunning() {
99101
final var interval = Duration.ofSeconds(2);
100102
given(scheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)))
101103
.willReturn(scheduledFuture);
102-
subject.start(interval, probe);
104+
subject.start(interval, probe, null);
103105

104106
// When
105107
subject.stop();
@@ -120,7 +122,7 @@ void stopCanBeCalledMultipleTimes() {
120122
final var interval = Duration.ofSeconds(2);
121123
given(scheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)))
122124
.willReturn(scheduledFuture);
123-
subject.start(interval, probe);
125+
subject.start(interval, probe, null);
124126

125127
// When
126128
subject.stop();
@@ -136,7 +138,7 @@ void shutdownStopsHeartbeatAndShutsDownScheduler() {
136138
final var interval = Duration.ofSeconds(2);
137139
given(scheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)))
138140
.willReturn(scheduledFuture);
139-
subject.start(interval, probe);
141+
subject.start(interval, probe, null);
140142

141143
// When
142144
subject.shutdown();
@@ -165,7 +167,7 @@ void heartbeatProbesForTctAndSetsItOnController() {
165167
.willReturn(scheduledFuture);
166168

167169
// When
168-
subject.start(Duration.ofSeconds(1), probe);
170+
subject.start(Duration.ofSeconds(1), probe, null);
169171

170172
// Capture and execute the heartbeat runnable
171173
final var runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -188,7 +190,7 @@ void heartbeatDoesNotSetTctWhenProbeReturnsNull() {
188190
.willReturn(scheduledFuture);
189191

190192
// When
191-
subject.start(Duration.ofSeconds(1), probe);
193+
subject.start(Duration.ofSeconds(1), probe, null);
192194

193195
// Capture and execute the heartbeat runnable
194196
final var runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -212,7 +214,7 @@ void heartbeatStopsWhenQuiescenceStatusChangesToDontQuiesce() {
212214
.willReturn(scheduledFuture);
213215

214216
// When
215-
subject.start(Duration.ofSeconds(1), probe);
217+
subject.start(Duration.ofSeconds(1), probe, null);
216218

217219
// Capture and execute the heartbeat runnable
218220
final var runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -227,6 +229,27 @@ void heartbeatStopsWhenQuiescenceStatusChangesToDontQuiesce() {
227229
verify(scheduledFuture).cancel(false);
228230
}
229231

232+
@Test
233+
void heartbeatUpdatesLastQuiescenceCommandWhenBreakingQuiescence() {
234+
// Given
235+
final var tct = Instant.ofEpochSecond(1_000_000L);
236+
final var lastCommand = new AtomicReference<>(QUIESCE);
237+
given(probe.findTct()).willReturn(tct);
238+
given(controller.getQuiescenceStatus()).willReturn(DONT_QUIESCE);
239+
given(scheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)))
240+
.willReturn(scheduledFuture);
241+
242+
// When
243+
subject.start(Duration.ofSeconds(1), probe, lastCommand);
244+
245+
final var runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
246+
verify(scheduler).scheduleAtFixedRate(runnableCaptor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
247+
runnableCaptor.getValue().run();
248+
249+
// Then - lastQuiescenceCommand should be updated to match what was sent to the platform
250+
assertEquals(DONT_QUIESCE, lastCommand.get());
251+
}
252+
230253
@Test
231254
void heartbeatStopsWhenQuiescenceStatusChangesToBreakQuiescence() {
232255
// Given
@@ -237,7 +260,7 @@ void heartbeatStopsWhenQuiescenceStatusChangesToBreakQuiescence() {
237260
.willReturn(scheduledFuture);
238261

239262
// When
240-
subject.start(Duration.ofSeconds(1), probe);
263+
subject.start(Duration.ofSeconds(1), probe, null);
241264

242265
// Capture and execute the heartbeat runnable
243266
final var runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -262,7 +285,7 @@ void heartbeatContinuesWhenQuiescenceStatusRemainsQuiesce() {
262285
.willReturn(scheduledFuture);
263286

264287
// When
265-
subject.start(Duration.ofSeconds(1), probe);
288+
subject.start(Duration.ofSeconds(1), probe, null);
266289

267290
// Capture and execute the heartbeat runnable
268291
final var runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -286,7 +309,7 @@ void heartbeatSendsCommandToPlatformBeforeStopping() {
286309
.willReturn(scheduledFuture);
287310

288311
// When
289-
subject.start(Duration.ofSeconds(1), probe);
312+
subject.start(Duration.ofSeconds(1), probe, null);
290313

291314
// Capture and execute the heartbeat runnable
292315
final var runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -308,7 +331,7 @@ void heartbeatHandlesExceptionFromProbeByEndingQuiescenceAndStopping() {
308331
.willReturn(scheduledFuture);
309332

310333
// When
311-
subject.start(Duration.ofSeconds(1), probe);
334+
subject.start(Duration.ofSeconds(1), probe, null);
312335

313336
// Capture and execute the heartbeat runnable
314337
final var runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -335,7 +358,7 @@ void heartbeatHandlesExceptionFromControllerSetTctByEndingQuiescenceAndStopping(
335358
.willReturn(scheduledFuture);
336359

337360
// When
338-
subject.start(Duration.ofSeconds(1), probe);
361+
subject.start(Duration.ofSeconds(1), probe, null);
339362

340363
// Capture and execute the heartbeat runnable
341364
final var runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -361,7 +384,7 @@ void heartbeatHandlesExceptionFromControllerGetStatusByEndingQuiescenceAndStoppi
361384
.willReturn(scheduledFuture);
362385

363386
// When
364-
subject.start(Duration.ofSeconds(1), probe);
387+
subject.start(Duration.ofSeconds(1), probe, null);
365388

366389
// Capture and execute the heartbeat runnable
367390
final var runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -387,7 +410,7 @@ void heartbeatHandlesExceptionFromPlatformQuiescenceCommandByEndingQuiescenceAnd
387410
.willReturn(scheduledFuture);
388411

389412
// When
390-
subject.start(Duration.ofSeconds(1), probe);
413+
subject.start(Duration.ofSeconds(1), probe, null);
391414

392415
// Capture and execute the heartbeat runnable
393416
final var runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -413,7 +436,7 @@ void multipleHeartbeatExecutionsWorkCorrectly() {
413436
.willReturn(scheduledFuture);
414437

415438
// When
416-
subject.start(Duration.ofSeconds(1), probe);
439+
subject.start(Duration.ofSeconds(1), probe, null);
417440

418441
// Capture and execute the heartbeat runnable twice
419442
final var runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -439,13 +462,13 @@ void heartbeatWithDifferentIntervals() {
439462
.willReturn(scheduledFuture);
440463

441464
// When - start with short interval
442-
subject.start(shortInterval, probe);
465+
subject.start(shortInterval, probe, null);
443466

444467
// Then
445468
verify(scheduler).scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(100L), eq(TimeUnit.MILLISECONDS));
446469

447470
// When - start with long interval
448-
subject.start(longInterval, probe);
471+
subject.start(longInterval, probe, null);
449472

450473
// Then
451474
verify(scheduler).scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(60000L), eq(TimeUnit.MILLISECONDS));
@@ -459,7 +482,7 @@ void startWithZeroDurationInterval() {
459482
.willReturn(scheduledFuture);
460483

461484
// When
462-
subject.start(zeroDuration, probe);
485+
subject.start(zeroDuration, probe, null);
463486

464487
// Then
465488
verify(scheduler).scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(0L), eq(TimeUnit.MILLISECONDS));

hedera-node/hedera-app/src/test/java/com/hedera/node/app/records/impl/BlockOpeningTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ void maybeQuiesceStartsHeartbeatOnQuiesceCommandChange() {
119119
subject.maybeQuiesce(state);
120120

121121
verify(platform, times(1)).quiescenceCommand(QUIESCE);
122-
verify(quiescedHeartbeat, times(1)).start(any(), any());
122+
verify(quiescedHeartbeat, times(1)).start(any(), any(), any());
123123
}
124124

125125
@Test
@@ -130,7 +130,7 @@ void maybeQuiesceDoesNothingWhenCommandRemainsDontQuiesce() {
130130
subject.maybeQuiesce(state);
131131

132132
verify(platform, never()).quiescenceCommand(any());
133-
verify(quiescedHeartbeat, never()).start(any(), any());
133+
verify(quiescedHeartbeat, never()).start(any(), any(), any());
134134
}
135135

136136
private void setupBlockInfo(@NonNull final Instant firstConsTimeOfCurrentBlock) {

hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/QuiescenceConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414
*/
1515
@ConfigData("quiescence")
1616
public record QuiescenceConfig(
17-
@ConfigProperty(defaultValue = "false") boolean enabled,
17+
@ConfigProperty(defaultValue = "true") boolean enabled,
1818
@ConfigProperty(defaultValue = "5s") Duration tctDuration) {}

0 commit comments

Comments
 (0)