
Conversation

@xin-hedera (Contributor) commented Jul 18, 2025

Description:

This PR implements latency-based block node scheduling

  • Add three block node schedulers: LATENCY, PRIORITY, and PRIORITY_THEN_LATENCY
  • Add LatencyService to measure block node streaming latency in the background for latency-aware schedulers

Related issue(s):

Fixes #11271
Fixes #11546

Notes for reviewer:

Checklist

  • Documented (Code comments, README, etc.)
  • Tested (unit, integration, etc.)

@xin-hedera xin-hedera self-assigned this Jul 18, 2025
@xin-hedera xin-hedera linked an issue Jul 18, 2025 that may be closed by this pull request
@xin-hedera xin-hedera added the enhancement Type: New feature label Jul 18, 2025
@xin-hedera xin-hedera added this to the 0.135.0 milestone Jul 18, 2025
@lfdt-bot commented Jul 18, 2025

Snyk checks have passed. No issues have been found so far.

| Status | Scanner | Critical | High | Medium | Low | Total (0) |
| --- | --- | --- | --- | --- | --- | --- |
| Passed | Open Source Security | 0 | 0 | 0 | 0 | 0 issues |


@codacy-production bot commented Jul 18, 2025

Coverage summary from Codacy

See diff coverage on Codacy

| Coverage variation | Diff coverage |
| --- | --- |
| -39.07% (target: -1.00%) | 97.62% |
Coverage variation details

| Commit | Coverable lines | Covered lines | Coverage |
| --- | --- | --- | --- |
| Common ancestor commit (64ab944) | 27275 | 25606 | 93.88% |
| Head commit (48110dc) | 52936 (+25661) | 29015 (+3409) | 54.81% (-39.07%) |

Coverage variation is the difference between the coverage for the head and common ancestor commits of the pull request branch: <coverage of head commit> - <coverage of common ancestor commit>
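Applied to the numbers above: 54.81% - 93.88% = -39.07%.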

Diff coverage details

| | Coverable lines | Covered lines | Diff coverage |
| --- | --- | --- | --- |
| Pull request (#11607) | 252 | 246 | 97.62% |

Diff coverage is the percentage of lines that are covered by tests out of the coverable lines that the pull request added or modified: <covered lines added or modified>/<coverable lines added or modified> * 100%
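Applied to the numbers above: 246 / 252 * 100% ≈ 97.62%.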


@steven-sheehy steven-sheehy modified the milestones: 0.135.0, 0.136.0 Jul 23, 2025
@steven-sheehy steven-sheehy added the importer Area: Importer label Jul 25, 2025
@steven-sheehy steven-sheehy removed this from the 0.136.0 milestone Aug 6, 2025
@xin-hedera xin-hedera added this to the 0.147.0 milestone Jan 14, 2026
@xin-hedera xin-hedera marked this pull request as ready for review January 14, 2026 16:40
@xin-hedera xin-hedera requested a review from a team as a code owner January 14, 2026 16:40
@xin-hedera xin-hedera modified the milestones: 0.147.0, 0.148.0 Jan 22, 2026
@xin-hedera xin-hedera force-pushed the 11271-hip-1081-prioritize-block-nodes-by-latency branch from 7bbef9c to ddca603 on January 23, 2026 19:22
| `hiero.mirror.importer.block.scheduler.maxPostProcessingLatency` | 1s | The maximum allowed post-processing delay to calculate and record block node streaming latency. |
| `hiero.mirror.importer.block.scheduler.minRescheduleInterval` | 10s | The minimum block node reschedule interval. |
| `hiero.mirror.importer.block.scheduler.rescheduleLatencyThreshold` | 50ms | The threshold to meet for lower latency block nodes to trigger a reschedule. |
| `hiero.mirror.importer.block.scheduler.type` | PRIORITY | The scheduler type. Can be `LATENCY`, `PRIORITY`, or `PRIORITY_THEN_LATENCY`. |
Contributor commented:

Shouldn't we default to PRIORITY_THEN_LATENCY? Otherwise all this latency work will never be used in practice.
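Until such a default change, operators can opt in explicitly. A minimal sketch of the override, using only the property name documented in the table above:

```properties
# Opt-in override; PRIORITY is the documented default.
hiero.mirror.importer.block.scheduler.type=PRIORITY_THEN_LATENCY
```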

| `hiero.mirror.importer.block.nodes[].port` | 40840 | The port of the block node server. |
| `hiero.mirror.importer.block.nodes[].priority` | 0 | The priority of the block node server. A lower value indicates higher priority, and 0 is the highest priority. |
| `hiero.mirror.importer.block.persistBytes` | false | Whether to persist the block stream file bytes to the database. |
| `hiero.mirror.importer.block.scheduler.latencyService.backlog` | 1 | The backlog size of pending latency measuring tasks. Note that at most backlog plus one tasks can be scheduled when the service is idle. |
Contributor commented:

Prefer the simpler `latency` for property names instead of implementation names.

Suggested change
| `hiero.mirror.importer.block.scheduler.latencyService.backlog` | 1 | The backlog size of pending latency measuring tasks. Note that at most backlog plus one tasks can be scheduled when the service is idle. |
| `hiero.mirror.importer.block.scheduler.latency.backlog` | 1 | The backlog size of pending latency measuring tasks. Note that at most backlog plus one tasks can be scheduled when the service is idle. |

Comment on lines +10 to +18
final class Latency {

    private static final int HISTORY_SIZE = 5;

    @Getter
    private long average = Long.MIN_VALUE;

    private int count = 0;
    private final long[] history = new long[HISTORY_SIZE];
Contributor commented:

This class seems like overkill and may not smooth the averages as we like. Why can't we drop the class and use a simple exponential moving average in the calling class that's stored as a double or AtomicDouble?
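A minimal sketch of what that could look like, assuming an exponential moving average held lock-free in an AtomicLong as double bits; the class name and smoothing factor are illustrative, not from the PR:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only: a simple exponential moving average kept by the
// calling class, replacing the fixed five-sample history ring.
final class LatencyEma {

    private static final double ALPHA = 0.2; // weight of the newest sample (assumed)

    // Double value stored as raw long bits so updates stay lock-free.
    private final AtomicLong bits = new AtomicLong(Double.doubleToLongBits(Double.NaN));

    void record(long latencyMillis) {
        bits.updateAndGet(prev -> {
            double old = Double.longBitsToDouble(prev);
            double next = Double.isNaN(old)
                    ? latencyMillis // first sample seeds the average
                    : ALPHA * latencyMillis + (1 - ALPHA) * old;
            return Double.doubleToLongBits(next);
        });
    }

    double average() {
        return Double.longBitsToDouble(bits.get());
    }
}
```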


@Data
@Validated
public final class SchedulerProperties {
Contributor commented:

Prefer this as a separate @ConfigurationProperties instead of a nested class to reduce coupling. Also move it to the scheduler package.
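A sketch of the suggested shape, hedged: the prefix, package, and validation details are assumptions based on the documented property names, not the PR's actual code:

```java
package org.hiero.mirror.importer.block.scheduler; // assumed location per the comment

import jakarta.validation.constraints.NotNull;
import java.time.Duration;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;

// Illustrative sketch: a standalone @ConfigurationProperties class rather than
// a nested one; fields mirror the documented scheduler properties and defaults.
@ConfigurationProperties("hiero.mirror.importer.block.scheduler")
@Data
@Validated
public class SchedulerProperties {

    @NotNull
    private Duration maxPostProcessingLatency = Duration.ofSeconds(1);

    @NotNull
    private Duration minRescheduleInterval = Duration.ofSeconds(10);

    @NotNull
    private Duration rescheduleLatencyThreshold = Duration.ofMillis(50);

    @NotNull
    private SchedulerType type = SchedulerType.PRIORITY;

    // The three scheduler types named in the PR description.
    public enum SchedulerType {
        LATENCY,
        PRIORITY,
        PRIORITY_THEN_LATENCY
    }
}
```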

case END_OF_BLOCK -> {
    running = !assembler.onEndOfBlock(response.getEndOfBlock());
    if (!running) {
        log.info("Cancel the subscription to try rescheduling");
Contributor commented:

This log sounds like a suggestion to the operator to take some action. Do you mean to say Cancelling the subscription... to indicate the code is doing the action?

Suggested change
log.info("Cancel the subscription to try rescheduling");
log.info("Cancelling the subscription to try rescheduling");

protected final AtomicReference<@Nullable BlockNode> current = new AtomicReference<>();
protected final AtomicLong lastScheduledTime = new AtomicLong(0);

protected long lastPostProcessingLatency;
Contributor commented:

Can be private. Should also be atomic or volatile to be safe.
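Two hedged readings of that suggestion, keeping the field name from the snippet (only one would be used):

```java
// Option 1: volatile suffices when the value is written and read whole.
private volatile long lastPostProcessingLatency;

// Option 2: an AtomicLong when read-modify-write updates are needed.
// private final AtomicLong lastPostProcessingLatency = new AtomicLong();
```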

Comment on lines +73 to +74
@Scheduled(fixedDelayString = "#{@blockProperties.getScheduler().getLatencyService().getFrequency().toMillis()}")
public void schedule() {
Contributor commented:

Is it possible this takes longer than frequency and multiple invocations occur simultaneously? If so, might need synchronized.
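Worth noting, hedged: with fixedDelay, Spring starts the next run only after the previous invocation returns, so overlap should not occur unless the method hands work off asynchronously. If it does, a defensive guard is cheap:

```java
// Defensive sketch: fixedDelay already serializes invocations on the scheduler
// thread; synchronized only adds protection if callers other than the
// scheduler can invoke schedule() concurrently.
@Scheduled(fixedDelayString = "#{@blockProperties.getScheduler().getLatencyService().getFrequency().toMillis()}")
public synchronized void schedule() {
    // existing latency-measurement scheduling logic
}
```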

Comment on lines +36 to +40
latencyService.cancelAll();
current.set(super.getNode(blockNumber));
candidates.clear();
candidates.addAll(getCandidates());
latencyService.setNodes(candidates);
Contributor commented:

latencyService.cancelAll() is called twice: once directly and once via setNodes.
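Assuming setNodes performs the cancellation internally (as the comment implies), a de-duplicated sketch:

```java
// Sketch: rely on setNodes to cancel in-flight measurements, dropping the
// redundant direct latencyService.cancelAll() call.
current.set(super.getNode(blockNumber));
candidates.clear();
candidates.addAll(getCandidates());
latencyService.setNodes(candidates); // internally invokes cancelAll()
```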

cancelAll();

long bornGeneration = generation.incrementAndGet();
nodes.forEach(blockNode -> tasks.add(new Task(bornGeneration, blockNode)));
Contributor commented:

Is it possible to optimize this to skip the currently connected node, since we can measure that directly elsewhere?
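A possible shape for that optimization, purely illustrative; getCurrentNode() is a hypothetical accessor for the actively connected node:

```java
// Sketch: skip the actively connected node, whose latency can be measured
// from the live stream instead of a probe task.
long bornGeneration = generation.incrementAndGet();
BlockNode currentNode = getCurrentNode(); // hypothetical
nodes.stream()
        .filter(blockNode -> blockNode != currentNode)
        .forEach(blockNode -> tasks.add(new Task(bornGeneration, blockNode)));
```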

log.info("Measuring {}'s latency by streaming block {}", node, nextBlockNumber);
final var timeout =
blockProperties.getScheduler().getLatencyService().getTimeout();
node.streamBlocks(nextBlockNumber, nextBlockNumber, this::measureLatency, timeout);
Contributor commented:

The next block might not be available. Should we stream a couple of previous blocks that we know are present? This would avoid time spent waiting for the next block and spread the latency measurement across a few blocks.
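A hedged sketch of that idea; the window size and the latest-block accessor are invented for illustration:

```java
// Sketch: probe a small window of blocks already known to exist, so the
// measurement never stalls waiting for the next block to be produced.
// probeWindow and getLatestBlockNumber() are hypothetical.
final int probeWindow = 3;
final long endBlockNumber = getLatestBlockNumber();
final long startBlockNumber = Math.max(0, endBlockNumber - probeWindow + 1);
log.info("Measuring {}'s latency by streaming blocks {} to {}", node, startBlockNumber, endBlockNumber);
node.streamBlocks(startBlockNumber, endBlockNumber, this::measureLatency, timeout);
```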


Labels

enhancement (Type: New feature), importer (Area: Importer)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

  • Add tests that check the latency prioritisation
  • HIP-1081 Prioritize block nodes by latency

6 participants