finish new consensus #721

Open
rkuhn wants to merge 12 commits into main from rk/fetch-blocks

Conversation


@rkuhn rkuhn commented Mar 11, 2026

It is not completely finished yet, hence draft status.

  • add fetch_blocks, which will request ranges (with the expected speedup)
  • add validate_block2 to complete the set of stages
  • wire everything up

current issues:

  • will die upon restart due to improper ledger handling
  • doesn’t validate block hashes
  • doesn’t keep best chain tagged, meaning that downstream propagation is broken

Summary by CodeRabbit

  • New Features

    • New consensus pipeline stages: chain adoption, block fetching, and an improved block validation stage; versioned block-fetch responses and manager/connection support.
  • Improvements

    • More robust ledger rollback/alignment, stronger tip/anchor management, refined chain selection and fetch sequencing, and increased debug tracing for diagnostics.
  • Tests

    • Major expansion of trace-driven tests and test utilities for consensus and network stages.
  • Chores

    • Stricter linting for futures and several dependency/re-export updates.


coderabbitai bot commented Mar 11, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

This PR restructures consensus: adds three new stages (adopt_chain, fetch_blocks, validate_block2); moves block loading into ledger/validate effects and removes block payloads from stage APIs; extends ledger trait with sync helpers (contains_point, tip, volatile_tip); introduces BlockFetch v2 (Blocks2/RequestRange2); broad test harness and trace changes; many wiring and instrumentation updates.

Changes

Cohort / File(s) / Summary

Workspace & Manifests
crates/amaru-consensus/Cargo.toml, crates/amaru-ouroboros-traits/Cargo.toml
Enable workspace async-trait flag and add tracing dependency.

Ledger trait & effects
crates/amaru-ouroboros-traits/src/can_validate_blocks/..., crates/amaru-ledger/src/block_validator.rs, crates/amaru-ledger/src/state*.rs, crates/amaru-ledger/src/state/volatile_db.rs, crates/amaru-consensus/src/effects/ledger_effects.rs, crates/amaru-consensus/src/effects/consensus_effects.rs, crates/amaru-consensus/src/effects/mod.rs
Removed block param from validate_block; added sync methods contains_point, tip, volatile_tip; ValidateBlockEffect now loads/decodes blocks by point; new sync effects (ContainsPointEffect, TipEffect, VolatileTipEffect); mocks and implementations updated.

New consensus stages & tests
crates/amaru-consensus/src/stages/adopt_chain/..., .../fetch_blocks/..., .../validate_block2/..., crates/amaru-consensus/src/stages/mod.rs
Add adopt_chain (adoption, fork switching, drag-anchor), fetch_blocks (backfill, timeouts, manager integration), validate_block2 (fork-aware ledger alignment + per-block validation). Extensive test helpers and tests added.

Stage graph & node wiring
crates/amaru/src/stages/build_stage_graph.rs, crates/amaru/src/stages/build_node.rs
Rewire pipeline to new stages; build_stage_graph signature changed (ledger_tip + optional candidate); added runtime termination watcher.

Select-chain & related tests
crates/amaru-consensus/src/stages/select_chain_new/*, .../test_setup.rs, .../tests.rs
best_tip now Option, added may_fetch_blocks gate and FetchNextFrom message; cmp_tip now compares Option<&BlockHeader>; tests updated.

Blockfetch v2 & protocols
crates/amaru-protocols/src/blockfetch/*, crates/amaru-protocols/src/connection.rs, crates/amaru-protocols/src/manager.rs
Introduce Blocks2 enum, RequestRange2/Resp API, queue now carries Resp; manager/connection/initiator handle FetchBlocks2 and id-tagged replies; instrumentation tweaks.

Trace, replay & simulation helpers
crates/pure-stage/src/trace_buffer.rs, crates/pure-stage/src/simulation/replay.rs, crates/pure-stage/src/stage_ref.rs, crates/pure-stage/src/simulation/running/mod.rs, crates/amaru/src/tests/*, simulation/amaru-sim/*
TraceBuffer preserves invalid CBOR as InvalidBytes; replay fails with explicit InvalidBytes error; StageRef::is_blackhole and SimulationRunning::has_effects added; some simulator tests ignored and defaults reduced.

Kernel types & helpers
crates/amaru-kernel/src/cardano/*, crates/amaru-kernel/src/lib.rs
Add Block::body_hash/tip, BlockHeader vrf_leader/op_cert_seq accessors, NetworkBlock len/as_slice, make_header_with_op_cert_seq helper, BlockHeight - u64 and Slot - Slot subtraction.

Volatile DB & anchoring
crates/amaru-ledger/src/state.rs, crates/amaru-ledger/src/state/volatile_db.rs
Volatile anchor now carries Tip; added VolatileDB::contains, contains_volatile_point, volatile_tip; stricter rollback prechecks and new BackwardError variants.

Track peers & defer substage
crates/amaru-consensus/src/stages/track_peers/*
Add defer_req_next sub-stage to register/poll deferred RequestNext dispatches based on ledger-applied height; track_peers uses Tip/VolatileTip effects; tests adjusted.

Tests & test helpers (many)
multiple .../test_setup.rs, .../tests.rs across consensus/protocol/ledger
Numerous new and updated test suites and harness helpers to exercise new stages, effects, and BlockFetch v2 behavior; mocks updated to implement new sync methods.

Instrumentation / logging
various crates/amaru-protocols/src/**, crates/amaru/src/bin/amaru/cmd/run.rs
Many tracing spans set to level="debug"; added watcher task to cancel global exit on unexpected consensus termination.

Sequence Diagram(s)

sequenceDiagram
    participant SC as SelectChain
    participant FB as FetchBlocks
    participant VB as ValidateBlock2
    participant AC as AdoptChain
    participant Ledger as LedgerOps
    participant Store as ChainStore
    participant Manager as Manager

    SC->>FB: (Tip, parent) when may_fetch_blocks
    activate FB
    FB->>Store: load headers, determine missing points
    FB->>Manager: FetchBlocks2 { from, through, id }
    Manager->>Manager: forward to peers

    alt Blocks received
        FB->>FB: accept & store blocks
        FB->>VB: forward (Tip, parent)
    else Timeout
        FB->>SC: FetchNextFrom(point)
    end
    deactivate FB

    activate VB
    VB->>Ledger: contains_point(point)?
    alt not contained
        VB->>Ledger: rollback_to(ancestor)
    end
    loop forward points
        VB->>Ledger: validate_block(&peer, &point, ctx)
        Ledger-->>VB: LedgerMetrics | BlockValidationError
    end
    VB->>SC: BlockValidationResult(tip, ok)
    deactivate VB

    activate AC
    AC->>Store: load incoming & current headers
    AC->>AC: cmp_tip
    alt incoming better
        AC->>Ledger: roll_forward / rollback+roll_forward
        AC->>Store: set_best_chain_hash, drag_anchor_forward
        AC->>Manager: NewTip(msg)
    end
    deactivate AC

sequenceDiagram
    participant Stage as ValidateBlockEffect
    participant Effect as Effects
    participant Store as ChainStore
    participant Ledger as LedgerOps

    Stage->>Effect: create ValidateBlockEffect(point)
    Effect->>Store: load block bytes by point.hash()
    alt block found & decoded
        Effect->>Ledger: validate_block(peer, &point, ctx)
        Ledger-->>Effect: Result<LedgerMetrics, BlockValidationError>
    else missing or decode failure
        Effect-->>Stage: Err(BlockValidationError)
    end

    Stage->>Effect: contains_point(point)?
    Effect->>Ledger: contains_point(point)
    Ledger-->>Effect: bool

    Stage->>Effect: tip()?
    Effect->>Ledger: tip()
    Ledger-->>Effect: Point / Tip

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Suggested reviewers

  • etorreborre
  • abailly

Poem

New stages hum like synth in dawn-lit lanes,
fetch_blocks hustles, validate_block2 trains.
Ledger asks by point — no bundle in hand,
traces keep odd bytes, the logs still stand.
Merge on, friend — CI queues the final bands.


@rkuhn rkuhn force-pushed the rk/fetch-blocks branch 4 times, most recently from aae7cde to d0fa686 Compare March 16, 2026 07:05
rkuhn added 2 commits March 19, 2026 15:12
Signed-off-by: Roland Kuhn <rk@rkuhn.info>
Signed-off-by: Roland Kuhn <rk@rkuhn.info>
@rkuhn rkuhn force-pushed the rk/fetch-blocks branch from d0fa686 to 7fd3bb0 Compare March 19, 2026 14:14
@rkuhn rkuhn marked this pull request as ready for review March 19, 2026 15:00
rkuhn added 4 commits March 19, 2026 16:01
- fix startup bugs and amend tests for track_peers and select_chain_new
- implement proper chain selection criterion
- fix termination caused via pure-stage
- clean up logging
- fix updating of best_chain_hash

Signed-off-by: Roland Kuhn <rk@rkuhn.info>
also complain when the exact rollback point is not found, instead of
silently moving to a different point's state

also complain when trying to roll back to a point before the tip

Signed-off-by: Roland Kuhn <rk@rkuhn.info>
Signed-off-by: Roland Kuhn <rk@rkuhn.info>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 12

🧹 Nitpick comments (8)
crates/amaru-kernel/src/cardano/block_height.rs (1)

51-57: Please add direct tests for new Sub<u64> semantics.

Line 55’s saturating behavior is a good call (no underflow drama), but there’s no explicit test locking this in. A tiny test pair would prevent future “whoops” regressions.

💡 Suggested test addition
 #[cfg(any(test, feature = "test-utils"))]
 mod tests {
     use proptest::prop_compose;

     use super::*;
     use crate::prop_cbor_roundtrip;

     prop_cbor_roundtrip!(BlockHeight, any_block_height());

+    #[test]
+    fn block_height_subtracts_normally_when_rhs_is_smaller() {
+        assert_eq!((BlockHeight::new(10) - 3).as_u64(), 7);
+    }
+
+    #[test]
+    fn block_height_sub_saturates_at_zero_on_underflow() {
+        assert_eq!((BlockHeight::new(2) - 5).as_u64(), 0);
+    }
+
     prop_compose! {
         pub fn any_block_height()(h in 1..1000u64) -> BlockHeight {
             BlockHeight::from(h)
         }
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-kernel/src/cardano/block_height.rs` around lines 51 - 57, Add
unit tests for the Sub<u64> implementation on BlockHeight to lock in the
saturating behavior: add one test calling BlockHeight(x) - y where y < x and
assert the result equals BlockHeight(x - y), and another test where y > x and
assert the result equals BlockHeight(0) to ensure saturating_sub is preserved;
place tests near existing block_height tests and reference the BlockHeight type
and the impl Sub<u64>::sub (which uses saturating_sub) so future changes to that
method will be caught.
crates/amaru-ouroboros-traits/src/connections/connection_provider.rs (1)

53-57: Tiny doc polish: clarify pre-increment return semantics.

Line 53 reads like it returns the next ID, but Lines 55–57 return the current ID first, then increment. A wording tweak would save future “wait, what?” moments.

Suggested doc tweak
-    /// Get the next ConnectionId, wrapping on overflow (which should not happen given we are using u64)
+    /// Return the current ConnectionId, then increment it (wrapping on overflow).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-ouroboros-traits/src/connections/connection_provider.rs` around
lines 53 - 57, The doc comment for get_and_increment is misleading — it implies
returning the “next” ConnectionId but the method actually returns the current ID
and then increments (post-increment semantics). Update the documentation for pub
fn get_and_increment(&mut self) -> Self to clearly state it returns the current
ConnectionId (self) and then advances the internal counter using
wrapping_add(1), so readers aren’t confused about its post-increment behavior.
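The post-increment semantics described above can be sketched as follows; ConnectionId here is a simplified stand-in, not the crate's actual definition:

```rust
// Simplified stand-in for the crate's ConnectionId, illustrating the
// post-increment behavior the doc comment should describe: return the
// current value, then advance the counter with wrapping_add(1).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ConnectionId(pub u64);

impl ConnectionId {
    /// Return the current ConnectionId, then increment it (wrapping on overflow).
    pub fn get_and_increment(&mut self) -> Self {
        let current = *self;
        self.0 = self.0.wrapping_add(1);
        current
    }
}
```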
crates/amaru-kernel/src/cardano/network_block.rs (1)

34-53: Add is_empty() and remove the redundant encoded_block() accessor.

The struct has two identical accessor methods returning &self.encoded_block; as_slice() is the canonical one (used 119 times vs 1 for encoded_block()). Add is_empty() to satisfy the clippy lint, keep as_slice(), and remove encoded_block() to avoid future maintenance drift.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-kernel/src/cardano/network_block.rs` around lines 34 - 53, The
NetworkBlock struct currently triggers clippy's len_without_is_empty because it
defines len() but not is_empty(), and it also exposes a redundant
encoded_block() accessor (duplicate of as_slice()). Fix by adding a public
is_empty(&self) -> bool that returns self.encoded_block.is_empty(), remove the
encoded_block(&self) -> &[u8] method, and keep as_slice(&self) -> &[u8] as the
sole slice accessor; ensure callers use as_slice() (it’s already used widely),
leaving len() and the new is_empty() in NetworkBlock to satisfy the lint.
simulation/amaru-sim/tests/replay_test.rs (1)

26-27: Test ignored with a FIXME—fair enough for now, but needs a plan.

Marking the test as #[ignore] makes sense if the data structures changed and the stored traces are now incompatible. The FIXME raises a legit question though: how do you regenerate the stored traces when schemas evolve?

A few options to consider down the track:

  1. A CI job or script that regenerates traces when structures change
  2. Version-tagging traces and skipping incompatible ones gracefully
  3. Documentation on the manual regeneration process

For now, ignoring is fine for a draft PR, but this feels like tech debt that'll bite if left too long—like skipping a side quest that turns out to have crucial late-game loot.

Want me to open an issue to track creating a trace regeneration workflow?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@simulation/amaru-sim/tests/replay_test.rs` around lines 26 - 27, The test
function test_run_replay is annotated with #[ignore] and a FIXME without a
tracked plan; create a concrete remediation: add a task/issue reference and a
brief regeneration strategy comment above test_run_replay, and optionally
replace the bare FIXME with a TODO pointing to that issue ID, plus a short note
about preferred approach (CI regeneration script, versioned traces, or manual
steps) so future authors know how to regenerate stored traces when schemas
change; reference the test function name test_run_replay and the #[ignore]
attribute so maintainers can find and update this test.
crates/amaru-protocols/src/chainsync/responder.rs (1)

183-212: Tip-walk overlap search looks solid, but watch the slot comparison edge case.

The algorithm walks back from both the tip and pointer chains, using slot comparison to find overlap. One thing to keep an eye on: if two different forks have headers at the exact same slot, slot_or_default() comparison might skip past a valid overlap point. In practice this is probably rare, but worth a quick think—are there scenarios where two competing chain tips could have blocks at identical slots?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-protocols/src/chainsync/responder.rs` around lines 183 - 212, In
next_header_from_tip, the current slot-only comparison can miss overlaps when
two different headers share the same slot; update the loop so you first check
for exact point equality (the existing if *tip_point == point) and otherwise
advance the tip_chain when tip_point.slot_or_default() > point.slot_or_default()
OR when slots are equal but hashes differ (i.e., same slot but tip_point.hash()
!= point.hash()); refer to symbols tip_chain, tip_point, pointer,
next_header_from_tip and ResponderAction::RollBackward to locate where to add
the extra case so equal-slot/different-hash headers are skipped rather than
accidentally treating them as potential overlaps.
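The extra comparison case can be sketched in isolation like this, using simplified stand-in types rather than the crate's real header/point types:

```rust
// Illustrative stand-in for a chain point: slot plus header hash.
#[derive(Clone, Copy, PartialEq, Eq)]
struct Pt {
    slot: u64,
    hash: u64,
}

/// Decide whether the tip-chain walk should step past `tip_point`
/// while searching for an overlap with `point`. Advancing on an
/// equal slot with a different hash covers the edge case of two
/// competing forks that both carry a header at the same slot.
fn should_advance(tip_point: Pt, point: Pt) -> bool {
    if tip_point == point {
        return false; // exact overlap found, stop here
    }
    tip_point.slot > point.slot
        || (tip_point.slot == point.slot && tip_point.hash != point.hash)
}
```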
crates/amaru-ouroboros-traits/src/can_validate_blocks/mock.rs (1)

41-43: Mock semantics are a bit too permissive for chain-point checks.

contains_point always true while tip is always Origin can make rewind/containment paths look valid when they aren’t. Consider a minimal stateful mock (track current tip and answer contains_point accordingly) so tests catch real regressions.

Also applies to: 45-47

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-ouroboros-traits/src/can_validate_blocks/mock.rs` around lines
41 - 43, The mock implementation is too permissive because contains_point always
returns true while tip is always Origin; update the mock in mock.rs to maintain
minimal state: store a current tip (e.g., a field like current_tip) and update
it when tip() or other advancing methods are called, and implement
contains_point(&self, point: &Point) to return true only if the given point is
equal to or is an ancestor of current_tip (or otherwise consistent with your
chain semantics); adjust any related methods (e.g., tip()) to read from this
state so rewind/containment tests behave realistically.
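A minimal sketch of such a stateful mock, assuming a simplified stand-in Point type (the real kernel type and trait signatures differ):

```rust
use std::collections::HashSet;

// Simplified stand-in for the kernel's chain point.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Point {
    Origin,
    Specific(u64),
}

struct MockLedger {
    known: HashSet<Point>,
    current_tip: Point,
}

impl MockLedger {
    fn new() -> Self {
        let mut known = HashSet::new();
        known.insert(Point::Origin);
        Self { known, current_tip: Point::Origin }
    }

    // Advancing updates both the tip and the membership set, so
    // contains_point and tip answer consistently instead of being
    // hard-coded to `true` and `Origin`.
    fn roll_forward(&mut self, point: Point) {
        self.known.insert(point);
        self.current_tip = point;
    }

    fn contains_point(&self, point: &Point) -> bool {
        self.known.contains(point)
    }

    fn tip(&self) -> Point {
        self.current_tip
    }
}
```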
crates/amaru-ledger/src/block_validator.rs (1)

95-98: Let tip() reuse get_tip().

Same mutex shuffle lives in two places now. Tiny thing, but keeping one source of truth avoids drift later, mate.

♻️ Tiny cleanup
     #[expect(clippy::unwrap_used)]
     fn tip(&self) -> Point {
-        let state = self.state.lock().unwrap();
-        state.tip().into_owned()
+        self.get_tip()
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-ledger/src/block_validator.rs` around lines 95 - 98, The tip()
method duplicates the mutex locking logic already implemented by get_tip();
change tip() to simply call and return the result of get_tip() (e.g., replace
the body that does self.state.lock().unwrap() / state.tip().into_owned() with a
call to self.get_tip()), so the single lock/ownership logic lives in get_tip()
and avoids duplicated mutex handling between tip() and get_tip().
crates/amaru-consensus/src/effects/ledger_effects.rs (1)

129-129: Unused _peer variable after refactor.

Looks like peer got left behind like a forgotten NPC after a cutscene, mate. The underscore prefix suppresses the warning, but if it's no longer needed in ValidateBlockEffect::run, might be cleaner to remove it from the struct entirely—or is there a plan to use it for logging/metrics down the track?

🧹 Optional cleanup

If peer is no longer needed:

 #[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
 pub struct ValidateBlockEffect {
-    peer: Peer,
     point: Point,
     #[serde(skip)]
     ctx: IgnoreEq<opentelemetry::Context>,
 }

 impl ValidateBlockEffect {
-    pub fn new(peer: &Peer, point: &Point, ctx: opentelemetry::Context) -> Self {
-        Self { peer: peer.clone(), point: *point, ctx: ctx.into() }
+    pub fn new(point: &Point, ctx: opentelemetry::Context) -> Self {
+        Self { point: *point, ctx: ctx.into() }
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-consensus/src/effects/ledger_effects.rs` at line 129, The
destructuring in ValidateBlockEffect::run binds a peer field as _peer but never
uses it; either remove peer from the ValidateBlockEffect struct (and update all
its constructors/usages) or if the field is intended to be kept for future use,
change the pattern to omit it entirely (e.g., destructure only point and ctx) or
reference it where needed (e.g., add logging/metrics using peer). Locate the
Self { peer: _peer, point, ctx } = *self; binding in ValidateBlockEffect::run
and either delete the peer field from the struct definition and all places that
construct or match it, or drop peer from the destructure here (or use it
meaningfully) to eliminate the unused-variable noise.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/amaru-consensus/src/effects/consensus_effects.rs`:
- Around line 188-193: The mock ledger is incoherent because contains_point()
unconditionally returns true while tip() always returns Point::Origin; update
the test double to track ledger state (e.g., add a stored latest_point or a
HashSet of known Points) and implement contains_point(&self, point: &Point) to
check membership against that stored state and implement tip(&self) to return
the stored latest_point instead of Point::Origin so the mock behaves like a real
ledger; adjust any test setup to initialize/update that stored state when
simulating appends so functions named contains_point and tip reflect consistent
state.

In `@crates/amaru-consensus/src/stages/fetch_blocks/mod.rs`:
- Around line 88-90: The ancestor scan currently swallows store.load_block()
errors by mapping Err(_) to None; change the logic to fail fast like the
load_header() path: replace the ok().and_then(|x| x).is_some() check with a
match on store.load_block(&header.hash()) that breaks when Ok(Some(_)),
continues on Ok(None), and returns/propagates the Err(e) immediately when Err(e)
is returned so local store failures are surfaced instead of treated as "missing"
blocks.
- Around line 97-102: Before performing the early return in the missing.is_empty
branch, clear/invalidate any armed fetch request state so the pending timer
cannot later replay a stale FetchNextFrom(self.from); specifically, reset the
stored req_id (set it to None or equivalent) and cancel or take ownership of the
timeout handle used by timeout() so it won’t fire, then proceed with the
existing send(...) calls and return. Target the state variables/logic around
req_id, the timeout/timeout handle, and the branch that sends to
self.downstream/self.upstream to ensure the old request is fully cleared before
returning.

In `@crates/amaru-consensus/src/stages/validate_block2/mod.rs`:
- Around line 147-157: The check uses ledger_tip as the immutable cutoff causing
valid common ancestors inside the volatile DB to be rejected; reorder and/or
change the comparisons so you test volatile membership first (use
ledger.contains_point(&ancestor.point()) and equality with ledger_tip to accept
an ancestor as rb_point) and only treat points below the volatile DB boundary as
immutable and error. Concretely, in the block that inspects ancestor.point(),
set rb_point when ledger.contains_point(&ancestor.point()) || ancestor.point()
== ledger_tip, and move the immutable-db error (ValidationFailed::new with
ConsensusError::RollbackBlockFailed and parent) to run only when
ancestor.point() is strictly below the actual volatile-db boundary (not just <
ledger_tip).
- Around line 53-58: The genesis fast-path in stage (fn stage with
ValidateBlockMsg) marks the tip valid and sets state.current without advancing
the ledger, causing subsequent validations to use stale ledger state; update the
branch where msg.parent == Point::Origin to roll the ledger forward with msg.tip
(e.g., call the ledger roll_forward/apply_block method on state.ledger or
equivalent) before sending SelectChainMsg::BlockValidationResult and setting
state.current, and ensure you await the ledger update and handle/propagate any
error return from that call so the ledger and state.current stay synchronized.

In `@crates/amaru-consensus/src/stages/validate_block2/test_setup.rs`:
- Around line 162-173: The mock roll_forward_block currently returns an outer
Err when a point is in validate_fails, which triggers the hard-failure path;
change the return to Ok(Err(BlockValidationError::new(...))) so the inner Err is
returned instead and tests exercise the "invalid block but continue" path—edit
the async fn roll_forward_block in test_setup.rs to return Ok(Err(...)) when
inner.validate_fails.contains(point) (keep the same BlockValidationError
construction) and leave the success branch as Ok(Ok(Default::default())).

In `@crates/amaru-ledger/src/state.rs`:
- Around line 634-636: contains_point currently only checks the volatile deque
(self.volatile.contains(point)) and therefore misses persisted points after
restart; update contains_point to consult the stable store as well (e.g., check
both the volatile deque and the stable storage accessor used elsewhere in this
module) and return true if either contains the point so calls like
contains_point(self.tip()) correctly succeed for persisted tips and older stable
points.
- Around line 617-629: The first check uses self.tip() to decide a "before tip"
rollback but when volatile entries exist the correct lower bound is the first
rollbackable point (the stable tip just below the volatile window), not
self.tip(); change the condition so it compares to that first rollbackable point
(obtainable from the volatile window’s earliest anchor or the stable lower
bound) and keep self.tip() only for the "in the future" check; update the branch
that currently returns BackwardError::RollbackPointBeforeTip to use that
computed rollbackable_lower instead of self.tip(), and leave the subsequent
volatile.view_back() / volatile.rollback_to() logic unchanged.

In `@crates/amaru-ledger/src/state/volatile_db.rs`:
- Around line 133-148: The loop can exit having consumed the entire sequence
without ever hitting diff.anchor.0 == *point, which currently returns Ok; after
the loop and before truncating/committing, verify we actually matched the
rollback point by checking that ix > 0 and self.sequence[ix - 1].anchor.0 ==
*point (or otherwise call on_unknown_point(point) and return Err). Update the
code around self.sequence, ix, and on_unknown_point to perform this exact-match
check and only proceed to truncate/assign self.cache when the match is
confirmed.

In `@crates/amaru-protocols/src/blockfetch/initiator.rs`:
- Around line 313-314: The V2 Done branch (Resp::V2(id, cr) => eff.send(&cr,
Blocks2::Done(id)).await) currently skips the final range validation; update the
Resp::V2 handler in initiator.rs to run the same is_valid_block_range check used
by V1 against the received container/response (cr) before sending
Blocks2::Done(id), and if validation fails return/send the same failure/invalid
response path (or an explicit error) as the V1 branch to preserve safety; locate
Resp::V2, Blocks2::Done, and is_valid_block_range in this module to implement
the check and mirror the V1 behavior.
- Around line 269-282: The V2 branch currently decodes and forwards blocks
without validating they match the requested range; before calling eff.send(cr,
Blocks2::Block(*id, network_block)) in the Resp::V2 handling path (where you
call network_block.decode_block()), perform the same incremental validation you
use for V1 (e.g., reuse or call is_valid_block_range or equivalent validation
logic) to check block height, parent hash linkage and content hash against the
requested id/range, and if validation fails log + terminate/ignore the
connection (as you do in the V1 error path) instead of forwarding; ensure the
check runs after decode_block() and before eff.send so malicious or out-of-range
blocks are never injected into Blocks2::Block.

In `@crates/amaru-protocols/src/manager.rs`:
- Around line 308-320: The loop sending ManagerMessage::FetchBlocks2 currently
iterates all manager.peers and can target responder or non-initiator entries;
change it to first collect only connected initiator connections (filter
manager.peers for ConnectionState::Connected(_conn_id, connection) where the
connection is an initiator—e.g. connection.is_initiator() or equivalent), and if
that filtered set is empty call eff.send(&cr, Blocks2::NoBlocks(id)).await and
return; otherwise iterate that filtered collection and eff.send(connection,
ConnectionMessage::FetchBlocks2 { from, through, cr: cr.clone(), id }).await for
each. Ensure you reference the same symbols: ManagerMessage::FetchBlocks2,
manager.peers, ConnectionState::Connected, connection,
ConnectionMessage::FetchBlocks2, Blocks2::NoBlocks(id), and cr.
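The fail-fast pattern requested for the fetch_blocks ancestor scan above can be sketched roughly like this, with Store and StoreError as simplified stand-ins for the crate's actual types:

```rust
// Stand-ins for the chain store API used in the example.
#[derive(Debug, PartialEq)]
enum StoreError {
    Io,
}

struct Store;

impl Store {
    fn load_block(&self, hash: u64) -> Result<Option<Vec<u8>>, StoreError> {
        match hash {
            1 => Ok(Some(vec![0xca, 0xfe])), // block present locally
            2 => Ok(None),                   // genuinely missing
            _ => Err(StoreError::Io),        // store failure
        }
    }
}

/// Instead of `store.load_block(h).ok().and_then(|x| x).is_some()`,
/// which silently turns store failures into "missing", propagate the
/// error with `?` so local store failures surface immediately.
fn is_block_present(store: &Store, hash: u64) -> Result<bool, StoreError> {
    Ok(store.load_block(hash)?.is_some())
}
```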

---

Nitpick comments:
In `@crates/amaru-consensus/src/effects/ledger_effects.rs`:
- Line 129: The destructuring in ValidateBlockEffect::run binds a peer field as
_peer but never uses it; either remove peer from the ValidateBlockEffect struct
(and update all its constructors/usages) or if the field is intended to be kept
for future use, change the pattern to omit it entirely (e.g., destructure only
point and ctx) or reference it where needed (e.g., add logging/metrics using
peer). Locate the Self { peer: _peer, point, ctx } = *self; binding in
ValidateBlockEffect::run and either delete the peer field from the struct
definition and all places that construct or match it, or drop peer from the
destructure here (or use it meaningfully) to eliminate the unused-variable
noise.

In `@crates/amaru-kernel/src/cardano/block_height.rs`:
- Around line 51-57: Add unit tests for the Sub<u64> implementation on
BlockHeight to lock in the saturating behavior: add one test calling
BlockHeight(x) - y where y < x and assert the result equals BlockHeight(x - y),
and another test where y > x and assert the result equals BlockHeight(0) to
ensure saturating_sub is preserved; place tests near existing block_height tests
and reference the BlockHeight type and the impl Sub<u64>::sub (which uses
saturating_sub) so future changes to that method will be caught.

In `@crates/amaru-kernel/src/cardano/network_block.rs`:
- Around line 34-53: The NetworkBlock struct currently triggers clippy's
len_without_is_empty because it defines len() but not is_empty(), and it also
exposes a redundant encoded_block() accessor (duplicate of as_slice()). Fix by
adding a public is_empty(&self) -> bool that returns
self.encoded_block.is_empty(), remove the encoded_block(&self) -> &[u8] method,
and keep as_slice(&self) -> &[u8] as the sole slice accessor; ensure callers use
as_slice() (it’s already used widely), leaving len() and the new is_empty() in
NetworkBlock to satisfy the lint.

In `@crates/amaru-ledger/src/block_validator.rs`:
- Around line 95-98: The tip() method duplicates the mutex locking logic already
implemented by get_tip(); change tip() to simply call and return the result of
get_tip() (e.g., replace the body that does self.state.lock().unwrap() /
state.tip().into_owned() with a call to self.get_tip()), so the single
lock/ownership logic lives in get_tip() and avoids duplicated mutex handling
between tip() and get_tip().

In `@crates/amaru-ouroboros-traits/src/can_validate_blocks/mock.rs`:
- Around line 41-43: The mock implementation is too permissive because
contains_point always returns true while tip is always Origin; update the mock
in mock.rs to maintain minimal state: store a current tip (e.g., a field like
current_tip) and update it when tip() or other advancing methods are called, and
implement contains_point(&self, point: &Point) to return true only if the given
point is equal to or is an ancestor of current_tip (or otherwise consistent with
your chain semantics); adjust any related methods (e.g., tip()) to read from
this state so rewind/containment tests behave realistically.

In `@crates/amaru-ouroboros-traits/src/connections/connection_provider.rs`:
- Around line 53-57: The doc comment for get_and_increment is misleading — it
implies returning the “next” ConnectionId but the method actually returns the
current ID and then increments (post-increment semantics). Update the
documentation for pub fn get_and_increment(&mut self) -> Self to clearly state
it returns the current ConnectionId (self) and then advances the internal
counter using wrapping_add(1), so readers aren’t confused about its
post-increment behavior.
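The post-increment semantics the doc comment should spell out can be shown in a few lines, assuming `ConnectionId` wraps an integer counter:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
struct ConnectionId(u64);

impl ConnectionId {
    /// Returns the *current* id, then advances the internal counter
    /// with wrapping_add(1) (post-increment semantics).
    fn get_and_increment(&mut self) -> Self {
        let current = *self;
        self.0 = self.0.wrapping_add(1);
        current
    }
}
```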

In `@crates/amaru-protocols/src/chainsync/responder.rs`:
- Around line 183-212: In next_header_from_tip, the current slot-only comparison
can miss overlaps when two different headers share the same slot; update the
loop so you first check for exact point equality (the existing if *tip_point ==
point) and otherwise advance the tip_chain when tip_point.slot_or_default() >
point.slot_or_default() OR when slots are equal but hashes differ (i.e., same
slot but tip_point.hash() != point.hash()); refer to symbols tip_chain,
tip_point, pointer, next_header_from_tip and ResponderAction::RollBackward to
locate where to add the extra case so equal-slot/different-hash headers are
skipped rather than accidentally treating them as potential overlaps.
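The skip predicate for the tip-chain cursor can be isolated like this, with a hypothetical `(slot, hash)` point rather than the real amaru-kernel type:

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
struct Point {
    slot: u64,
    hash: [u8; 4],
}

// True when next_header_from_tip should advance past `tip_point` while
// searching for an overlap with `point`. The equal-slot/different-hash
// case is the one the original comparison missed.
fn should_skip(tip_point: &Point, point: &Point) -> bool {
    if tip_point == point {
        return false; // exact overlap found, stop here
    }
    tip_point.slot > point.slot
        || (tip_point.slot == point.slot && tip_point.hash != point.hash)
}
```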

In `@simulation/amaru-sim/tests/replay_test.rs`:
- Around line 26-27: The test function test_run_replay is annotated with
#[ignore] and a FIXME without a tracked plan; create a concrete remediation: add
a task/issue reference and a brief regeneration strategy comment above
test_run_replay, and optionally replace the bare FIXME with a TODO pointing to
that issue ID, plus a short note about preferred approach (CI regeneration
script, versioned traces, or manual steps) so future authors know how to
regenerate stored traces when schemas change; reference the test function name
test_run_replay and the #[ignore] attribute so maintainers can find and update
this test.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d26680fc-8bb0-4b5a-bf21-92907e7dd3c9

📥 Commits

Reviewing files that changed from the base of the PR and between 8d7f5a5 and 7fd3bb0.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (59)
  • crates/amaru-consensus/Cargo.toml
  • crates/amaru-consensus/src/effects/consensus_effects.rs
  • crates/amaru-consensus/src/effects/ledger_effects.rs
  • crates/amaru-consensus/src/effects/metrics_effects.rs
  • crates/amaru-consensus/src/effects/mod.rs
  • crates/amaru-consensus/src/lib.rs
  • crates/amaru-consensus/src/stages/adopt_chain/mod.rs
  • crates/amaru-consensus/src/stages/adopt_chain/test_setup.rs
  • crates/amaru-consensus/src/stages/adopt_chain/tests.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/mod.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/test_setup.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/tests.rs
  • crates/amaru-consensus/src/stages/mod.rs
  • crates/amaru-consensus/src/stages/select_chain_new/mod.rs
  • crates/amaru-consensus/src/stages/select_chain_new/test_setup.rs
  • crates/amaru-consensus/src/stages/select_chain_new/tests.rs
  • crates/amaru-consensus/src/stages/track_peers/mod.rs
  • crates/amaru-consensus/src/stages/validate_block.rs
  • crates/amaru-consensus/src/stages/validate_block2/mod.rs
  • crates/amaru-consensus/src/stages/validate_block2/test_setup.rs
  • crates/amaru-consensus/src/stages/validate_block2/tests.rs
  • crates/amaru-kernel/src/cardano/block.rs
  • crates/amaru-kernel/src/cardano/block_header.rs
  • crates/amaru-kernel/src/cardano/block_header/tests.rs
  • crates/amaru-kernel/src/cardano/block_height.rs
  • crates/amaru-kernel/src/cardano/network_block.rs
  • crates/amaru-kernel/src/cardano/slot.rs
  • crates/amaru-kernel/src/lib.rs
  • crates/amaru-ledger/src/block_validator.rs
  • crates/amaru-ledger/src/state.rs
  • crates/amaru-ledger/src/state/volatile_db.rs
  • crates/amaru-network/src/connection.rs
  • crates/amaru-ouroboros-traits/Cargo.toml
  • crates/amaru-ouroboros-traits/src/can_validate_blocks/mock.rs
  • crates/amaru-ouroboros-traits/src/can_validate_blocks/mod.rs
  • crates/amaru-ouroboros-traits/src/connections/connection_provider.rs
  • crates/amaru-ouroboros-traits/src/stores/consensus/mod.rs
  • crates/amaru-protocols/src/blockfetch/initiator.rs
  • crates/amaru-protocols/src/blockfetch/mod.rs
  • crates/amaru-protocols/src/blockfetch/responder.rs
  • crates/amaru-protocols/src/chainsync/initiator.rs
  • crates/amaru-protocols/src/chainsync/responder.rs
  • crates/amaru-protocols/src/connection.rs
  • crates/amaru-protocols/src/handshake/initiator.rs
  • crates/amaru-protocols/src/handshake/responder.rs
  • crates/amaru-protocols/src/keepalive/initiator.rs
  • crates/amaru-protocols/src/keepalive/responder.rs
  • crates/amaru-protocols/src/manager.rs
  • crates/amaru-protocols/src/tests/slow_manager_stage.rs
  • crates/amaru-protocols/src/tx_submission/initiator.rs
  • crates/amaru-protocols/src/tx_submission/responder.rs
  • crates/amaru/src/bin/amaru/cmd/run.rs
  • crates/amaru/src/stages/build_node.rs
  • crates/amaru/src/stages/build_stage_graph.rs
  • crates/amaru/src/tests/in_memory_connection_provider.rs
  • crates/pure-stage/src/simulation/replay.rs
  • crates/pure-stage/src/stage_ref.rs
  • crates/pure-stage/src/trace_buffer.rs
  • simulation/amaru-sim/tests/replay_test.rs

Comment on lines +269 to +282
if let Some((_, _, Resp::V2(id, cr))) = self.queue.front() {
// must send NetworkBlock to the local stage for storage, otherwise validation breaks
let _block = network_block
.decode_block()
.or_terminate(eff, async |error| {
tracing::warn!(
bytes = body.len(),
%error,
"received invalid block CBOR; terminating the connection"
);
})
.await;
// TODO check hashes etc.
eff.send(cr, Blocks2::Block(*id, network_block)).await;

⚠️ Potential issue | 🟠 Major

V2 block handling needs validation before merge.

The V2 path decodes the block and sends it immediately, but the TODO check hashes etc. at line 281 is a bit of a red flag, mate. Unlike V1 which validates the entire range with is_valid_block_range, V2 currently trusts whatever comes through. This could allow a malicious peer to slip in blocks that don't match the requested range.

Is there a plan to add incremental validation for V2 before this ships? Without it, a peer could send blocks from the wrong chain segment. Like accepting a quest item without checking it's actually the quest item—could end up with a fake Excalibur!

Would you like me to sketch out an incremental validation approach for V2, or is this intentionally deferred to a follow-up PR?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-protocols/src/blockfetch/initiator.rs` around lines 269 - 282,
The V2 branch currently decodes and forwards blocks without validating they
match the requested range; before calling eff.send(cr, Blocks2::Block(*id,
network_block)) in the Resp::V2 handling path (where you call
network_block.decode_block()), perform the same incremental validation you use
for V1 (e.g., reuse or call is_valid_block_range or equivalent validation logic)
to check block height, parent hash linkage and content hash against the
requested id/range, and if validation fails log + terminate/ignore the
connection (as you do in the V1 error path) instead of forwarding; ensure the
check runs after decode_block() and before eff.send so malicious or out-of-range
blocks are never injected into Blocks2::Block.

Comment on lines +313 to +314
// TODO: implement validation for V2
Resp::V2(id, cr) => eff.send(&cr, Blocks2::Done(id)).await,

⚠️ Potential issue | 🟠 Major

V2 Done handler also skips validation.

Same concern here—the V2 Done path just sends Blocks2::Done(id) without any final validation that the received blocks formed a valid range. The V1 path does is_valid_block_range check before responding. Since this is a draft PR, I'm guessing validation is on the TODO list?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-protocols/src/blockfetch/initiator.rs` around lines 313 - 314,
The V2 Done branch (Resp::V2(id, cr) => eff.send(&cr, Blocks2::Done(id)).await)
currently skips the final range validation; update the Resp::V2 handler in
initiator.rs to run the same is_valid_block_range check used by V1 against the
received container/response (cr) before sending Blocks2::Done(id), and if
validation fails return/send the same failure/invalid response path (or an
explicit error) as the V1 branch to preserve safety; locate Resp::V2,
Blocks2::Done, and is_valid_block_range in this module to implement the check
and mirror the V1 behavior.

Comment on lines +308 to +320
ManagerMessage::FetchBlocks2 { from, through, cr, id } => {
if manager.peers.is_empty() {
tracing::warn!("no peers to fetch blocks");
eff.send(&cr, Blocks2::NoBlocks(id)).await;
return manager;
}
tracing::debug!(?from, ?through, "fetching blocks");
for state in manager.peers.values() {
let ConnectionState::Connected(_conn_id, connection) = state else {
continue;
};
eff.send(connection, ConnectionMessage::FetchBlocks2 { from, through, cr: cr.clone(), id }).await;
}

⚠️ Potential issue | 🔴 Critical

Use only connected initiators for FetchBlocks2, mate.

manager.peers also holds accepted responder connections plus scheduled/disconnecting entries. This loop can therefore send FetchBlocks2 into a responder-side connection::stage that has no handler, and when there are zero connected initiators it just returns without ever sending Blocks2::NoBlocks(id) back to cr, leaving the caller hanging. Until the full-duplex story lands, this needs to filter to connected initiators and treat an empty filtered set as the no-blocks case.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-protocols/src/manager.rs` around lines 308 - 320, The loop
sending ManagerMessage::FetchBlocks2 currently iterates all manager.peers and
can target responder or non-initiator entries; change it to first collect only
connected initiator connections (filter manager.peers for
ConnectionState::Connected(_conn_id, connection) where the connection is an
initiator—e.g. connection.is_initiator() or equivalent), and if that filtered
set is empty call eff.send(&cr, Blocks2::NoBlocks(id)).await and return;
otherwise iterate that filtered collection and eff.send(connection,
ConnectionMessage::FetchBlocks2 { from, through, cr: cr.clone(), id }).await for
each. Ensure you reference the same symbols: ManagerMessage::FetchBlocks2,
manager.peers, ConnectionState::Connected, connection,
ConnectionMessage::FetchBlocks2, Blocks2::NoBlocks(id), and cr.
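One way to express the filter-then-check shape of that fix, with stand-in types instead of the real manager/effects machinery:

```rust
#[derive(Debug, PartialEq)]
enum ConnectionState {
    Scheduled,
    Connected(u64), // simplified: real variant also carries the connection
    Disconnecting,
}

// Returns the ids we would fetch from, or None when no peer is connected,
// in which case the caller must reply Blocks2::NoBlocks(id) instead of
// silently returning and leaving the requester hanging.
fn connected_fetchers(peers: &[ConnectionState]) -> Option<Vec<u64>> {
    let ids: Vec<u64> = peers
        .iter()
        .filter_map(|s| match s {
            ConnectionState::Connected(id) => Some(*id),
            _ => None,
        })
        .collect();
    if ids.is_empty() {
        None
    } else {
        Some(ids)
    }
}
```

A further refinement, once the connection role is tracked, would filter for initiators only.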

also run simulation as long as effects are pending

Signed-off-by: Roland Kuhn <rk@rkuhn.info>
@rkuhn rkuhn force-pushed the rk/fetch-blocks branch from 7fd3bb0 to f0b8aba Compare March 19, 2026 16:09

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (8)
crates/amaru-protocols/src/blockfetch/initiator.rs (1)

269-282: ⚠️ Potential issue | 🟠 Major

V2 still forwards an unvalidated range, mate.

The Block arm only checks that the CBOR decodes and then emits Blocks2::Block straight away, while the Done arm still sends Blocks2::Done(id) with the validation TODO intact. A peer can therefore complete an id with blocks that don’t match (from, through) or the parent/hash chain — very fake Excalibur energy.

Also applies to: 313-315

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-protocols/src/blockfetch/initiator.rs` around lines 269 - 282,
The V2 path (Resp::V2 in the front-of-queue handling) forwards decoded blocks
and Done notifications without validating the block range or parent/hash chain;
update the logic in the Resp::V2 handling (the branch that calls
network_block.decode_block() and the branch that sends Blocks2::Done(id)) to
perform the same validation used for other protocols: after decode_block()
succeeds, verify the block(s) match the expected (from, through) range and that
parent/hash links are consistent for id before calling eff.send(cr,
Blocks2::Block(...)) or eff.send(cr, Blocks2::Done(id)). Consolidate the
validation into a shared helper used by both the Resp::V2 block arm and the Done
arm so both enforce range and chain integrity prior to emitting Blocks2
messages.
crates/amaru-protocols/src/manager.rs (1)

308-320: ⚠️ Potential issue | 🔴 Critical

Treat “no connected fetcher” as the no-blocks case, mate.

manager.peers also holds scheduled/disconnecting entries and accepted responders. If the map is non-empty but there isn’t an outbound fetch-capable connection, this branch falls through without replying Blocks2::NoBlocks(id), so the caller can hang forever. Right now ConnectionState::Connected drops the role, so the manager can’t filter correctly yet.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-protocols/src/manager.rs` around lines 308 - 320, When handling
ManagerMessage::FetchBlocks2, currently you only check if
manager.peers.is_empty() and then broadcast to every ConnectionState::Connected
but fail to reply NoBlocks if there are peers that aren't connected
fetch-capable; change the logic to first iterate manager.peers and send
ConnectionMessage::FetchBlocks2 for each ConnectionState::Connected (as you do
in the for loop), but also track whether you actually found any connected peers
(e.g., a boolean found_connected), and if after the loop found_connected is
false send eff.send(&cr, Blocks2::NoBlocks(id)).await; reference
ManagerMessage::FetchBlocks2, manager.peers, ConnectionState::Connected,
eff.send, and Blocks2::NoBlocks to locate the fix.
crates/amaru-ledger/src/state.rs (2)

634-636: ⚠️ Potential issue | 🟠 Major

contains_point() still forgets the stable ledger.

After a restart volatile is empty, so contains_point(self.tip()) flips to false for the persisted tip and older stable points. That sends the new validation/rollback flow down the wrong branch, so this needs to consult the stable store as well.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-ledger/src/state.rs` around lines 634 - 636, contains_point
currently only checks self.volatile, which returns false after restart; update
contains_point to consult the persistent/stable ledger as well and return true
if either store contains the Point. Locate the contains_point function and
change its logic to check both self.volatile.contains(point) and the stable
store (e.g., self.stable.contains(point) or self.stable.contains_point(point)
depending on the stable store API) and return the disjunction so persisted
tips/points are recognized.
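The two-store disjunction is simple to sketch, here with plain `u64` slots and `HashSet` lookups standing in for the real volatile and stable store APIs:

```rust
use std::collections::HashSet;

struct State {
    volatile: HashSet<u64>, // points still in the rolling window
    stable: HashSet<u64>,   // points already persisted
}

impl State {
    fn contains_point(&self, point: &u64) -> bool {
        // Checking the stable store too keeps contains_point(tip()) true
        // right after a restart, when the volatile window is empty.
        self.volatile.contains(point) || self.stable.contains(point)
    }
}
```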

617-629: ⚠️ Potential issue | 🔴 Critical

This guard still blocks ordinary rollbacks, mate.

Line 617 compares to against the current tip, so a normal rewind from h3 to h2 — and dropping all volatile entries back to the persisted tip — gets rejected as RollbackPointBeforeTip before the volatile window can do its job. Use the current tip only for the “future” check; the lower bound here needs to be the earliest rollbackable point instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-ledger/src/state.rs` around lines 617 - 629, The check that
rejects rollbacks uses self.tip() as the lower bound and wrongly blocks ordinary
rewinds; replace the comparison that returns
BackwardError::RollbackPointBeforeTip (currently "if self.tip().as_ref() > to")
with a check against the earliest rollbackable point instead (compute the
earliest rollbackable height from the persisted tip and the oldest volatile
anchor — e.g. inspect self.volatile for the earliest anchor/entry and fall back
to self.tip() if none), and only return RollbackPointBeforeTip when *to* is
strictly below that earliest rollbackable height; keep the existing future
checks that use self.tip() and the volatile last.anchor.0 as-is.
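The corrected bounds check can be condensed into a single predicate; plain `u64` heights stand in for ledger points here, and the name is hypothetical:

```rust
// A rollback target is acceptable when it lies between the earliest
// rollbackable point (oldest volatile anchor, falling back to the
// persisted tip) and the current tip.
fn rollback_allowed(to: u64, current_tip: u64, oldest_volatile_anchor: Option<u64>) -> bool {
    let earliest = oldest_volatile_anchor.unwrap_or(current_tip);
    // Rejecting only strictly below `earliest` keeps ordinary rewinds
    // (e.g. h3 -> h2) legal; the "future" check still uses current_tip.
    to >= earliest && to <= current_tip
}
```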
crates/amaru-ledger/src/state/volatile_db.rs (1)

129-148: ⚠️ Potential issue | 🟠 Major

Unknown rollback points can still sneak through here.

If every deque entry sorts < point but none equals it — same-slot/different-hash is the classic boss fight — the loop falls through, truncates to ix, and returns Ok(()). Please assert that an exact match was found before committing the rebuilt cache.

🛠️ Tighten the exact-match check
         let mut cache = VolatileCache::default();
 
         // Keep all elements with slot <= target_slot
         let mut ix = 0;
+        let mut found = false;
         for diff in self.sequence.iter() {
             if diff.anchor.0 <= *point {
                 // TODO: See NOTE on VolatileDB regarding the .clone()
                 cache.merge(diff.state.utxo.clone());
                 ix += 1;
                 if diff.anchor.0 == *point {
+                    found = true;
                     break;
                 }
             } else {
                 return Err(on_unknown_point(point));
             }
         }
+
+        if !found {
+            return Err(on_unknown_point(point));
+        }
 
         self.sequence.truncate(ix);
         self.cache = cache;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-ledger/src/state/volatile_db.rs` around lines 129 - 148, The
loop in VolatileDB's rebuild path can finish with all entries < point but none
equal it; record whether an exact match was encountered (e.g., set a found_match
boolean inside the for diff in self.sequence.iter() loop when diff.anchor.0 ==
*point), and after the loop assert found_match is true before doing
self.sequence.truncate(ix) and self.cache = cache; if not found, return
Err(on_unknown_point(point)). Update the logic around the existing ix and cache
variables (and the existing early Err(on_unknown_point(point)) for > point) so
truncation and cache replace only happen when an exact anchor match was found.
crates/amaru-consensus/src/stages/validate_block2/test_setup.rs (1)

162-173: ⚠️ Potential issue | 🟠 Major

Return the inner invalid-block error from the mock.

validate_block2::validate() uses the outer Err as a hard-failure path via or_terminate(). Returning that shape here means the tests are playing the wrong level: they hit termination instead of the normal “block invalid” branch. For a bad block, this needs to be Ok(Err(...)).

Patch idea
         if inner.validate_fails.contains(point) {
-            return Err(BlockValidationError::new(anyhow::anyhow!("mock validation failed")));
+            return Ok(Err(BlockValidationError::new(anyhow::anyhow!("mock validation failed"))));
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-consensus/src/stages/validate_block2/test_setup.rs` around lines
162 - 173, The mock roll_forward_block currently returns Err(...) when a point
is in inner.validate_fails which triggers the outer hard-failure path; change
that branch to return Ok(Err(...)) instead so the mock reports a normal “block
invalid” result. Locate the async fn roll_forward_block(...) in test_setup.rs
and replace the early return that constructs
BlockValidationError::new(anyhow::anyhow!("mock validation failed")) wrapped
directly in Err with returning
Ok(Err(BlockValidationError::new(anyhow::anyhow!("mock validation failed")))),
leaving the rest of the function behavior (inserting into inner.contains and
setting inner.tip) unchanged.
crates/amaru-consensus/src/stages/validate_block2/mod.rs (2)

53-58: ⚠️ Potential issue | 🔴 Critical

This Origin branch leaves state.current ahead of the ledger.

It marks msg.tip valid and bumps state.current, but it never advances the ledger and never forwards the tip to manager. That’s a proper spooky desync: the next child can be validated against stale ledger state, and the first adopted block never reaches the adoption path. This branch needs to go through the same success side-effects as the normal validation path, with whatever special-case ledger apply the first block needs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-consensus/src/stages/validate_block2/mod.rs` around lines 53 -
58, The Origin branch in stage (pub async fn stage in ValidateBlock) incorrectly
advances state.current and signals SelectChainMsg::BlockValidationResult for
msg.tip but returns early, skipping the ledger apply and manager forwarding that
the normal-success path performs; change this branch to invoke the same
success-side effects as the normal validation path (apply the genesis block to
the ledger using the same ledger-apply helper or special-case apply method,
forward the adopted tip to the manager, and emit any other success effects)
instead of returning immediately, ensuring state.current is advanced only after
the ledger is updated and manager is notified (reuse the existing success
path/utility used for non-Origin tips to avoid duplication).

137-157: ⚠️ Potential issue | 🔴 Critical

Don’t use ledger_tip as the immutable cutoff here.

The < ledger_tip check fires before contains_point(), so a perfectly legal common ancestor that is still in the volatile ledger gets rejected as “immutable db”. Ordinary fork switches can face-plant here once the shared ancestor sits below the current head.

Patch idea
-        if ancestor.point() < ledger_tip {
-            return Err(ValidationFailed::new(
-                &Peer::new("unknown"),
-                ConsensusError::RollbackBlockFailed(
-                    parent,
-                    anyhow::anyhow!("cannot rollback into the immutable db").into(),
-                ),
-            ));
-        }
         if ancestor.point() == ledger_tip || ledger.contains_point(&ancestor.point()) {
             rb_point = Some(ancestor.point());
             break;
         }
+        if ancestor.point() < ledger_tip {
+            return Err(ValidationFailed::new(
+                &Peer::new("unknown"),
+                ConsensusError::RollbackBlockFailed(
+                    parent,
+                    anyhow::anyhow!("cannot rollback into the immutable db").into(),
+                ),
+            ));
+        }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-consensus/src/stages/validate_block2/mod.rs` around lines 137 -
157, The code incorrectly treats any ancestor with point() < ledger_tip as
immutable before checking whether the point is still in the volatile ledger; fix
by reordering the checks inside the store.ancestors_with_validity loop: first
check if ancestor.point() == ledger_tip ||
ledger.contains_point(&ancestor.point()) and set rb_point = Some(...), then
after that check for ancestor.point() < ledger_tip and return the
ValidationFailed::new(...) error only if the point is not contained in the
ledger; keep the existing use of ValidationFailed::new,
ConsensusError::RollbackBlockFailed, parent, and rb_point symbols.
🧹 Nitpick comments (1)
crates/amaru-kernel/src/cardano/block_header.rs (1)

122-124: Tiny perf nit: vrf_leader() allocates on every tip compare.

cmp_tip() in crates/amaru-consensus/src/stages/select_chain_new/mod.rs calls this accessor for each comparison, so returning a fresh Vec<u8> here turns chain selection into a little heap-churn side quest. If possible, expose a borrowed or fixed-size view instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-kernel/src/cardano/block_header.rs` around lines 122 - 124, The
accessor vrf_leader() currently returns an owned Vec<u8>, causing heap
allocations on every cmp_tip() call; change it to return a borrowed slice or
fixed-size array to avoid allocations: update vrf_leader() signature to return
&[u8] (or a [u8; N] if the VRF output length is fixed) and have it delegate to
header.header_body.leader_vrf_output as a slice (or convert to an array
reference), then update callers (notably cmp_tip in
crates/amaru-consensus/src/stages/select_chain_new/mod.rs) to accept the
borrowed &[u8] (or array) instead of Vec<u8>.
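The borrowed accessor is a one-liner; the header layout below is a hypothetical stand-in for the real amaru-kernel struct:

```rust
struct HeaderBody {
    leader_vrf_output: Vec<u8>,
}

struct BlockHeader {
    header_body: HeaderBody,
}

impl BlockHeader {
    // Borrowed view instead of a fresh Vec<u8> per tip comparison,
    // so cmp_tip() no longer allocates on every call.
    fn vrf_leader(&self) -> &[u8] {
        &self.header_body.leader_vrf_output
    }
}
```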
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/amaru-consensus/src/stages/select_chain_new/mod.rs`:
- Around line 35-40: The constructor SelectChain::new currently treats any
provided best_tip as fully validated by inserting tips.insert(best_tip.hash(),
vec![]); change this so we do not mark the tip as already-validated: either (a)
only set best_tip in new() when the caller guarantees the tip is known-valid, or
(b) if best_tip may be unvalidated, seed the pending fragment with the tip
itself (e.g. insert(best_tip.hash(), vec![best_tip.hash()]) or equivalent) so
validation will run; update the new() implementation (fields: best_tip, tips,
may_fetch_blocks) accordingly to avoid using an empty pending vector to imply
validated state.

In `@crates/amaru-ouroboros-traits/src/stores/consensus/mod.rs`:
- Around line 138-149: child_tips() currently starts traversal from ORIGIN_HASH
but immediately calls load_header_with_validity(&hash) which fails for Origin
and aborts the walk; add the same Origin fast-path used by load_tip(): if *hash
== ORIGIN_HASH then skip load_header_with_validity and treat Origin as
implicitly valid—call get_children(&ORIGIN_HASH) and push/extend those children
into to_visit (and yield their tips) so genesis children are visited; reference
the child_tips() iterator, ORIGIN_HASH, load_header_with_validity, get_children
and load_tip() when making the change.

In `@crates/amaru-protocols/src/blockfetch/initiator.rs`:
- Around line 241-244: The RequestRange2 arm currently only emits
InitiatorAction::RequestRange when proto == State::Idle, which prevents the
second pipelined request from ever being sent; change the condition to use the
2-deep pipelining rule by emitting the action when self.queue.len() < 2 (e.g.
let action = (self.queue.len() < 2).then_some(InitiatorAction::RequestRange {
from, through })), and apply the same queue.len() < 2 rule in the related
protocol-state transition handling below so the state advances consistently for
up to two in-flight RequestRange messages; update references in the
BlockFetchMessage::RequestRange2 match arm and the corresponding transition
logic that inspects self.queue.
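The 2-deep pipelining gate can be sketched like so, with a stand-in queue and action type rather than the real protocol state machine:

```rust
struct Initiator {
    queue: Vec<(u64, u64)>, // (from, through) of in-flight requests
}

#[derive(Debug, PartialEq)]
enum InitiatorAction {
    RequestRange { from: u64, through: u64 },
}

impl Initiator {
    // Emit the wire request only while fewer than two requests are in
    // flight; deeper requests wait in the queue until a slot frees up.
    fn on_request_range2(&mut self, from: u64, through: u64) -> Option<InitiatorAction> {
        let action =
            (self.queue.len() < 2).then_some(InitiatorAction::RequestRange { from, through });
        self.queue.push((from, through));
        action
    }
}
```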

In `@crates/pure-stage/src/trace_buffer.rs`:
- Around line 456-463: The code uses Instant::now() as a fallback timestamp when
rehydration fails, making successive hydrate() calls return different timestamps
for the same corrupted bytes; replace that ad-hoc Instant::now() with a
single-process sentinel Instant created once at startup (e.g., a static
SENTINEL_INSTANT initialized via once_cell::sync::Lazy or lazy_static) and use
SENTINEL_INSTANT in the from_slice fallback that constructs
TraceEntry::InvalidBytes so repeated calls over the same buffer are
deterministic; update the closure in the mapping (the from_slice(...)
.unwrap_or_else branch) to return (SENTINEL_INSTANT,
TraceEntry::InvalidBytes(...)) instead of (Instant::now(), ...).
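The prompt suggests `once_cell::sync::Lazy`; on current stable Rust the same single-process sentinel can be had from the standard library's `OnceLock`, as a minimal sketch:

```rust
use std::sync::OnceLock;
use std::time::Instant;

// One Instant per process, initialized on first use, so repeated
// hydrate() calls over the same corrupted bytes yield the same timestamp.
fn sentinel_instant() -> Instant {
    static SENTINEL: OnceLock<Instant> = OnceLock::new();
    *SENTINEL.get_or_init(Instant::now)
}
```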

---

Duplicate comments:
In `@crates/amaru-consensus/src/stages/validate_block2/mod.rs`:
- Around line 53-58: The Origin branch in stage (pub async fn stage in
ValidateBlock) incorrectly advances state.current and signals
SelectChainMsg::BlockValidationResult for msg.tip but returns early, skipping
the ledger apply and manager forwarding that the normal-success path performs;
change this branch to invoke the same success-side effects as the normal
validation path (apply the genesis block to the ledger using the same
ledger-apply helper or special-case apply method, forward the adopted tip to the
manager, and emit any other success effects) instead of returning immediately,
ensuring state.current is advanced only after the ledger is updated and manager
is notified (reuse the existing success path/utility used for non-Origin tips to
avoid duplication).
- Around line 137-157: The code incorrectly treats any ancestor with point() <
ledger_tip as immutable before checking whether the point is still in the
volatile ledger; fix by reordering the checks inside the
store.ancestors_with_validity loop: first check if ancestor.point() ==
ledger_tip || ledger.contains_point(&ancestor.point()) and set rb_point =
Some(...), then after that check for ancestor.point() < ledger_tip and return
the ValidationFailed::new(...) error only if the point is not contained in the
ledger; keep the existing use of ValidationFailed::new,
ConsensusError::RollbackBlockFailed, parent, and rb_point symbols.

In `@crates/amaru-consensus/src/stages/validate_block2/test_setup.rs`:
- Around line 162-173: The mock roll_forward_block currently returns Err(...)
when a point is in inner.validate_fails which triggers the outer hard-failure
path; change that branch to return Ok(Err(...)) instead so the mock reports a
normal “block invalid” result. Locate the async fn roll_forward_block(...) in
test_setup.rs and replace the early return that constructs
BlockValidationError::new(anyhow::anyhow!("mock validation failed")) wrapped
directly in Err with returning
Ok(Err(BlockValidationError::new(anyhow::anyhow!("mock validation failed")))),
leaving the rest of the function behavior (inserting into inner.contains and
setting inner.tip) unchanged.

In `@crates/amaru-ledger/src/state.rs`:
- Around line 634-636: contains_point currently only checks self.volatile, which
returns false after restart; update contains_point to consult the
persistent/stable ledger as well and return true if either store contains the
Point. Locate the contains_point function and change its logic to check both
self.volatile.contains(point) and the stable store (e.g.,
self.stable.contains(point) or self.stable.contains_point(point) depending on
the stable store API) and return the disjunction so persisted tips/points are
recognized.
- Around line 617-629: The check that rejects rollbacks uses self.tip() as the
lower bound and wrongly blocks ordinary rewinds; replace the comparison that
returns BackwardError::RollbackPointBeforeTip (currently "if self.tip().as_ref()
> to") with a check against the earliest rollbackable point instead (compute the
earliest rollbackable height from the persisted tip and the oldest volatile
anchor — e.g. inspect self.volatile for the earliest anchor/entry and fall back
to self.tip() if none), and only return RollbackPointBeforeTip when *to* is
strictly below that earliest rollbackable height; keep the existing future
checks that use self.tip() and the volatile last.anchor.0 as-is.

In `@crates/amaru-ledger/src/state/volatile_db.rs`:
- Around line 129-148: The loop in VolatileDB's rebuild path can finish with all
entries < point but none equal it; record whether an exact match was encountered
(e.g., set a found_match boolean inside the for diff in self.sequence.iter()
loop when diff.anchor.0 == *point), and after the loop assert found_match is
true before doing self.sequence.truncate(ix) and self.cache = cache; if not
found, return Err(on_unknown_point(point)). Update the logic around the existing
ix and cache variables (and the existing early Err(on_unknown_point(point)) for
> point) so truncation and cache replace only happen when an exact anchor match
was found.
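The found_match pattern the prompt asks for can be sketched as follows, reducing the anchors to plain `u64` values (the real loop inspects `diff.anchor.0` against `*point`):

```rust
// Hypothetical stand-in for the VolatileDB rebuild path: truncate only
// when an exact anchor match was seen, otherwise reject the point.
fn rollback_to(sequence: &mut Vec<u64>, point: u64) -> Result<(), String> {
    let mut ix = 0;
    let mut found_match = false;
    for (i, anchor) in sequence.iter().enumerate() {
        if *anchor == point {
            found_match = true;
            ix = i + 1; // keep entries up to and including the match
            break;
        }
        if *anchor > point {
            return Err(format!("unknown point {point}"));
        }
    }
    // Without this guard, the loop could end with every entry < point
    // and the code would truncate to an anchor that was never found.
    if !found_match {
        return Err(format!("unknown point {point}"));
    }
    sequence.truncate(ix);
    Ok(())
}

fn main() {
    let mut seq = vec![1, 2, 3];
    assert!(rollback_to(&mut seq, 2).is_ok());
    assert_eq!(seq, vec![1, 2]);
    let mut seq2 = vec![1, 2, 3];
    assert!(rollback_to(&mut seq2, 5).is_err()); // all entries < point: rejected
    println!("ok");
}
```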

In `@crates/amaru-protocols/src/blockfetch/initiator.rs`:
- Around line 269-282: The V2 path (Resp::V2 in the front-of-queue handling)
forwards decoded blocks and Done notifications without validating the block
range or parent/hash chain; update the logic in the Resp::V2 handling (the
branch that calls network_block.decode_block() and the branch that sends
Blocks2::Done(id)) to perform the same validation used for other protocols:
after decode_block() succeeds, verify the block(s) match the expected (from,
through) range and that parent/hash links are consistent for id before calling
eff.send(cr, Blocks2::Block(...)) or eff.send(cr, Blocks2::Done(id)).
Consolidate the validation into a shared helper used by both the Resp::V2 block
arm and the Done arm so both enforce range and chain integrity prior to emitting
Blocks2 messages.
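The shared validation helper suggested above could look roughly like this. The types are hypothetical simplifications (heights and hashes as `u64`); the real check would work on decoded block headers and the actual `(from, through)` points:

```rust
// Hypothetical block shape for the sketch.
struct BlockRef { hash: u64, parent: u64, height: u64 }

// Shared helper: every delivered block must extend the previously seen
// block and stay inside the requested (from, through) height window.
fn check_block(
    expected_parent: &mut u64,
    from: u64,
    through: u64,
    block: &BlockRef,
) -> Result<(), String> {
    if block.height < from || block.height > through {
        return Err(format!("height {} outside {from}..={through}", block.height));
    }
    if block.parent != *expected_parent {
        return Err(format!("parent {} != expected {}", block.parent, expected_parent));
    }
    *expected_parent = block.hash; // next block must chain onto this one
    Ok(())
}

fn main() {
    let mut parent = 100;
    let b1 = BlockRef { hash: 101, parent: 100, height: 5 };
    let b2 = BlockRef { hash: 102, parent: 101, height: 6 };
    assert!(check_block(&mut parent, 5, 6, &b1).is_ok());
    assert!(check_block(&mut parent, 5, 6, &b2).is_ok());
    let bad = BlockRef { hash: 103, parent: 999, height: 7 };
    assert!(check_block(&mut parent, 5, 6, &bad).is_err()); // wrong parent, out of range
    println!("ok");
}
```

Calling the same helper from both the `Resp::V2` block arm and the `Done` arm keeps the two paths from drifting apart.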

In `@crates/amaru-protocols/src/manager.rs`:
- Around line 308-320: When handling ManagerMessage::FetchBlocks2, currently you
only check if manager.peers.is_empty() and then broadcast to every
ConnectionState::Connected but fail to reply NoBlocks if there are peers that
aren't connected fetch-capable; change the logic to first iterate manager.peers
and send ConnectionMessage::FetchBlocks2 for each ConnectionState::Connected (as
you do in the for loop), but also track whether you actually found any connected
peers (e.g., a boolean found_connected), and if after the loop found_connected
is false send eff.send(&cr, Blocks2::NoBlocks(id)).await; reference
ManagerMessage::FetchBlocks2, manager.peers, ConnectionState::Connected,
eff.send, and Blocks2::NoBlocks to locate the fix.

---

Nitpick comments:
In `@crates/amaru-kernel/src/cardano/block_header.rs`:
- Around line 122-124: The accessor vrf_leader() currently returns an owned
Vec<u8>, causing heap allocations on every cmp_tip() call; change it to return a
borrowed slice or fixed-size array to avoid allocations: update vrf_leader()
signature to return &[u8] (or a [u8; N] if the VRF output length is fixed) and
have it delegate to header.header_body.leader_vrf_output as a slice (or convert
to an array reference), then update callers (notably cmp_tip in
crates/amaru-consensus/src/stages/select_chain_new/mod.rs) to accept the
borrowed &[u8] (or array) instead of Vec<u8>.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c5cfcca6-242f-4398-ac61-8c77fb96b6b0

📥 Commits

Reviewing files that changed from the base of the PR and between 7fd3bb0 and f0b8aba.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (58)
  • crates/amaru-consensus/Cargo.toml
  • crates/amaru-consensus/src/effects/consensus_effects.rs
  • crates/amaru-consensus/src/effects/ledger_effects.rs
  • crates/amaru-consensus/src/effects/metrics_effects.rs
  • crates/amaru-consensus/src/effects/mod.rs
  • crates/amaru-consensus/src/lib.rs
  • crates/amaru-consensus/src/stages/adopt_chain/mod.rs
  • crates/amaru-consensus/src/stages/adopt_chain/test_setup.rs
  • crates/amaru-consensus/src/stages/adopt_chain/tests.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/mod.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/test_setup.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/tests.rs
  • crates/amaru-consensus/src/stages/mod.rs
  • crates/amaru-consensus/src/stages/select_chain_new/mod.rs
  • crates/amaru-consensus/src/stages/select_chain_new/test_setup.rs
  • crates/amaru-consensus/src/stages/select_chain_new/tests.rs
  • crates/amaru-consensus/src/stages/track_peers/mod.rs
  • crates/amaru-consensus/src/stages/validate_block.rs
  • crates/amaru-consensus/src/stages/validate_block2/mod.rs
  • crates/amaru-consensus/src/stages/validate_block2/test_setup.rs
  • crates/amaru-consensus/src/stages/validate_block2/tests.rs
  • crates/amaru-kernel/src/cardano/block_header.rs
  • crates/amaru-kernel/src/cardano/block_header/tests.rs
  • crates/amaru-kernel/src/cardano/block_height.rs
  • crates/amaru-kernel/src/cardano/network_block.rs
  • crates/amaru-kernel/src/cardano/slot.rs
  • crates/amaru-kernel/src/lib.rs
  • crates/amaru-ledger/src/block_validator.rs
  • crates/amaru-ledger/src/state.rs
  • crates/amaru-ledger/src/state/volatile_db.rs
  • crates/amaru-network/src/connection.rs
  • crates/amaru-ouroboros-traits/Cargo.toml
  • crates/amaru-ouroboros-traits/src/can_validate_blocks/mock.rs
  • crates/amaru-ouroboros-traits/src/can_validate_blocks/mod.rs
  • crates/amaru-ouroboros-traits/src/connections/connection_provider.rs
  • crates/amaru-ouroboros-traits/src/stores/consensus/mod.rs
  • crates/amaru-protocols/src/blockfetch/initiator.rs
  • crates/amaru-protocols/src/blockfetch/responder.rs
  • crates/amaru-protocols/src/chainsync/initiator.rs
  • crates/amaru-protocols/src/chainsync/responder.rs
  • crates/amaru-protocols/src/connection.rs
  • crates/amaru-protocols/src/handshake/initiator.rs
  • crates/amaru-protocols/src/handshake/responder.rs
  • crates/amaru-protocols/src/keepalive/initiator.rs
  • crates/amaru-protocols/src/keepalive/responder.rs
  • crates/amaru-protocols/src/manager.rs
  • crates/amaru-protocols/src/tx_submission/initiator.rs
  • crates/amaru-protocols/src/tx_submission/responder.rs
  • crates/amaru/src/bin/amaru/cmd/run.rs
  • crates/amaru/src/stages/build_node.rs
  • crates/amaru/src/stages/build_stage_graph.rs
  • crates/amaru/src/tests/in_memory_connection_provider.rs
  • crates/amaru/src/tests/node.rs
  • crates/amaru/src/tests/nodes.rs
  • crates/pure-stage/src/simulation/replay.rs
  • crates/pure-stage/src/simulation/running/mod.rs
  • crates/pure-stage/src/trace_buffer.rs
  • simulation/amaru-sim/tests/replay_test.rs
✅ Files skipped from review due to trivial changes (18)
  • simulation/amaru-sim/tests/replay_test.rs
  • crates/amaru-consensus/src/lib.rs
  • crates/pure-stage/src/simulation/replay.rs
  • crates/amaru-consensus/src/effects/metrics_effects.rs
  • crates/amaru-protocols/src/tx_submission/responder.rs
  • crates/amaru-consensus/Cargo.toml
  • crates/amaru-network/src/connection.rs
  • crates/amaru-kernel/src/cardano/block_height.rs
  • crates/amaru-protocols/src/handshake/initiator.rs
  • crates/amaru-consensus/src/stages/mod.rs
  • crates/amaru-kernel/src/lib.rs
  • crates/amaru-protocols/src/tx_submission/initiator.rs
  • crates/amaru-kernel/src/cardano/block_header/tests.rs
  • crates/amaru-protocols/src/keepalive/responder.rs
  • crates/amaru-consensus/src/stages/validate_block2/tests.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/tests.rs
  • crates/amaru/src/stages/build_stage_graph.rs
  • crates/amaru-consensus/src/effects/mod.rs
🚧 Files skipped from review as they are similar to previous changes (22)
  • crates/amaru/src/tests/in_memory_connection_provider.rs
  • crates/amaru-ouroboros-traits/src/can_validate_blocks/mock.rs
  • crates/amaru-ouroboros-traits/src/connections/connection_provider.rs
  • crates/amaru-consensus/src/effects/consensus_effects.rs
  • crates/amaru-protocols/src/handshake/responder.rs
  • crates/amaru-protocols/src/blockfetch/responder.rs
  • crates/amaru-protocols/src/keepalive/initiator.rs
  • crates/amaru-kernel/src/cardano/network_block.rs
  • crates/amaru-kernel/src/cardano/slot.rs
  • crates/amaru-protocols/src/chainsync/initiator.rs
  • crates/amaru-ledger/src/block_validator.rs
  • crates/amaru-consensus/src/stages/track_peers/mod.rs
  • crates/amaru/src/stages/build_node.rs
  • crates/amaru-protocols/src/chainsync/responder.rs
  • crates/amaru-consensus/src/stages/adopt_chain/mod.rs
  • crates/amaru-consensus/src/stages/adopt_chain/tests.rs
  • crates/amaru-protocols/src/connection.rs
  • crates/amaru-consensus/src/stages/adopt_chain/test_setup.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/test_setup.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/mod.rs
  • crates/amaru-consensus/src/stages/select_chain_new/test_setup.rs
  • crates/amaru-ouroboros-traits/Cargo.toml

Comment on lines +241 to 244
BlockFetchMessage::RequestRange2 { from, through, id, cr } => {
let action = (*proto == State::Idle).then_some(InitiatorAction::RequestRange { from, through });
self.queue.push_back((from, through, Resp::V2(id, cr)));
Ok((action, self))

⚠️ Potential issue | 🔴 Critical

RequestRange2 can get stranded behind the first fetch, mate.

This branch still only emits InitiatorAction::RequestRange from State::Idle, but completion advances with self.queue.get(1) as if two requests may already be in flight. If the new fetch stage queues a second RequestRange2 while the first is busy, that second range just sits there like a side quest and never gets sent. Please restore the 2-deep pipelining rule here and in the protocol-state transition below.

Based on learnings, the blockfetch initiator is meant to keep up to two RequestRange messages in flight via the queue.len() < 2 rule.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-protocols/src/blockfetch/initiator.rs` around lines 241 - 244,
The RequestRange2 arm currently only emits InitiatorAction::RequestRange when
proto == State::Idle, which prevents the second pipelined request from ever
being sent; change the condition to use the 2-deep pipelining rule by emitting
the action when self.queue.len() < 2 (e.g. let action = (self.queue.len() <
2).then_some(InitiatorAction::RequestRange { from, through })), and apply the
same queue.len() < 2 rule in the related protocol-state transition handling
below so the state advances consistently for up to two in-flight RequestRange
messages; update references in the BlockFetchMessage::RequestRange2 match arm
and the corresponding transition logic that inspects self.queue.
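The 2-deep pipelining rule described above can be sketched in isolation. This is a simplified model (ranges as `u64` pairs, no protocol state machine); the point is only that the gate is `queue.len() < 2` measured before the push, not `proto == State::Idle`:

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum Action { RequestRange(u64, u64) }

fn on_request(queue: &mut VecDeque<(u64, u64)>, from: u64, through: u64) -> Option<Action> {
    // queue.len() counts requests already in flight before this one;
    // allow up to two outstanding RequestRange messages.
    let action = (queue.len() < 2).then_some(Action::RequestRange(from, through));
    queue.push_back((from, through));
    action
}

fn main() {
    let mut queue = VecDeque::new();
    assert!(on_request(&mut queue, 1, 10).is_some());  // first request: sent
    assert!(on_request(&mut queue, 11, 20).is_some()); // second: pipelined
    assert!(on_request(&mut queue, 21, 30).is_none()); // third: parked in queue
    println!("ok");
}
```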

Comment on lines +456 to +463
self.messages.iter().map(|m| {
from_slice(m).unwrap_or_else(|_| {
if let Ok((instant, value)) = from_slice::<(Instant, cbor4ii::core::Value)>(m) {
(instant, TraceEntry::InvalidBytes(m.clone(), Some(value)))
} else {
(Instant::now(), TraceEntry::InvalidBytes(m.clone(), None))
}
})

⚠️ Potential issue | 🟡 Minor

Instant::now() makes corrupted trace rehydration nondeterministic.

Line 461 stamps fully undecodable records with the current time, so two calls to hydrate() on the same buffer can yield different values for the same stored bytes. That’s a rough debugging gremlin; a fixed sentinel instant would keep the fallback stable.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/pure-stage/src/trace_buffer.rs` around lines 456 - 463, The code uses
Instant::now() as a fallback timestamp when rehydration fails, making successive
hydrate() calls return different timestamps for the same corrupted bytes;
replace that ad-hoc Instant::now() with a single-process sentinel Instant
created once at startup (e.g., a static SENTINEL_INSTANT initialized via
once_cell::sync::Lazy or lazy_static) and use SENTINEL_INSTANT in the from_slice
fallback that constructs TraceEntry::InvalidBytes so repeated calls over the
same buffer are deterministic; update the closure in the mapping (the
from_slice(...) .unwrap_or_else branch) to return (SENTINEL_INSTANT,
TraceEntry::InvalidBytes(...)) instead of (Instant::now(), ...).
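A sentinel can also be built with `std::sync::OnceLock` instead of `once_cell`, which avoids a new dependency. Sketch under that assumption: one `Instant` is captured the first time the fallback fires and reused for every later corrupted record, so repeated `hydrate()` calls stay deterministic within the process:

```rust
use std::sync::OnceLock;
use std::time::Instant;

static SENTINEL_INSTANT: OnceLock<Instant> = OnceLock::new();

// Returns the same Instant on every call, unlike Instant::now().
fn sentinel() -> Instant {
    *SENTINEL_INSTANT.get_or_init(Instant::now)
}

fn main() {
    let a = sentinel();
    std::thread::sleep(std::time::Duration::from_millis(5));
    let b = sentinel();
    assert_eq!(a, b); // stable fallback timestamp for InvalidBytes entries
    println!("ok");
}
```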

rkuhn added 2 commits March 19, 2026 19:56
Signed-off-by: Roland Kuhn <rk@rkuhn.info>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/amaru-consensus/src/stages/track_peers/mod.rs (1)

288-295: ⚠️ Potential issue | 🔴 Critical

Use load_tip() here so origin intersections actually work.

For current == Point::Origin, load_header(ORIGIN_HASH) will always miss, so a perfectly valid origin intersect gets treated like an unknown point and chainsync bails. The store already has the origin fast-path in load_tip(), so this block should lean on that instead. Based on learnings, when bootstrapping a node in Amaru, it's important to handle the case where the tip is Origin.

🛠️ Patch idea
-                let Some(header) = Store::new(eff.clone()).load_header(&current.hash()) else {
+                let store = Store::new(eff.clone());
+                let Some(current_tip) = store.load_tip(&current.hash()) else {
                     tracing::warn!(%peer, %current, tip = %tip.point(), reason = "peer sent unknown intersection point", "stopping chainsync");
                     eff.send(&handler, chainsync::InitiatorMessage::Done).await;
                     return;
                 };
-                tracing::info!(%peer, %current, highest = %tip.point(), "intersect found");
-                let current = Tip::new(current, header.block_height());
-                self.upstream.insert(peer, PerPeer { current, highest: tip });
+                tracing::info!(%peer, current = %current_tip.point(), highest = %tip.point(), "intersect found");
+                self.upstream.insert(peer, PerPeer { current: current_tip, highest: tip });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-consensus/src/stages/track_peers/mod.rs` around lines 288 - 295,
The code treats an origin intersection as unknown because it calls
Store::new(eff.clone()).load_header(&current.hash()), which always misses for
Point::Origin; change this to use the store's origin-aware fast-path by calling
load_tip() (or otherwise using Store::new(...).load_tip()/the store method that
returns the origin tip) instead of load_header when resolving current so origin
intersections succeed; ensure you still construct the Tip via Tip::new(current,
header.block_height()) (or using the block height returned by load_tip), then
insert into self.upstream as before (PerPeer { current, highest: tip }) and keep
the existing trace/log and chainsync::InitiatorMessage::Done handling unchanged.
♻️ Duplicate comments (1)
crates/amaru-consensus/src/stages/fetch_blocks/mod.rs (1)

106-111: ⚠️ Potential issue | 🟠 Major

Invalidate the old fetch state before taking the fast path.

If an earlier range request is still live, this branch leaves its timeout and cursor armed like a stray side quest. A later Timeout(old_req_id) can still replay FetchNextFrom(self.from), and late blocks can still be matched against the stale window. Cancel/reset the in-flight request before returning.

🛠️ Patch idea
         if missing.is_empty() {
+            self.req_id += 1;
+            self.current = Point::Origin;
+            self.from = Point::Origin;
+            self.through = Point::Origin;
+            if let Some(timeout) = self.timeout.take() {
+                store.eff().cancel_schedule(timeout).await;
+            }
             tracing::info!(tip = %tip.point(), parent = %parent, "no blocks to fetch, sending rollback");
             // send rollback
             store.eff().send(&self.downstream, (tip, parent)).await;
             store.eff().send(&self.upstream, SelectChainMsg::FetchNextFrom(tip.point())).await;
             return;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-consensus/src/stages/fetch_blocks/mod.rs` around lines 106 -
111, When taking the fast path in the missing.is_empty() branch, first
cancel/reset any in-flight range request state to avoid late timeouts or block
matches against a stale window: clear whatever fields track the active range
fetch (e.g., pending request id / timeout token and the cursor/window bounds)
before calling store.eff().send(&self.downstream, (tip, parent)) and before
sending SelectChainMsg::FetchNextFrom; ensure the code that handles
Timeout(old_req_id) will ignore previously cleared request ids so late Timeouts
no longer replay FetchNextFrom(self.from).
🧹 Nitpick comments (1)
crates/amaru-ouroboros-traits/src/can_validate_blocks/mod.rs (1)

34-40: Please pin down what contains_point() and tip() actually mean.

Right now they read like whole-ledger/latest-tip queries, but the ledger side uses them for volatile membership and the immutable rollback boundary. A brief doc comment here would save the next poor soul from wiring the wrong ancestry logic — very choose the wrong bonfire energy otherwise.

✍️ Possible doc tweak
-    fn contains_point(&self, point: &Point) -> bool;
+    /// Returns true only when the point is present in the volatile DB view.
+    fn contains_point(&self, point: &Point) -> bool;
 
-    fn tip(&self) -> Point;
+    /// Oldest rollback boundary / immutable lower bound of the ledger view.
+    fn tip(&self) -> Point;

Based on learnings: contains_point() should not be relied on below ledger_tip, and self.tip() returns the stable/immutable lower bound, not the newest tip.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-ouroboros-traits/src/can_validate_blocks/mod.rs` around lines 34
- 40, The doc comments for contains_point and tip are ambiguous and lead to
misuse; update the comments for contains_point(&self, point: &Point) -> bool to
state it only reports membership within the ledger view up to the current
volatile boundary (i.e., should not be relied on for points below ledger_tip /
stable storage), and update tip(&self) -> Point to clarify it returns the
stable/immutable lower-bound tip (the applied ledger tip), not the
newest/volatile chain tip—also cross-reference volatile_tip(&self) ->
Option<Tip> in the comments so callers know to use volatile_tip for
in-memory/latest tip semantics when they need the most recent volatile view.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/amaru-consensus/src/stages/track_peers/defer_req_next.rs`:
- Around line 76-79: The defer gate currently uses a strict comparison
(ledger_height > min_h) which defers peers one block too long; change the
condition in the loop over state.pending (the tuple (peer, handler, min_h)) to
re-arm when ledger_height >= min_h so that a peer registered for min_h is
resumed as soon as the ledger reaches that height (i.e., replace the strict
greater-than check with a greater-than-or-equal check and keep sending
InitiatorMessage::RequestNext via eff.send(&handler, ...)).

In `@crates/amaru-consensus/src/stages/validate_block2/mod.rs`:
- Around line 54-56: The current check kills the stage whenever msg.parent ==
Point::Origin, which aborts bootstrapping when validating the first block;
change the logic in the validate block (the branch that logs with
tracing::error! and calls eff.terminate().await) so it only treats a
parent-of-origin as fatal when the validator is already past origin (i.e.,
state.current is not Point::Origin or otherwise indicates we've progressed past
genesis). In practice update the conditional around msg.parent == Point::Origin
to also check state.current (e.g., require state.current != Point::Origin or
state.current > Point::Origin) before logging the error and terminating, leaving
the path that validates a block whose parent is Origin when we are
bootstrapping.

In `@crates/amaru-consensus/src/stages/validate_block2/test_setup.rs`:
- Around line 118-125: The mock ledger conflates the immutable cutoff with the
applied head by using `tip` as the newest applied point and hardcoding
`volatile_tip()` to None; update the test mock to model the immutable lower
bound separately from the applied head so `roll_back_to_ancestor()` sees
realistic ledger shape: add a new field (e.g., `immutable_cutoff: Point` or
`stable_tip`) alongside `tip`, change the mock implementation of `ledger.tip()`
to return that immutable cutoff, and make `volatile_tip()`/applied-head return
the current `tip` (or Some(current_applied_point)); adjust usages and any tests
that set `tip`, `rollback_fails`, `validate_fails`, or `ledger_fails` so they
update the applied head vs immutable cutoff appropriately (references:
`roll_back_to_ancestor()`, `ledger.tip()`, `volatile_tip()`, and the fields
`tip`, `rollback_fails`, `validate_fails`, `ledger_fails`).

In `@crates/amaru-ouroboros-traits/src/stores/consensus/mod.rs`:
- Around line 138-149: The traversal seeds `to_visit` with the tip itself
(vec![*hash]) for non-origin hashes, causing the tip to appear as its own child;
change the seeding so traversal always starts from the children instead. Replace
the conditional that sets `to_visit` (currently using ORIGIN_HASH and
vec![*hash]) so it calls `self.get_children(hash)` for the non-origin case (or
simply use `self.get_children(hash)` unconditionally), ensuring the iterator
created in the closure (the from_fn loop that calls
`self.load_header_with_validity`, `self.get_children`, and returns
`header.tip()`) begins from children rather than the hash itself.

In `@examples/shared/src/lib.rs`:
- Around line 121-123: The code reads block.header.header_body.block_number
after passing block by value to a consuming function, causing a use-after-move;
fix by extracting/stashing the block number into a local (e.g. let block_number
= block.header.header_body.block_number.into()) before the call to
rules::validate_block or any other consuming use, then construct the Tip with
Tip::new(point, block_number) and only after that pass or move block into the
consuming call or state.forward as needed.
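The use-after-move fix is the standard Rust pattern of copying the needed field into a local before the value is consumed. A minimal illustration with hypothetical types (the real call is `rules::validate_block` on the actual block type):

```rust
struct Header { block_number: u64 }
struct Block { header: Header }

fn validate_block(_block: Block) {} // consumes the block by value

fn main() {
    let block = Block { header: Header { block_number: 7 } };
    let block_number = block.header.block_number; // stash before the move
    validate_block(block); // block moved here
    // Reading block.header.block_number now would not compile;
    // the local copy is still available to build the Tip.
    assert_eq!(block_number, 7);
    println!("ok");
}
```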

---

Outside diff comments:
In `@crates/amaru-consensus/src/stages/track_peers/mod.rs`:
- Around line 288-295: The code treats an origin intersection as unknown because
it calls Store::new(eff.clone()).load_header(&current.hash()), which always
misses for Point::Origin; change this to use the store's origin-aware fast-path
by calling load_tip() (or otherwise using Store::new(...).load_tip()/the store
method that returns the origin tip) instead of load_header when resolving
current so origin intersections succeed; ensure you still construct the Tip via
Tip::new(current, header.block_height()) (or using the block height returned by
load_tip), then insert into self.upstream as before (PerPeer { current, highest:
tip }) and keep the existing trace/log and chainsync::InitiatorMessage::Done
handling unchanged.

---

Duplicate comments:
In `@crates/amaru-consensus/src/stages/fetch_blocks/mod.rs`:
- Around line 106-111: When taking the fast path in the missing.is_empty()
branch, first cancel/reset any in-flight range request state to avoid late
timeouts or block matches against a stale window: clear whatever fields track
the active range fetch (e.g., pending request id / timeout token and the
cursor/window bounds) before calling store.eff().send(&self.downstream, (tip,
parent)) and before sending SelectChainMsg::FetchNextFrom; ensure the code that
handles Timeout(old_req_id) will ignore previously cleared request ids so late
Timeouts no longer replay FetchNextFrom(self.from).

---

Nitpick comments:
In `@crates/amaru-ouroboros-traits/src/can_validate_blocks/mod.rs`:
- Around line 34-40: The doc comments for contains_point and tip are ambiguous
and lead to misuse; update the comments for contains_point(&self, point: &Point)
-> bool to state it only reports membership within the ledger view up to the
current volatile boundary (i.e., should not be relied on for points below
ledger_tip / stable storage), and update tip(&self) -> Point to clarify it
returns the stable/immutable lower-bound tip (the applied ledger tip), not the
newest/volatile chain tip—also cross-reference volatile_tip(&self) ->
Option<Tip> in the comments so callers know to use volatile_tip for
in-memory/latest tip semantics when they need the most recent volatile view.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5d8bbc6c-b84d-4219-84ad-8190819001bf

📥 Commits

Reviewing files that changed from the base of the PR and between f0b8aba and abc4ea0.

⛔ Files ignored due to path filters (1)
  • examples/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (23)
  • crates/amaru-consensus/src/effects/consensus_effects.rs
  • crates/amaru-consensus/src/effects/ledger_effects.rs
  • crates/amaru-consensus/src/effects/mod.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/mod.rs
  • crates/amaru-consensus/src/stages/select_chain_new/mod.rs
  • crates/amaru-consensus/src/stages/select_chain_new/tests.rs
  • crates/amaru-consensus/src/stages/track_peers/defer_req_next.rs
  • crates/amaru-consensus/src/stages/track_peers/mod.rs
  • crates/amaru-consensus/src/stages/track_peers/test_setup.rs
  • crates/amaru-consensus/src/stages/track_peers/tests.rs
  • crates/amaru-consensus/src/stages/validate_block2/mod.rs
  • crates/amaru-consensus/src/stages/validate_block2/test_setup.rs
  • crates/amaru-consensus/src/stages/validate_block2/tests.rs
  • crates/amaru-ledger/src/block_validator.rs
  • crates/amaru-ledger/src/state.rs
  • crates/amaru-ledger/src/state/volatile_db.rs
  • crates/amaru-ouroboros-traits/src/can_validate_blocks/mock.rs
  • crates/amaru-ouroboros-traits/src/can_validate_blocks/mod.rs
  • crates/amaru-ouroboros-traits/src/stores/consensus/mod.rs
  • crates/amaru/src/stages/build_node.rs
  • crates/amaru/src/stages/build_stage_graph.rs
  • crates/amaru/src/stages/config.rs
  • examples/shared/src/lib.rs
✅ Files skipped from review due to trivial changes (2)
  • crates/amaru/src/stages/config.rs
  • crates/amaru-consensus/src/stages/validate_block2/tests.rs
🚧 Files skipped from review as they are similar to previous changes (6)
  • crates/amaru-ouroboros-traits/src/can_validate_blocks/mock.rs
  • crates/amaru-consensus/src/effects/consensus_effects.rs
  • crates/amaru-consensus/src/effects/mod.rs
  • crates/amaru-ledger/src/block_validator.rs
  • crates/amaru-consensus/src/stages/select_chain_new/mod.rs
  • crates/amaru/src/stages/build_node.rs

Comment on lines +76 to +79
for (peer, handler, min_h) in std::mem::take(&mut state.pending) {
if ledger_height > min_h {
eff.send(&handler, InitiatorMessage::RequestNext).await;
} else {

⚠️ Potential issue | 🟠 Major

This defer gate is one block too strict.

min_ledger_height sounds like the first height that should re-arm the peer, but the strict > waits for one extra block. If the ledger catches up exactly to the registered height, the peer stays parked until some unrelated progress happens, which can stall the pipeline on a single upstream — a proper “stuck at the boss door” moment.

🎯 Suggested tweak
-        if ledger_height > min_h {
+        if ledger_height >= min_h {
             eff.send(&handler, InitiatorMessage::RequestNext).await;
         } else {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-consensus/src/stages/track_peers/defer_req_next.rs` around lines
76 - 79, The defer gate currently uses a strict comparison (ledger_height >
min_h) which defers peers one block too long; change the condition in the loop
over state.pending (the tuple (peer, handler, min_h)) to re-arm when
ledger_height >= min_h so that a peer registered for min_h is resumed as soon as
the ledger reaches that height (i.e., replace the strict greater-than check with
a greater-than-or-equal check and keep sending InitiatorMessage::RequestNext via
eff.send(&handler, ...)).

Comment on lines +118 to +125
/// Current ledger tip.
tip: Point,
/// If set, rollback_block will return this error.
rollback_fails: bool,
/// If set, roll_forward_block will return Ok(Err(...)) for these points.
validate_fails: BTreeSet<Point>,
/// If set, roll_forward_block will return Err(...) for these points.
ledger_fails: BTreeSet<Point>,

⚠️ Potential issue | 🟠 Major

Model the immutable cutoff separately from the applied head.

roll_back_to_ancestor() uses ledger.tip() as the immutable boundary. This mock updates tip to the newest applied point and hardcodes volatile_tip() to None, so the new rollback tests are exercising a ledger shape production never has. Based on learnings, in crates/amaru-ledger/src/state.rs, self.tip() returns the oldest known point — the stable/immutable lower bound of the chain.

Also applies to: 162-165, 170-207

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-consensus/src/stages/validate_block2/test_setup.rs` around lines
118 - 125, The mock ledger conflates the immutable cutoff with the applied head
by using `tip` as the newest applied point and hardcoding `volatile_tip()` to
None; update the test mock to model the immutable lower bound separately from
the applied head so `roll_back_to_ancestor()` sees realistic ledger shape: add a
new field (e.g., `immutable_cutoff: Point` or `stable_tip`) alongside `tip`,
change the mock implementation of `ledger.tip()` to return that immutable
cutoff, and make `volatile_tip()`/applied-head return the current `tip` (or
Some(current_applied_point)); adjust usages and any tests that set `tip`,
`rollback_fails`, `validate_fails`, or `ledger_fails` so they update the applied
head vs immutable cutoff appropriately (references: `roll_back_to_ancestor()`,
`ledger.tip()`, `volatile_tip()`, and the fields `tip`, `rollback_fails`,
`validate_fails`, `ledger_fails`).

Comment on lines +138 to +149
let mut to_visit = if hash == &ORIGIN_HASH { self.get_children(hash) } else { vec![*hash] };
Box::new(iter::from_fn(move || {
loop {
let hash = to_visit.pop()?;
tracing::debug!(hash = %hash, "visiting child");
let (header, validity) = self.load_header_with_validity(&hash)?;
if validity == Some(false) {
continue;
}
let children = self.get_children(&hash);
to_visit.extend(children);
return Some(header.tip());

⚠️ Potential issue | 🟠 Major

Start the traversal from the children, not from hash itself.

crates/amaru/src/stages/build_node.rs folds child_tips(get_best_chain_hash()) into its candidate set. Seeding to_visit with vec![*hash] makes the current best-chain tip show up as its own child, so the empty-descendant case turns into a wee ghost candidate and boot logic can re-pick the already-selected tip.

🛠️ Patch idea
-        let mut to_visit = if hash == &ORIGIN_HASH { self.get_children(hash) } else { vec![*hash] };
+        let mut to_visit = self.get_children(hash);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-ouroboros-traits/src/stores/consensus/mod.rs` around lines 138 -
149, The traversal seeds `to_visit` with the tip itself (vec![*hash]) for
non-origin hashes, causing the tip to appear as its own child; change the
seeding so traversal always starts from the children instead. Replace the
conditional that sets `to_visit` (currently using ORIGIN_HASH and vec![*hash])
so it calls `self.get_children(hash)` for the non-origin case (or simply use
`self.get_children(hash)` unconditionally), ensuring the iterator created in the
closure (the from_fn loop that calls `self.load_header_with_validity`,
`self.get_children`, and returns `header.tip()`) begins from children rather
than the hash itself.
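For illustration, the fix can be exercised outside the store with a toy children map. Everything below (`child_tips`, string hashes, the `HashMap`-backed store) is a stand-in for demonstration, not the real amaru-ouroboros-traits API:

```rust
use std::collections::HashMap;

// Toy stand-in for the consensus store: parent hash -> child hashes.
fn child_tips(
    children: &HashMap<&'static str, Vec<&'static str>>,
    start: &'static str,
) -> Vec<&'static str> {
    // Seed the traversal with the *children* of `start`, never `start` itself,
    // so the current tip cannot reappear as its own descendant.
    let mut to_visit = children.get(start).cloned().unwrap_or_default();
    let mut tips = Vec::new();
    while let Some(hash) = to_visit.pop() {
        let kids = children.get(hash).cloned().unwrap_or_default();
        to_visit.extend(kids);
        tips.push(hash); // the real code would return header.tip() here
    }
    tips
}

fn main() {
    let mut children = HashMap::new();
    children.insert("tip", vec!["a", "b"]);
    children.insert("a", vec!["c"]);
    let tips = child_tips(&children, "tip");
    // "tip" itself is not among its own descendants.
    assert!(!tips.contains(&"tip"));
    assert_eq!(tips.len(), 3); // a, b, c in some order
}
```

With the original `vec![*hash]` seed, `"tip"` would land in the result and the empty-descendant case would never be distinguishable from "tip is its own candidate".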

Signed-off-by: Roland Kuhn <rk@rkuhn.info>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
crates/amaru/src/tests/nodes.rs (2)

143-156: ⚠️ Potential issue | 🟠 Major

Drain timeout is logged as success

Line 143 lowers the cap, but when the loop actually exhausts max_drain_steps, Line 155 logs “completed” like it’s a win. That’s a sneaky false positive, mate — it hides unfinished propagation.

Please log this as timeout/exhaustion (or fail the test path) instead of successful completion.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru/src/tests/nodes.rs` around lines 143 - 156, The drain loop uses
max_drain_steps and currently unconditionally logs success after the for loop;
change the end-of-loop behavior so that if the loop exits due to reaching
max_drain_steps (i.e., the for completed without early return from
pick_random_runnable_node) it is treated as a timeout/failure rather than a
success: update the post-loop tracing::info("Drain phase completed...") to emit
an error/timeout message (or panic/assert to fail the test) and include context
like max_drain_steps and remaining runnable state; reference the symbols
max_drain_steps, pick_random_runnable_node, and run_effect to locate the loop
and implement the new timeout logging/failure behavior.

108-131: ⚠️ Potential issue | 🟠 Major

Phase 1 can now silently truncate work when the budget is hit

Line 108 drops the budget to 10_000, but if that limit is reached before has_pending_actions() is empty, we still move to drain() and never enqueue again. That can abandon queued actions and give you a false “all good” run.

Suggested guard (fail fast when cap is reached)
 pub fn run(&mut self, rng: &mut RandStdRng) {
     let max_steps = 10_000; // safety limit
+    let mut all_actions_consumed = false;

     // Phase 1: Run with action enqueueing until all actions consumed
     for step in 0..max_steps {
         for node in self.nodes.iter_mut() {
             node.enqueue_pending_action();
             node.advance_inputs();
         }

         if self.nodes.iter().all(|n| !n.has_pending_actions()) {
             tracing::info!("All actions consumed at step {step}, entering drain phase");
+            all_actions_consumed = true;
             break;
         }

         let Some(node) = self.pick_random_active_node(rng) else {
             tracing::info!("All nodes terminated at step {step}");
             return;
         };
         node.run_effect();
     }

+    if !all_actions_consumed {
+        tracing::warn!("Phase 1 hit max_steps={max_steps} with pending actions still queued");
+        return;
+    }
+
     // Phase 2: Drain remaining effects
     self.drain(rng);
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru/src/tests/nodes.rs` around lines 108 - 131, The loop capped by
max_steps can silently stop before all actions are consumed and then call
drain(), losing enqueued actions; after the Phase 1 for-loop (which uses
enqueue_pending_action, advance_inputs, has_pending_actions,
pick_random_active_node, run_effect) add a guard that checks if any node still
has_pending_actions() and, if so, fail fast (panic or assert) with a clear
message including the reached max_steps/step so the test fails instead of
silently truncating work before calling drain().
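The two-phase shape with a fail-fast guard can be sketched in isolation; `Node`, `run_phase1`, and the budget numbers below are illustrative stand-ins for the simulator types, not the actual test harness:

```rust
// Toy two-phase scheduler: Phase 1 must either consume all actions or
// report budget exhaustion, instead of sliding silently into drain.
struct Node {
    pending: usize,
}

impl Node {
    fn has_pending_actions(&self) -> bool {
        self.pending > 0
    }
    fn run_effect(&mut self) {
        self.pending = self.pending.saturating_sub(1);
    }
}

/// Returns Ok(steps_used), or Err if the budget was hit with work remaining.
fn run_phase1(nodes: &mut [Node], max_steps: usize) -> Result<usize, &'static str> {
    for step in 0..max_steps {
        if nodes.iter().all(|n| !n.has_pending_actions()) {
            return Ok(step); // safe to enter the drain phase
        }
        if let Some(n) = nodes.iter_mut().find(|n| n.has_pending_actions()) {
            n.run_effect();
        }
    }
    // Budget hit with actions still queued: surface it, don't call drain.
    if nodes.iter().any(|n| n.has_pending_actions()) {
        return Err("budget exhausted with pending actions");
    }
    Ok(max_steps)
}

fn main() {
    let mut ok = [Node { pending: 2 }, Node { pending: 1 }];
    assert_eq!(run_phase1(&mut ok, 10), Ok(3));
    let mut stuck = [Node { pending: 100 }];
    assert!(run_phase1(&mut stuck, 5).is_err());
}
```

In the real test a `panic!`/`assert!` on the `Err` path would make the run fail loudly rather than log a false success.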
🧹 Nitpick comments (3)
crates/amaru-consensus/src/stages/fetch_blocks/tests.rs (2)

187-227: Test name could be clearer than test_block2_received.

Minor naming nit: this reads like protocol Block2 rather than “second block received”. Renaming would reduce head-scratching later.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-consensus/src/stages/fetch_blocks/tests.rs` around lines 187 -
227, Rename the test function test_block2_received to a clearer name (e.g.,
test_received_second_block or test_handles_second_block_received) to avoid
confusion with a protocol token; update the fn declaration for
test_block2_received and any references to that symbol (such as calls or logs
that mention the test name) so they match the new identifier, keeping all test
logic, assertions (assert_trace, logs.assert_and_remove), and uses of helpers
like setup, prep, FetchBlocksMsg, and SelectChainMsg unchanged.

55-64: Consider extracting shared NewTip setup into a tiny helper.

Both tests build the same tip/parent/message shape; a small helper would trim repetition and keep future tweaks in one place.

♻️ Tiny refactor sketch
+fn new_tip_msg(prep: &TestPrep) -> (Tip, Point, FetchBlocksMsg) {
+    let tip = prep.headers.h2.tip();
+    let parent = prep.headers.h1.point();
+    let msg = FetchBlocksMsg::NewTip(tip, parent);
+    (tip, parent, msg)
+}

Also applies to: 91-99

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-consensus/src/stages/fetch_blocks/tests.rs` around lines 55 -
64, Extract the repeated NewTip setup into a small helper that takes the test
fixture (e.g., test_prep or &Prep/TestPrep) and returns the tip, parent, or the
FetchBlocksMsg::NewTip directly; locate the duplicated code around test_prep(),
prep.store_headers(...), prep.store_block(...), prep.set_anchor(...),
prep.headers.h2.tip(), prep.headers.h1.point(), and FetchBlocksMsg::NewTip and
replace both occurrences with calls to this helper so the message construction
is centralized and the tests only call something like make_new_tip_msg(&prep) or
make_tip_and_parent(&prep).
simulation/amaru-sim/tests/simulation_test.rs (1)

23-23: Give the ignored test a proper backstory.

Right now, that #[ignore] is giving me major "unfinished side quest" vibes. Let's throw in a reason string so future you doesn't have to squint at git blame like you're deciphering an ancient map.

Your codebase already does this in at least one spot (chainsync tests), so you're in good company. Plus, given the team's plans to replace this out-of-process testing with the in-process NodeHandle eventually, a little inline note keeps the context from evaporating.

♻️ Suggested tweak
-#[ignore]
+#[ignore = "temporarily disabled while simulator bootstrap/restart path is stabilized"]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@simulation/amaru-sim/tests/simulation_test.rs` at line 23, Replace the bare
#[ignore] on the test in simulation_test.rs with a reason string (e.g. #[ignore
= "out-of-process test; will be replaced by in-process NodeHandle later"]) so
future readers know why the test is skipped; locate the test annotated with
#[ignore] and update the attribute to include a short explanatory message
mentioning the out-of-process nature and planned NodeHandle replacement.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6831bb04-2284-4f7d-86cc-2a308b9e8116

📥 Commits

Reviewing files that changed from the base of the PR and between abc4ea0 and e11c168.

📒 Files selected for processing (6)
  • crates/amaru-consensus/src/stages/fetch_blocks/mod.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/tests.rs
  • crates/amaru/src/stages/build_node.rs
  • crates/amaru/src/tests/nodes.rs
  • simulation/amaru-sim/src/simulator/args.rs
  • simulation/amaru-sim/tests/simulation_test.rs
✅ Files skipped from review due to trivial changes (1)
  • simulation/amaru-sim/src/simulator/args.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • crates/amaru/src/stages/build_node.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/mod.rs

 /// Pick a random non-terminated node with runnable effects
 fn pick_random_runnable_node(&mut self, rng: &mut RandStdRng) -> Option<&mut Node> {
-    self.pick_random_node(rng, |n| !n.is_terminated() && n.has_runnable_effects())
+    self.pick_random_node(rng, |n| !n.is_terminated() && (n.has_runnable_effects() || n.has_effects()))

⚠️ Potential issue | 🟠 Major

has_effects() in runnable selection can cause no-progress spins

On Line 177, including n.has_effects() means we can schedule nodes that have queued effects but nothing runnable right now. In drain mode, that can waste the entire budget without actual progress (very “stuck in a side quest” vibe).

Suggested predicate fix
 fn pick_random_runnable_node(&mut self, rng: &mut RandStdRng) -> Option<&mut Node> {
-    self.pick_random_node(rng, |n| !n.is_terminated() && (n.has_runnable_effects() || n.has_effects()))
+    self.pick_random_node(rng, |n| !n.is_terminated() && n.has_runnable_effects())
 }

Based on learnings: In crates/amaru/src/tests/nodes.rs, the drain loop is intentionally designed to exit when pick_random_runnable_node returns None to avoid non-terminating simulation from keep-alives/sleeping nodes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru/src/tests/nodes.rs` at line 177, The current random selection
uses self.pick_random_node(rng, |n| !n.is_terminated() &&
(n.has_runnable_effects() || n.has_effects())), which can pick nodes with only
queued/non-runnable effects and cause drain-mode spins; change the predicate to
require runnable work only — e.g., call self.pick_random_node(rng, |n|
!n.is_terminated() && n.has_runnable_effects()) (or replace with an existing
pick_random_runnable_node helper) so we never schedule nodes that only have
non-runnable has_effects().

Signed-off-by: Roland Kuhn <rk@rkuhn.info>
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3cf62f87-9f4c-4927-a6f0-290ea69a1ad5

📥 Commits

Reviewing files that changed from the base of the PR and between e11c168 and 9cf1b1e.

📒 Files selected for processing (6)
  • crates/amaru-consensus/src/stages/fetch_blocks/mod.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/test_setup.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/tests.rs
  • crates/amaru-kernel/src/cardano/block.rs
  • crates/amaru-kernel/src/cardano/block_header/tests.rs
  • crates/amaru-kernel/src/cardano/network_block/tests.rs
✅ Files skipped from review due to trivial changes (2)
  • crates/amaru-consensus/src/stages/fetch_blocks/tests.rs
  • crates/amaru-consensus/src/stages/fetch_blocks/test_setup.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • crates/amaru-kernel/src/cardano/block.rs
  • crates/amaru-kernel/src/cardano/block_header/tests.rs

tracing::debug!(tip = %tip.point(), parent = %parent, "fetching blocks");
let store = Store::new(eff);
// find blocks to retrieve
debug_assert!(self.missing.is_empty(), "missing blocks should be empty when starting a new tip");

🛠️ Refactor suggestion | 🟠 Major

Make the single in-flight fetch a real runtime invariant.

Line 67 only checks this in debug builds, but the rest of new_tip() rewrites missing, req_id, and timeout as if no older fetch can still be live. If a second NewTip ever lands here, the old timer/replies can bleed into the new session like a dodgy sequel. I’d make this a real guard/cancel path in production instead of trusting debug_assert!.

Also applies to: 107-116

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-consensus/src/stages/fetch_blocks/mod.rs` at line 67, new_tip()
currently only uses debug_assert! to assume no in-flight fetch; make this a real
runtime guard by checking runtime state (e.g., if self.missing is not empty or
self.req_id.is_some() or self.timeout.is_some()) and explicitly cancel/teardown
the previous fetch before starting a new one: take and cancel the existing
timeout (self.timeout.take()), clear/reset req_id (self.req_id.take()), and
clear any pending reply/state (self.missing.clear() or equivalent) so no old
timer/replies can bleed into the new session; apply the same explicit
cancel/reset logic to the similar block around lines 107-116 to ensure single
in-flight fetch is enforced in production.

Comment on lines +206 to +218
match msg {
    // completely ignore empty responses, fetch stage will deal with timeouts
    Blocks2::NoBlocks(_) => (curr_id, fetch),
    // ignore responses to prior requests
    Blocks2::Block(id, _) if id < curr_id => (curr_id, fetch),
    Blocks2::Block(id, block) => {
        eff.send(&fetch, FetchBlocksMsg::Block(block)).await;
        // getting higher id implies a new request has started
        (id.max(curr_id), fetch)
    }
    // getting done message implies a new request will start with id+1, but Done might be old as well
    Blocks2::Done(id) => ((id + 1).max(curr_id), fetch),
}

⚠️ Potential issue | 🟠 Major

Don't let one peer's Done(id) retire the whole request.

crates/amaru-protocols/src/manager.rs:308-321 fans FetchBlocks2 out to every connected peer, so the first Done(id) here only means that peer is finished. Bumping curr_id on Line 217 drops later Block(id, ..) replies from slower peers, even when FetchBlocks is still waiting because the fast peer’s earlier block was rejected by the body/parent checks. That turns a recoverable mixed-peer response into a forced timeout. I’d only retire id once FetchBlocks itself moves on, or carry the id through FetchBlocksMsg::Block so the fetch stage can make that call.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/amaru-consensus/src/stages/fetch_blocks/mod.rs` around lines 206 -
218, The Done(id) handling in this match (Blocks2::Done) prematurely advances
curr_id, which can drop later Blocks2::Block responses from slower peers and
force timeouts; instead stop advancing curr_id on Done here — either don't
update curr_id in the Blocks2::Done arm and let FetchBlocks itself retire
requests, or propagate the responder's id with the block by changing eff.send to
include the peer's id (e.g. send FetchBlocksMsg::Block with the source id) so
the FetchBlocks stage can decide when to retire an id; update the Blocks2::Done
arm to leave curr_id unchanged and implement the corresponding change in
FetchBlocksMsg::Block/FetchBlocks to handle per-peer Done semantics.

@rkuhn rkuhn force-pushed the rk/fetch-blocks branch from 26bb505 to a7975c8 Compare March 23, 2026 12:40
