From 695b8f00e989811fc24c3dd3885579d37bd06bcc Mon Sep 17 00:00:00 2001 From: Adam Weil Date: Mon, 2 Feb 2026 18:54:52 -0500 Subject: [PATCH 01/10] feat(rpc): WIP implement getSlot --- src/cmd.zig | 18 ++++++++ src/consensus/vote_listener.zig | 12 +++++- src/replay/consensus/cluster_sync.zig | 21 +++++----- src/replay/consensus/core.zig | 40 ++++++++---------- src/replay/consensus/process_result.zig | 6 +-- src/replay/service.zig | 20 +++++---- src/replay/trackers.zig | 39 ++++++++++++++++-- src/rpc/methods.zig | 55 +++++++++++++++++++++++++ 8 files changed, 164 insertions(+), 47 deletions(-) diff --git a/src/cmd.zig b/src/cmd.zig index 696de69435..8944dd59d0 100644 --- a/src/cmd.zig +++ b/src/cmd.zig @@ -1413,6 +1413,9 @@ fn validator( else null; + var latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + var replay_service_state: ReplayAndConsensusServiceState = try .init(allocator, .{ .app_base = &app_base, .account_store = .{ .accounts_db_two = &new_db }, @@ -1424,9 +1427,15 @@ fn validator( .voting_enabled = voting_enabled, .vote_account_address = maybe_vote_pubkey, .stop_at_slot = cfg.stop_at_slot, + .latest_processed_slot = &latest_processed_slot, + .latest_confirmed_slot = &latest_confirmed_slot, }); defer replay_service_state.deinit(allocator); + try app_base.rpc_hooks.set(allocator, sig.rpc.methods.SlotHookContext{ + .slot_tracker = &replay_service_state.replay_state.slot_tracker, + }); + const replay_thread = try replay_service_state.spawnService( &app_base, if (maybe_vote_sockets) |*vs| vs else null, @@ -1590,6 +1599,9 @@ fn replayOffline( ); defer epoch_tracker.deinit(allocator); + var latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + var replay_service_state: ReplayAndConsensusServiceState = try .init(allocator, .{ .app_base = &app_base, .account_store = .{ .accounts_db_two = &new_db }, @@ -1601,6 +1613,8 @@ fn replayOffline( .voting_enabled = false, .vote_account_address = null, .stop_at_slot = cfg.stop_at_slot, + .latest_processed_slot = &latest_processed_slot, + .latest_confirmed_slot = &latest_confirmed_slot, }); defer replay_service_state.deinit(allocator); @@ -2307,6 +2321,8 @@ const ReplayAndConsensusServiceState = struct { voting_enabled: bool, vote_account_address: ?Pubkey, stop_at_slot: ?Slot, + latest_processed_slot: *sig.replay.trackers.ForkChoiceProcessedSlot, + latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot, }, ) !ReplayAndConsensusServiceState { var replay_state: replay.service.ReplayState = replay_state: { @@ -2363,6 +2379,8 @@ const ReplayAndConsensusServiceState = struct { .hard_forks = hard_forks, .replay_threads = params.replay_threads, .stop_at_slot = params.stop_at_slot, + .latest_processed_slot = params.latest_processed_slot, + .latest_confirmed_slot = params.latest_confirmed_slot, }, if (params.disable_consensus) .disabled else .enabled); }; errdefer replay_state.deinit(); diff --git a/src/consensus/vote_listener.zig b/src/consensus/vote_listener.zig index 5a491faa51..c251e436d7 100644 --- a/src/consensus/vote_listener.zig +++ b/src/consensus/vote_listener.zig @@ -28,7 +28,7 @@ pub const SlotDataProvider = struct { epoch_tracker: *const EpochTracker, pub fn rootSlot(self: *const SlotDataProvider) Slot { - return self.slot_tracker.root; + return self.slot_tracker.root.load(.monotonic); } fn getSlotHash(self: *const 
SlotDataProvider, slot: Slot) ?Hash { @@ -317,6 +317,7 @@ pub const VoteCollector = struct { latest_vote_slot_per_validator: sig.utils.collections.PubkeyMap(Slot), last_process_root: sig.time.Instant, vote_processing_time: VoteProcessingTiming, + latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot, metrics: VoteListenerMetrics, pub fn deinit(self: *VoteCollector, allocator: std.mem.Allocator) void { @@ -331,6 +332,7 @@ pub const VoteCollector = struct { now: sig.time.Instant, root_slot: Slot, registry: *sig.prometheus.Registry(.{}), + latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot, ) !VoteCollector { return .{ .gossip_vote_receptor = .INIT, @@ -339,6 +341,7 @@ pub const VoteCollector = struct { .latest_vote_slot_per_validator = .empty, .last_process_root = now, .vote_processing_time = .ZEROES, + .latest_confirmed_slot = latest_confirmed_slot, .metrics = try .init(registry), }; } @@ -397,6 +400,7 @@ pub const VoteCollector = struct { gossip_vote_txs, &self.vote_processing_time, &self.latest_vote_slot_per_validator, + self.latest_confirmed_slot, self.metrics, ); defer allocator.free(confirmed_slots); @@ -420,6 +424,7 @@ fn listenAndConfirmVotes( gossip_vote_txs: []const vote_parser.ParsedVote, vote_processing_time: ?*VoteProcessingTiming, latest_vote_slot_per_validator: *sig.utils.collections.PubkeyMap(Slot), + latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot, metrics: VoteListenerMetrics, ) std.mem.Allocator.Error![]const ThresholdConfirmedSlot { var replay_votes_buffer: std.ArrayListUnmanaged(vote_parser.ParsedVote) = .empty; @@ -454,6 +459,7 @@ fn listenAndConfirmVotes( latest_vote_slot_per_validator, gossip_vote_txs, replay_votes, + latest_confirmed_slot, metrics, ); } @@ -468,6 +474,7 @@ fn filterAndConfirmWithNewVotes( latest_vote_slot_per_validator: *sig.utils.collections.PubkeyMap(Slot), gossip_vote_txs: []const vote_parser.ParsedVote, replayed_votes: []const vote_parser.ParsedVote, + latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot, metrics: VoteListenerMetrics, ) std.mem.Allocator.Error![]const ThresholdConfirmedSlot { const root_slot = slot_data_provider.rootSlot(); @@ -503,6 +510,7 @@ fn filterAndConfirmWithNewVotes( &new_optimistic_confirmed_slots, is_gossip, latest_vote_slot_per_validator, + latest_confirmed_slot, ); if (is_gossip) metrics.gossip_votes_processed.inc() @@ -767,6 +775,7 @@ fn trackNewVotesAndNotifyConfirmations( new_optimistic_confirmed_slots: *std.ArrayListUnmanaged(ThresholdConfirmedSlot), is_gossip_vote: bool, latest_vote_slot_per_validator: *sig.utils.collections.PubkeyMap(Slot), + latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot, ) std.mem.Allocator.Error!void { if (vote.isEmpty()) return; const root = slot_data_provider.rootSlot(); @@ -867,6 +876,7 @@ fn trackNewVotesAndNotifyConfirmations( .slot = slot, .hash = hash, }); + latest_confirmed_slot.update(slot); // Notify subscribers about new optimistic confirmation if (senders.bank_notification) |sender| { sender.send(.{ .optimistically_confirmed = slot }) catch |err| { diff --git a/src/replay/consensus/cluster_sync.zig b/src/replay/consensus/cluster_sync.zig index 27290ef3ef..ab70121b30 100644 --- a/src/replay/consensus/cluster_sync.zig +++ b/src/replay/consensus/cluster_sync.zig @@ -492,7 +492,7 @@ fn processAncestorHashesDuplicateSlots( slot_tracker: *const SlotTracker, duplicate_slots_to_repair: *SlotData.DuplicateSlotsToRepair, ) !void { - const root = slot_tracker.root; + const root = 
slot_tracker.root.load(.monotonic); while (ancestor_duplicate_slots_receiver.tryReceive()) |ancestor_dupe_slot_to_repair| { const request_type = ancestor_dupe_slot_to_repair.request_type; @@ -560,7 +560,7 @@ fn processDuplicateConfirmedSlots( ancestor_hashes_replay_update_sender: *sig.sync.Channel(AncestorHashesReplayUpdate), purge_repair_slot_counter: *SlotData.PurgeRepairSlotCounters, ) !void { - const root = slot_tracker.root; + const root = slot_tracker.root.load(.monotonic); for (duplicate_confirmed_slots_received) |new_duplicate_confirmed_slot| { const confirmed_slot, const duplicate_confirmed_hash = new_duplicate_confirmed_slot.tuple(); if (confirmed_slot <= root) { @@ -642,7 +642,7 @@ fn processPrunedButPopularForks( slot_tracker: *const SlotTracker, ancestor_hashes_replay_update_sender: *sig.sync.Channel(AncestorHashesReplayUpdate), ) !void { - const root = slot_tracker.root; + const root = slot_tracker.root.load(.monotonic); while (pruned_but_popular_forks_receiver.tryReceive()) |new_popular_pruned_slot| { if (new_popular_pruned_slot <= root) { continue; @@ -700,7 +700,7 @@ fn processDuplicateSlots( }); } - break :blk .{ slot_tracker.root, slots_hashes }; + break :blk .{ slot_tracker.root.load(.monotonic), slots_hashes }; }; for (new_duplicate_slots.constSlice(), slots_hashes.constSlice()) |duplicate_slot, slot_hash| { // WindowService should only send the signal once per slot @@ -1564,7 +1564,7 @@ test "apply state changes" { // MarkSlotDuplicate should mark progress map and remove // the slot from fork choice - const duplicate_slot = slot_tracker.root + 1; + const duplicate_slot = slot_tracker.root.load(.monotonic) + 1; const duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?; // AKA: `ResultingStateChange::MarkSlotDuplicate` in agave try heaviest_subtree_fork_choice.markForkInvalidCandidate(allocator, &.{ @@ -1624,7 +1624,7 @@ test "apply state changes slot frozen" { var ledger = try ledger_tests.initTestLedger(allocator, @src(), .FOR_TESTS); defer ledger.deinit(); - const duplicate_slot = slot_tracker.root + 1; + const duplicate_slot = slot_tracker.root.load(.monotonic) + 1; const duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?; // Simulate ReplayStage freezing a Slot with the given hash. @@ -1660,9 +1660,10 @@ test "apply state changes slot frozen" { // version in blockstore. 
const new_slot_hash: Hash = .initRandom(random); const root_slot_hash: sig.core.hash.SlotAndHash = rsh: { - const root_slot_info = slot_tracker.get(slot_tracker.root).?; + const root_slot = slot_tracker.root.load(.monotonic); + const root_slot_info = slot_tracker.get(root_slot).?; break :rsh .{ - .slot = slot_tracker.root, + .slot = root_slot, .hash = root_slot_info.state.hash.readCopy().?, }; }; @@ -1712,7 +1713,7 @@ test "apply state changes duplicate confirmed matches frozen" { var ledger = try ledger_tests.initTestLedger(allocator, @src(), .FOR_TESTS); defer ledger.deinit(); - const duplicate_slot = slot_tracker.root + 1; + const duplicate_slot = slot_tracker.root.load(.monotonic) + 1; const our_duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?; var duplicate_slots_to_repair: SlotData.DuplicateSlotsToRepair = .empty; @@ -1808,7 +1809,7 @@ test "apply state changes slot frozen and duplicate confirmed matches frozen" { var purge_repair_slot_counter: SlotData.PurgeRepairSlotCounters = .empty; defer purge_repair_slot_counter.deinit(allocator); - const duplicate_slot = slot_tracker.root + 1; + const duplicate_slot = slot_tracker.root.load(.monotonic) + 1; const our_duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?; // Setup and check the state that is about to change. diff --git a/src/replay/consensus/core.zig b/src/replay/consensus/core.zig index 35aee30a00..f1a0a4e5cc 100644 --- a/src/replay/consensus/core.zig +++ b/src/replay/consensus/core.zig @@ -153,7 +153,8 @@ pub const TowerConsensus = struct { ); errdefer fork_choice.deinit(allocator); - const root_ref = deps.slot_tracker.get(deps.slot_tracker.root).?; + const root = deps.slot_tracker.root.load(.monotonic); + const root_ref = deps.slot_tracker.get(root).?; const root_ancestors = &root_ref.constants.ancestors; var tower: Tower = if (deps.identity.vote_account) |vote_account_address| @@ -164,8 +165,8 @@ pub const TowerConsensus = struct { vote_account_address, ) else - .{ .root = deps.slot_tracker.root }; - tower.setRoot(deps.slot_tracker.root); + .{ .root = root }; + tower.setRoot(root); const replay_tower: ReplayTower = try .init( .from(deps.logger), @@ -176,7 +177,7 @@ pub const TowerConsensus = struct { errdefer replay_tower.deinit(allocator); var vote_collector: sig.consensus.VoteCollector = - try .init(deps.now, deps.slot_tracker.root, deps.registry); + try .init(deps.now, root, deps.registry, deps.slot_tracker.latest_confirmed_slot); errdefer vote_collector.deinit(allocator); return .{ @@ -205,7 +206,7 @@ pub const TowerConsensus = struct { ) !HeaviestSubtreeForkChoice { const root_slot, const root_hash = blk: { const root = slot_tracker.getRoot(); - const root_slot = slot_tracker.root; + const root_slot = slot_tracker.root.load(.monotonic); const root_hash = root.state.hash.readCopy(); break :blk .{ root_slot, root_hash.? }; }; @@ -571,7 +572,7 @@ pub const TowerConsensus = struct { } // Update cluster with the duplicate confirmation status. 
// Analogous to [ReplayStage::mark_slots_duplicate_confirmed](https://github.com/anza-xyz/agave/blob/47c0383f2301e5a739543c1af9992ae182b7e06c/core/src/replay_stage.rs#L3876) - const root_slot = slot_tracker.root; + const root_slot = slot_tracker.root.load(.monotonic); for (duplicate_confirmed_forks.items) |duplicate_confirmed_fork| { const slot, const frozen_hash = duplicate_confirmed_fork.tuple(); try self.handleDuplicateConfirmedFork( @@ -1598,7 +1599,7 @@ fn checkAndHandleNewRoot( // Audit: The rest of the code maps to Self::handle_new_root in Agave. // Update the slot tracker. // Set new root. - slot_tracker.root = new_root; + slot_tracker.root.store(new_root, .monotonic); // Prune non rooted slots slot_tracker.pruneNonRooted(allocator); @@ -4723,12 +4724,10 @@ test "edge cases - duplicate slot" { const slot_tracker = &replay_state.slot_tracker; const progress_map = &replay_state.progress_map; - std.debug.assert(slot_tracker.root == 0); + const root_slot0 = slot_tracker.root.load(.monotonic); + std.debug.assert(root_slot0 == 0); - const root_slot0 = slot_tracker.root; const root_slot0_hash = slot_tracker.getRoot().state.hash.readCopy().?; - - std.debug.assert(root_slot0 == 0); // assert initial root value std.debug.assert(root_slot0_hash.eql(.ZEROES)); // assert initial root hash // -- slot1 -- // @@ -4894,12 +4893,10 @@ test "edge cases - duplicate confirmed slot" { const slot_tracker = &replay_state.slot_tracker; const progress_map = &replay_state.progress_map; - std.debug.assert(slot_tracker.root == 0); + const root_slot0 = slot_tracker.root.load(.monotonic); + std.debug.assert(root_slot0 == 0); - const root_slot0 = slot_tracker.root; const root_slot0_hash = slot_tracker.getRoot().state.hash.readCopy().?; - - std.debug.assert(root_slot0 == 0); // assert initial root value std.debug.assert(root_slot0_hash.eql(.ZEROES)); // assert initial root hash // -- slot1 -- // @@ -5066,16 +5064,14 @@ test "edge cases - gossip verified vote hashes" { const slot_tracker = &replay_state.slot_tracker; const progress_map = &replay_state.progress_map; - std.debug.assert(slot_tracker.root == 0); + const root_slot0 = slot_tracker.root.load(.monotonic); + std.debug.assert(root_slot0 == 0); // assert initial root value var vote_collector: sig.consensus.vote_listener.VoteCollector = - try .init(.EPOCH_ZERO, slot_tracker.root, &registry); + try .init(.EPOCH_ZERO, root_slot0, &registry); defer vote_collector.deinit(gpa); - const root_slot0 = slot_tracker.root; const root_slot0_hash = slot_tracker.getRoot().state.hash.readCopy().?; - - std.debug.assert(root_slot0 == 0); // assert initial root value std.debug.assert(root_slot0_hash.eql(.ZEROES)); // assert initial root hash // -- slot1 -- // @@ -5986,7 +5982,7 @@ test "root advances after vote satisfies lockouts" { consensus.replay_tower.tower.votes.len, ); - try std.testing.expectEqual(1, slot_tracker.root); + try std.testing.expectEqual(1, slot_tracker.root.load(.monotonic)); // No longer tracking slot 0 try std.testing.expect(!slot_tracker.contains(0)); // Still tracking slot 1 @@ -6073,7 +6069,7 @@ test "root advances after vote satisfies lockouts" { const last_voted = consensus.replay_tower.tower.lastVotedSlot(); try std.testing.expectEqual(33, last_voted); - try std.testing.expectEqual(2, slot_tracker.root); + try std.testing.expectEqual(2, slot_tracker.root.load(.monotonic)); // No longer tracking slot 0 try std.testing.expect(!slot_tracker.contains(0)); // No longer tracking slot 1 
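Aside on the change above: patch 01 turns SlotTracker.root from a plain Slot into std.atomic.Value(Slot), with the replay thread as the only writer (root.store in checkAndHandleNewRoot) and RPC as an additional reader. A minimal self-contained sketch of that single-writer/many-reader shape, using illustrative names only (AtomicSlot is not a type in this patch):

    const std = @import("std");
    const Slot = u64;

    /// Single-writer, many-reader slot cursor. Relaxed (.monotonic) ordering
    /// suffices because readers only need a recent value of the counter itself;
    /// nothing else is published through this variable.
    const AtomicSlot = struct {
        inner: std.atomic.Value(Slot) = .init(0),

        /// Called by the one writer (the replay thread in the patch).
        fn set(self: *AtomicSlot, s: Slot) void {
            self.inner.store(s, .monotonic);
        }

        /// Safe to call from any thread (e.g. an RPC handler).
        fn get(self: *const AtomicSlot) Slot {
            return self.inner.load(.monotonic);
        }
    };

    test AtomicSlot {
        var cursor: AtomicSlot = .{};
        cursor.set(42);
        try std.testing.expectEqual(42, cursor.get());
    }
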
diff --git a/src/replay/consensus/process_result.zig b/src/replay/consensus/process_result.zig index 9c932be7cd..917b510786 100644 --- a/src/replay/consensus/process_result.zig +++ b/src/replay/consensus/process_result.zig @@ -117,7 +117,7 @@ fn markDeadSlot( params.allocator, .from(params.logger), dead_slot, - params.slot_tracker.root, + params.slot_tracker.root.load(.monotonic), params.duplicate_slots_to_repair, ancestor_hashes_replay_update_sender, dead_state, @@ -148,7 +148,7 @@ fn markDeadSlot( params.allocator, .from(params.logger), dead_slot, - params.slot_tracker.root, + params.slot_tracker.root.load(.monotonic), params.duplicate_slots_tracker, params.fork_choice, duplicate_state, @@ -194,7 +194,7 @@ fn updateConsensusForFrozenSlot(params: ProcessResultParams, slot: Slot) !void { params.allocator, .from(params.logger), slot, - params.slot_tracker.root, + params.slot_tracker.root.load(.monotonic), params.ledger.resultWriter(), params.fork_choice, params.duplicate_slots_to_repair, diff --git a/src/replay/service.zig b/src/replay/service.zig index 8c1ec98674..8c7e9de4a4 100644 --- a/src/replay/service.zig +++ b/src/replay/service.zig @@ -162,6 +162,8 @@ pub const Dependencies = struct { hard_forks: sig.core.HardForks, replay_threads: u32, stop_at_slot: ?Slot, + latest_processed_slot: *replay.trackers.ForkChoiceProcessedSlot, + latest_confirmed_slot: *replay.trackers.OptimisticallyConfirmedSlot, }; pub const ConsensusStatus = enum { @@ -208,10 +210,13 @@ pub const ReplayState = struct { const zone = tracy.Zone.init(@src(), .{ .name = "ReplayState init" }); defer zone.deinit(); - var slot_tracker: SlotTracker = try .init(deps.allocator, deps.root.slot, .{ - .constants = deps.root.constants, - .state = deps.root.state, - }); + var slot_tracker: SlotTracker = try .init( + deps.allocator, + deps.latest_processed_slot, + deps.latest_confirmed_slot, + deps.root.slot, + .{ .constants = deps.root.constants, .state = deps.root.state }, + ); errdefer slot_tracker.deinit(deps.allocator); errdefer { // do not free the root slot data parameter, we don't own it unless the function returns successfully @@ -324,7 +329,7 @@ pub fn trackNewSlots( var zone = tracy.Zone.init(@src(), .{ .name = "trackNewSlots" }); defer zone.deinit(); - const root = slot_tracker.root; + const root = slot_tracker.root.load(.monotonic); var frozen_slots = try slot_tracker.frozenSlots(allocator); defer frozen_slots.deinit(allocator); @@ -572,6 +577,7 @@ fn freezeCompletedSlots(state: *ReplayState, results: []const ReplayResult) !boo slot, last_entry_hash, )); + state.slot_tracker.latest_confirmed_slot.update(slot); processed_a_slot = true; } else { state.logger.info().logf("partially replayed slot: {}", .{slot}); @@ -588,7 +594,7 @@ fn bypassConsensus(state: *ReplayState) !void { const slot_tracker = &state.slot_tracker; state.logger.info().logf("rooting slot with SlotTree.reRoot: {}", .{new_root}); - slot_tracker.root = new_root; + slot_tracker.root.store(new_root, .monotonic); slot_tracker.pruneNonRooted(state.allocator); try state.status_cache.addRoot(state.allocator, new_root); @@ -916,7 +922,7 @@ test "advance calls consensus.process with empty replay results" { try advanceReplay(&replay_state, try registry.initStruct(Metrics), null); // No slots were replayed - try std.testing.expectEqual(0, replay_state.slot_tracker.root); + try std.testing.expectEqual(0, replay_state.slot_tracker.root.load(.monotonic)); } test "Execute testnet block single threaded" { diff --git a/src/replay/trackers.zig b/src/replay/trackers.zig 
index c7e2612444..8c350bbe6d 100644 --- a/src/replay/trackers.zig +++ b/src/replay/trackers.zig @@ -8,6 +8,30 @@ const Slot = sig.core.Slot; const SlotConstants = sig.core.SlotConstants; const SlotState = sig.core.SlotState; +pub const ForkChoiceProcessedSlot = struct { + slot: std.atomic.Value(Slot) = .init(0), + + pub fn update(self: *@This(), new_slot: Slot) void { + _ = self.slot.fetchMax(new_slot, .monotonic); + } + + pub fn get(self: *const @This()) Slot { + return self.slot.load(.monotonic); + } +}; + +pub const OptimisticallyConfirmedSlot = struct { + slot: std.atomic.Value(Slot) = .init(0), + + pub fn update(self: *@This(), new_slot: Slot) void { + _ = self.slot.fetchMax(new_slot, .monotonic); + } + + pub fn get(self: *const @This()) Slot { + return self.slot.load(.monotonic); + } +}; + /// Central registry that tracks high-level info about slots and how they fork. /// /// This is a lean version of `BankForks` from agave, focused on storing the @@ -20,7 +44,9 @@ const SlotState = sig.core.SlotState; /// will end as soon as the items are removed. pub const SlotTracker = struct { slots: std.AutoArrayHashMapUnmanaged(Slot, *Element), - root: Slot, + latest_processed_slot: *ForkChoiceProcessedSlot, + latest_confirmed_slot: *OptimisticallyConfirmedSlot, + root: std.atomic.Value(Slot), pub const Element = struct { constants: SlotConstants, @@ -41,13 +67,17 @@ pub const SlotTracker = struct { pub fn init( allocator: std.mem.Allocator, + latest_processed_slot: *ForkChoiceProcessedSlot, + latest_confirmed_slot: *OptimisticallyConfirmedSlot, root_slot: Slot, /// ownership is transferred to this function, except in the case of an error return slot_init: Element, ) std.mem.Allocator.Error!SlotTracker { var self: SlotTracker = .{ - .root = root_slot, + .root = .init(root_slot), .slots = .empty, + .latest_processed_slot = latest_processed_slot, + .latest_confirmed_slot = latest_confirmed_slot, }; errdefer self.deinit(allocator); @@ -114,7 +144,7 @@ pub const SlotTracker = struct { } pub fn getRoot(self: *const SlotTracker) Reference { - return self.get(self.root).?; // root slot's bank must exist + return self.get(self.root.load(.monotonic)).?; // root slot's bank must exist } pub fn contains(self: *const SlotTracker, slot: Slot) bool { @@ -188,8 +218,9 @@ pub const SlotTracker = struct { var slice = self.slots.entries.slice(); var index: usize = 0; + const root = self.root.load(.monotonic); while (index < slice.len) { - if (slice.items(.key)[index] < self.root) { + if (slice.items(.key)[index] < root) { const element = slice.items(.value)[index]; element.state.deinit(allocator); element.constants.deinit(allocator); diff --git a/src/rpc/methods.zig b/src/rpc/methods.zig index 4ce6bdfe43..c03cf14971 100644 --- a/src/rpc/methods.zig +++ b/src/rpc/methods.zig @@ -686,3 +686,58 @@ pub const common = struct { shredVersion: ?u16 = null, }; }; + +pub const SlotHookContext = struct { + slot_tracker: *const sig.replay.trackers.SlotTracker, + + fn getLatestProcessedSlot(self: @This()) !Slot { + const slot = self.slot_tracker.latest_processed_slot.get(); + if (slot == 0) { + return error.RpcNoProcessedSlot; + } + return slot; + } + + fn getLatestConfirmedSlot(self: @This()) !Slot { + const slot = self.slot_tracker.latest_confirmed_slot.get(); + if (slot == 0) { + return error.RpcNoConfirmedSlot; + } + return slot; + } + + fn getLatestFinalizedSlot(self: @This()) !Slot { + const slot = self.slot_tracker.root.load(.monotonic); + if (slot == 0) { + return error.RpcNoFinalizedSlot; + } + return slot; + } + + 
fn getSlotImpl( + self: @This(), + config: common.CommitmentSlotConfig, + ) !Slot { + const commitment = config.commitment orelse .finalized; + + const slot = switch (commitment) { + .processed => try self.getLatestProcessedSlot(), + .confirmed => try self.getLatestConfirmedSlot(), + .finalized => try self.getLatestFinalizedSlot(), + }; + + if (config.minContextSlot) |min_slot| { + if (slot < min_slot) { + return error.RpcMinContextSlotNotMet; + } + } + + return slot; + } + + pub fn getSlot(self: @This(), _: std.mem.Allocator, params: GetSlot) !GetSlot.Response { + const config = params.config orelse common.CommitmentSlotConfig{}; + + return self.getSlotImpl(config); + } +}; From a288ad7ab0c4880189a86b153e6ac1abec2b2790 Mon Sep 17 00:00:00 2001 From: prestonsn Date: Tue, 3 Feb 2026 03:25:03 +0000 Subject: [PATCH 02/10] feat(replay): wire latest_processed_slot for RPC getSlot - Add SlotTree.tip() to get highest slot among fork leaves (bypass mode) - Update latest_processed_slot inside vote handling to match Agave behavior - Update latest_processed_slot in bypassConsensus() from SlotTree.tip() - Change ForkChoiceProcessedSlot to use store() since slot can decrease Note: Bypass mode uses highest fork tip, while consensus mode matches Agave's semantics where processed slot only updates when voting. --- src/replay/consensus/core.zig | 6 ++++++ src/replay/service.zig | 15 ++++++++++++++- src/replay/trackers.zig | 17 +++++++++++++++-- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/src/replay/consensus/core.zig b/src/replay/consensus/core.zig index f1a0a4e5cc..6925188ee5 100644 --- a/src/replay/consensus/core.zig +++ b/src/replay/consensus/core.zig @@ -658,6 +658,12 @@ pub const TowerConsensus = struct { slot_leaders, vote_sockets, ); + + // Update the latest processed slot to the bank being voted on. + // This matches Agave's behavior: the processed slot is updated inside + // handle_votable_bank(), which is only called when vote_bank.is_some(). + // See: https://github.com/anza-xyz/agave/blob/5e900421520a10933642d5e9a21e191a70f9b125/core/src/replay_stage.rs#L2683 + slot_tracker.latest_processed_slot.set(voted.slot); } // Reset onto a fork diff --git a/src/replay/service.zig b/src/replay/service.zig index 8c7e9de4a4..6c67d19243 100644 --- a/src/replay/service.zig +++ b/src/replay/service.zig @@ -577,7 +577,6 @@ fn freezeCompletedSlots(state: *ReplayState, results: []const ReplayResult) !boo slot, last_entry_hash, )); - state.slot_tracker.latest_confirmed_slot.update(slot); processed_a_slot = true; } else { state.logger.info().logf("partially replayed slot: {}", .{slot}); @@ -590,6 +589,20 @@ fn freezeCompletedSlots(state: *ReplayState, results: []const ReplayResult) !boo /// bypass the tower bft consensus protocol, simply rooting slots with SlotTree.reRoot fn bypassConsensus(state: *ReplayState) !void { + // NOTE: Processed slot semantics differ from Agave when Sig is in bypass-consensus mode. + // In bypass mode, `latest_processed_slot` is set to the highest slot among all fork + // leaves (SlotTree.tip()). + // + // This differs from Agave's behavior: the processed slot is only updated + // when `vote_bank.is_some()` (i.e., when the validator has selected a bank + // to vote on after passing all consensus checks like lockout, threshold, and + // switch proof). If the validator is locked out or fails + // threshold checks, the processed slot is NOT updated and can go stale. 
+ // See: https://github.com/anza-xyz/agave/blob/5e900421520a10933642d5e9a21e191a70f9b125/core/src/replay_stage.rs#L2683 + // + // TowerConsensus implements Agave's processed slot semantics when consensus is enabled. + state.slot_tracker.latest_processed_slot.set(state.slot_tree.tip()); + if (state.slot_tree.reRoot(state.allocator)) |new_root| { const slot_tracker = &state.slot_tracker; diff --git a/src/replay/trackers.zig b/src/replay/trackers.zig index 8c350bbe6d..31539a5db3 100644 --- a/src/replay/trackers.zig +++ b/src/replay/trackers.zig @@ -11,8 +11,11 @@ const SlotState = sig.core.SlotState; pub const ForkChoiceProcessedSlot = struct { slot: std.atomic.Value(Slot) = .init(0), - pub fn update(self: *@This(), new_slot: Slot) void { - _ = self.slot.fetchMax(new_slot, .monotonic); + /// Set the current processed slot (heaviest fork tip). + /// Uses store() because this can decrease when the fork choice + /// switches to a different fork with a lower slot. + pub fn set(self: *@This(), new_slot: Slot) void { + self.slot.store(new_slot, .monotonic); } pub fn get(self: *const @This()) Slot { @@ -243,6 +246,16 @@ pub const SlotTree = struct { const List = std.ArrayListUnmanaged; const min_age = 32; + /// Returns the highest slot among all fork tips (leaves). + /// In bypass mode (without ForkChoice), this represents the "processed" slot. + pub fn tip(self: *const SlotTree) Slot { + var max_slot: Slot = self.root.slot; + for (self.leaves.items) |leaf| { + max_slot = @max(max_slot, leaf.slot); + } + return max_slot; + } + pub fn deinit(const_self: SlotTree, allocator: Allocator) void { var self = const_self; self.root.destroyRecursivelyDownstream(allocator); From 1048311135784f2be8da1d0127caf88da36ac960 Mon Sep 17 00:00:00 2001 From: prestonsn Date: Wed, 4 Feb 2026 03:30:35 +0000 Subject: [PATCH 03/10] fix(tests): add processed and confirmed slot trackers to unit tests --- src/consensus/replay_tower.zig | 14 +- src/consensus/vote_listener.zig | 33 +++- src/replay/consensus/cluster_sync.zig | 5 + src/replay/consensus/core.zig | 210 +++++++++++++++++------- src/replay/consensus/process_result.zig | 8 +- src/replay/service.zig | 10 ++ src/replay/trackers.zig | 28 +++- src/rpc/methods.zig | 34 +--- 8 files changed, 237 insertions(+), 105 deletions(-) diff --git a/src/consensus/replay_tower.zig b/src/consensus/replay_tower.zig index af98443b64..c53454d9ac 100644 --- a/src/consensus/replay_tower.zig +++ b/src/consensus/replay_tower.zig @@ -5448,10 +5448,16 @@ pub const TestFixture = struct { errdefer state.deinit(allocator); state.hash = .init(root.hash); - break :blk try .init(allocator, root.slot, .{ - .constants = constants, - .state = state, - }); + var latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + + break :blk try .init( + allocator, + &latest_processed_slot, + &latest_confirmed_slot, + root.slot, + .{ .constants = constants, .state = state }, + ); }; errdefer slot_tracker.deinit(allocator); diff --git a/src/consensus/vote_listener.zig b/src/consensus/vote_listener.zig index c251e436d7..4feabcc3e3 100644 --- a/src/consensus/vote_listener.zig +++ b/src/consensus/vote_listener.zig @@ -969,8 +969,13 @@ test "trackNewVotesAndNotifyConfirmations filter" { var prng_state: std.Random.DefaultPrng = .init(std.testing.random_seed); const prng = prng_state.random(); + var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var confirmed_slot: 
sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + var slot_tracker: SlotTracker = try .init( allocator, + &processed_slot, + &confirmed_slot, 0, try slotTrackerElementGenesis(allocator, .DEFAULT), ); @@ -1047,6 +1052,7 @@ test "trackNewVotesAndNotifyConfirmations filter" { &new_optimistic_confirmed_slots, is_gossip_vote, &latest_vote_slot_per_validator, + &confirmed_slot, ); } diff.sortAsc(); @@ -1096,10 +1102,14 @@ test "trackNewVotesAndNotifyConfirmations filter" { &new_optimistic_confirmed_slots, is_gossip_vote, &latest_vote_slot_per_validator, + &confirmed_slot, ); } diff.sortAsc(); try std.testing.expectEqualSlices(Slot, diff.map.keys(), &.{ 7, 8 }); + + // No stake delegated, so optimistic confirmation should not be reached. + try std.testing.expectEqual(0, confirmed_slot.get()); } const ThresholdReachedResults = std.bit_set.IntegerBitSet(THRESHOLDS_TO_CHECK.len); @@ -1742,7 +1752,10 @@ test "simple usage" { var registry: sig.prometheus.Registry(.{}) = .init(allocator); defer registry.deinit(); - var slot_tracker: SlotTracker = try .init(allocator, 0, .{ + var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + + var slot_tracker: SlotTracker = try .init(allocator, &processed_slot, &confirmed_slot, 0, .{ .constants = .{ .parent_slot = 0, .parent_hash = .ZEROES, @@ -1790,6 +1803,7 @@ test "simple usage" { .EPOCH_ZERO, slot_data_provider.rootSlot(), &registry, + &confirmed_slot, ); defer vote_collector.deinit(allocator); @@ -1800,6 +1814,11 @@ test "simple usage" { .ledger = &ledger, .gossip_votes = null, }); + + // Since no votes were sent, slot trackers should remain at their initialized state. + // NOTE: processed slot is not used here, but required to construct SlotTracker. + try std.testing.expectEqual(0, processed_slot.get()); + try std.testing.expectEqual(0, confirmed_slot.get()); } test "check trackers" { @@ -1823,11 +1842,14 @@ test "check trackers" { const root_slot: Slot = 0; + var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + var slot_tracker: SlotTracker = blk: { var state: sig.core.SlotState = .GENESIS; errdefer state.deinit(allocator); - break :blk try .init(allocator, root_slot, .{ + break :blk try .init(allocator, &processed_slot, &confirmed_slot, root_slot, .{ .constants = .{ .parent_slot = root_slot -| 1, .parent_hash = .ZEROES, @@ -1891,7 +1913,7 @@ test "check trackers" { defer replay_votes_channel.destroy(); var vote_collector: VoteCollector = - try .init(.EPOCH_ZERO, slot_data_provider.rootSlot(), &registry); + try .init(.EPOCH_ZERO, slot_data_provider.rootSlot(), &registry, &confirmed_slot); defer vote_collector.deinit(allocator); var expected_trackers: std.ArrayListUnmanaged(struct { Slot, TestSlotVoteTracker }) = .empty; @@ -2007,6 +2029,10 @@ test "check trackers" { expected_trackers.items, actual_trackers.items, ); + + // Votes were processed but no stake was delegated to validators, so + // optimistic confirmation was not reached. 
+ try std.testing.expectEqual(0, confirmed_slot.get()); } // tests for OptimisticConfirmationVerifier moved to optimistic_vote_verifier.zig diff --git a/src/replay/consensus/cluster_sync.zig b/src/replay/consensus/cluster_sync.zig index ab70121b30..22bfd880e7 100644 --- a/src/replay/consensus/cluster_sync.zig +++ b/src/replay/consensus/cluster_sync.zig @@ -1484,8 +1484,13 @@ const TestData = struct { }), }; + var latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + var slot_tracker: SlotTracker = try .init( allocator, + &latest_processed_slot, + &latest_confirmed_slot, root_slot, try slot_infos[root_slot].toDummyElem(&slot_infos, random), ); diff --git a/src/replay/consensus/core.zig b/src/replay/consensus/core.zig index 6925188ee5..e1ed47ecf0 100644 --- a/src/replay/consensus/core.zig +++ b/src/replay/consensus/core.zig @@ -2504,17 +2504,27 @@ test "checkAndHandleNewRoot - missing slot" { var prng = std.Random.DefaultPrng.init(std.testing.random_seed); const random = prng.random(); - const root = SlotAndHash{ + const root_slot_and_hash = SlotAndHash{ .slot = 0, .hash = Hash.initRandom(random), }; - var fixture = try TestFixture.init(allocator, root); + var latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + const root: std.atomic.Value(Slot) = .init(0); + + // NOTE: TestFixture has its own SlotTracker as well. Unclear if that matters. + var fixture = try TestFixture.init( + allocator, + root_slot_and_hash, + ); defer fixture.deinit(allocator); // Build a tracked slot set wrapped in RwMux var slot_tracker: SlotTracker = .{ - .root = root.slot, + .root = root, + .latest_processed_slot = &latest_processed_slot, + .latest_confirmed_slot = &latest_confirmed_slot, .slots = .empty, }; defer slot_tracker.deinit(testing.allocator); { const constants: SlotConstants = try .genesis(allocator, .initRandom(random)); errdefer constants.deinit(allocator); - try slot_tracker.put(testing.allocator, root.slot, .{ + try slot_tracker.put(testing.allocator, root_slot_and_hash.slot, .{ .constants = constants, .state = .GENESIS, }); @@ -2556,6 +2566,10 @@ test "checkAndHandleNewRoot - missing slot" { ); try testing.expectError(error.MissingSlot, result); + + // Verify slot trackers remain at initial state after failure + try testing.expectEqual(0, slot_tracker.latest_processed_slot.get()); + try testing.expectEqual(0, slot_tracker.latest_confirmed_slot.get()); } test "checkAndHandleNewRoot - missing hash" { @@ -2571,12 +2585,17 @@ test "checkAndHandleNewRoot - missing hash" { var fixture = try TestFixture.init(allocator, root); defer fixture.deinit(allocator); - var slot_tracker2 = RwMux(SlotTracker).init(.{ .root = root.slot, .slots = .empty }); - defer { - const ptr, var lg = slot_tracker2.writeWithLock(); - defer lg.unlock(); - ptr.deinit(allocator); - } + var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + + var slot_tracker2: SlotTracker = .{ + .root = .init(root.slot), + .slots = .empty, + .latest_processed_slot = &processed_slot, + .latest_confirmed_slot = &confirmed_slot, + }; + defer slot_tracker2.deinit(allocator); + { const constants = try SlotConstants.genesis(allocator, 
.initRandom(random)); errdefer constants.deinit(allocator); @@ -2584,9 +2603,7 @@ test "checkAndHandleNewRoot - missing hash" { var state: SlotState = .GENESIS; errdefer state.deinit(allocator); - const ptr, var lg = slot_tracker2.writeWithLock(); - defer lg.unlock(); - try ptr.put(allocator, root.slot, .{ + try slot_tracker2.put(allocator, root.slot, .{ .constants = constants, .state = state, }); @@ -2607,12 +2624,10 @@ test "checkAndHandleNewRoot - missing hash" { defer epoch_tracker.deinit(allocator); // Try to check a slot that doesn't exist in the tracker - const slot_tracker2_ptr, var slot_tracker2_lg = slot_tracker2.writeWithLock(); - defer slot_tracker2_lg.unlock(); const result = checkAndHandleNewRoot( allocator, test_state.resultWriter(), - slot_tracker2_ptr, + &slot_tracker2, &fixture.progress, &fixture.fork_choice, &epoch_tracker, @@ -2622,6 +2637,10 @@ test "checkAndHandleNewRoot - missing hash" { ); try testing.expectError(error.MissingHash, result); + + // Verify slot trackers remain at initial state after failure + try testing.expectEqual(0, processed_slot.get()); + try testing.expectEqual(0, confirmed_slot.get()); } test "checkAndHandleNewRoot - empty slot tracker" { @@ -2638,13 +2657,17 @@ test "checkAndHandleNewRoot - empty slot tracker" { var fixture = try TestFixture.init(testing.allocator, root); defer fixture.deinit(testing.allocator); - const slot_tracker_val3: SlotTracker = SlotTracker{ .root = root.slot, .slots = .{} }; - var slot_tracker3 = RwMux(SlotTracker).init(slot_tracker_val3); - defer { - const ptr, var lg = slot_tracker3.writeWithLock(); - defer lg.unlock(); - ptr.deinit(testing.allocator); - } + var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + + var slot_tracker3: SlotTracker = .{ + .root = .init(root.slot), + .slots = .empty, + .latest_processed_slot = &processed_slot, + .latest_confirmed_slot = &confirmed_slot, + }; + defer slot_tracker3.deinit(testing.allocator); + var registry = sig.prometheus.Registry(.{}).init(testing.allocator); defer registry.deinit(); @@ -2660,12 +2683,10 @@ test "checkAndHandleNewRoot - empty slot tracker" { defer epoch_tracker.deinit(allocator); // Try to check a slot that doesn't exist in the tracker - const slot_tracker3_ptr, var slot_tracker3_lg = slot_tracker3.writeWithLock(); - defer slot_tracker3_lg.unlock(); const result = checkAndHandleNewRoot( testing.allocator, test_state.resultWriter(), - slot_tracker3_ptr, + &slot_tracker3, &fixture.progress, &fixture.fork_choice, &epoch_tracker, @@ -2675,6 +2696,10 @@ test "checkAndHandleNewRoot - empty slot tracker" { ); try testing.expectError(error.EmptySlotTracker, result); + + // Verify slot trackers remain at initial state after failure + try testing.expectEqual(0, processed_slot.get()); + try testing.expectEqual(0, confirmed_slot.get()); } test "checkAndHandleNewRoot - success" { @@ -2702,12 +2727,16 @@ test "checkAndHandleNewRoot - success" { var fixture = try TestFixture.init(allocator, root); defer fixture.deinit(allocator); - var slot_tracker4 = RwMux(SlotTracker).init(.{ .root = root.slot, .slots = .empty }); - defer { - const ptr, var lg = slot_tracker4.writeWithLock(); - defer lg.unlock(); - ptr.deinit(allocator); - } + var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + + var slot_tracker4: SlotTracker = .{ + .root = .init(root.slot), + .slots = .empty, + 
.latest_processed_slot = &processed_slot, + .latest_confirmed_slot = &confirmed_slot, + }; + defer slot_tracker4.deinit(allocator); { var constants2 = try SlotConstants.genesis(allocator, .initRandom(random)); @@ -2727,13 +2756,11 @@ test "checkAndHandleNewRoot - success" { state2.hash = .init(hash2.hash); state3.hash = .init(hash3.hash); - const ptr, var lg = slot_tracker4.writeWithLock(); - defer lg.unlock(); - try ptr.put(allocator, hash2.slot, .{ + try slot_tracker4.put(allocator, hash2.slot, .{ .constants = constants2, .state = state2, }); - try ptr.put(allocator, hash3.slot, .{ + try slot_tracker4.put(allocator, hash3.slot, .{ .constants = constants3, .state = state3, }); @@ -2786,13 +2813,8 @@ test "checkAndHandleNewRoot - success" { } try testing.expectEqual(1, fixture.progress.map.count()); - // Now the write lock is released, we can acquire a read lock - { - const ptr, var lg = slot_tracker4.readWithLock(); - defer lg.unlock(); - for (ptr.slots.keys()) |remaining_slots| { - try testing.expect(remaining_slots >= hash3.slot); - } + for (slot_tracker4.slots.keys()) |remaining_slots| { + try testing.expect(remaining_slots >= hash3.slot); } try testing.expect(!fixture.progress.map.contains(hash1.slot)); } @@ -5073,8 +5095,10 @@ test "edge cases - gossip verified vote hashes" { const root_slot0 = slot_tracker.root.load(.monotonic); std.debug.assert(root_slot0 == 0); // assert initial root value + var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + var vote_collector: sig.consensus.vote_listener.VoteCollector = - try .init(.EPOCH_ZERO, root_slot0, &registry); + try .init(.EPOCH_ZERO, root_slot0, &registry, &confirmed_slot); defer vote_collector.deinit(gpa); const root_slot0_hash = slot_tracker.getRoot().state.hash.readCopy().?; @@ -5225,6 +5249,9 @@ test "edge cases - gossip verified vote hashes" { &.{ pk1, pk2 }, tower_consensus.latest_validator_votes.max_gossip_frozen_votes.keys(), ); + + // Optimistic confirmation threshold not reached during this test. + try std.testing.expectEqual(0, confirmed_slot.get()); } // TODO: Re-implement tests for the new consolidated API @@ -5285,8 +5312,13 @@ test "vote on heaviest frozen descendant with no switch" { try bhq.mut().insertGenesisHash(allocator, root_state.hash.readCopy().?, 0); } + var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + var slot_tracker = try SlotTracker.init( allocator, + &processed_slot, + &confirmed_slot, root_slot, .{ .constants = root_consts, @@ -5423,6 +5455,12 @@ test "vote on heaviest frozen descendant with no switch" { // 5. Assert forkchoice try std.testing.expectEqual(slot_1, consensus.fork_choice.heaviestOverallSlot().slot); try std.testing.expectEqual(slot1_hash, consensus.fork_choice.heaviestOverallSlot().hash); + + // 6. 
Assert trackers + // processed_slot is updated to slot_1 when the tower votes on it + try std.testing.expectEqual(slot_1, processed_slot.get()); + // confirmed_slot remains at 0 (no optimistic confirmation votes processed) + try std.testing.expectEqual(0, confirmed_slot.get()); } // State setup @@ -5459,8 +5497,13 @@ test "vote accounts with landed votes populate bank stats" { try bhq.mut().insertGenesisHash(allocator, root_state.hash.readCopy().?, 0); } + var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + var slot_tracker = try SlotTracker.init( allocator, + &processed_slot, + &confirmed_slot, root_slot, .{ .constants = root_consts, @@ -5643,6 +5686,12 @@ test "vote accounts with landed votes populate bank stats" { // } try std.testing.expectEqual(6, stats1.lockout_intervals.map.get(5).?.items.len); try std.testing.expectEqual(slot_1, consensus.fork_choice.heaviestOverallSlot().slot); + + // Assert trackers + // processed_slot is updated to slot_1 when the tower votes on it + try std.testing.expectEqual(slot_1, processed_slot.get()); + // confirmed_slot remains at 0 (no optimistic confirmation votes processed) + try std.testing.expectEqual(0, confirmed_slot.get()); } // Test case: @@ -5747,10 +5796,16 @@ test "root advances after vote satisfies lockouts" { hashes[i] = Hash{ .data = .{@as(u8, @intCast(i % 256))} ** Hash.SIZE }; } - var slot_tracker: SlotTracker = try .init(allocator, initial_root, .{ - .constants = root_consts, - .state = root_state, - }); + var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + + var slot_tracker: SlotTracker = try .init( + allocator, + &processed_slot, + &confirmed_slot, + initial_root, + .{ .constants = root_consts, .state = root_state }, + ); defer slot_tracker.deinit(allocator); var prng = std.Random.DefaultPrng.init(12345); @@ -6092,6 +6147,12 @@ test "root advances after vote satisfies lockouts" { try std.testing.expect(progress.map.get(2) != null); try std.testing.expectEqual(33, consensus.fork_choice.heaviestOverallSlot().slot); } + + // Assert trackers + // processed_slot is updated to slot 33 when the tower votes on it + try std.testing.expectEqual(33, processed_slot.get()); + // confirmed_slot remains at 0 (no optimistic confirmation votes processed) + try std.testing.expectEqual(0, confirmed_slot.get()); } // Test case: @@ -6153,10 +6214,16 @@ test "vote refresh when no new vote available" { try bhq.mut().insertGenesisHash(allocator, Hash.ZEROES, 0); } - var slot_tracker = try SlotTracker.init(allocator, root_slot, .{ - .constants = root_consts, - .state = root_state, - }); + var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + + var slot_tracker = try SlotTracker.init( + allocator, + &processed_slot, + &confirmed_slot, + root_slot, + .{ .constants = root_consts, .state = root_state }, + ); defer slot_tracker.deinit(allocator); const slot1_hash = Hash{ .data = .{1} ** Hash.SIZE }; @@ -6281,6 +6348,12 @@ test "vote refresh when no new vote available" { try std.testing.expectEqual(1, consensus.replay_tower.tower.votes.len); try std.testing.expectEqual(1, consensus.fork_choice.heaviestOverallSlot().slot); + + // Assert trackers + // processed_slot is updated to slot 1 when the tower votes on it + try std.testing.expectEqual(1, processed_slot.get()); + // 
confirmed_slot remains at 0 (no optimistic confirmation votes processed) + try std.testing.expectEqual(0, confirmed_slot.get()); } // Test case: @@ -6346,10 +6419,16 @@ test "detect and mark duplicate confirmed fork" { try bhq.mut().insertGenesisHash(allocator, Hash.ZEROES, 0); } - var slot_tracker = try SlotTracker.init(allocator, root_slot, .{ - .constants = root_consts, - .state = root_state, - }); + var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + + var slot_tracker = try SlotTracker.init( + allocator, + &processed_slot, + &confirmed_slot, + root_slot, + .{ .constants = root_consts, .state = root_state }, + ); defer slot_tracker.deinit(allocator); // Add frozen slot 1 @@ -6611,10 +6690,16 @@ test "detect and mark duplicate slot" { try bhq.mut().insertGenesisHash(allocator, Hash.ZEROES, 0); } - var slot_tracker = try SlotTracker.init(allocator, root_slot, .{ - .constants = root_consts, - .state = root_state, - }); + var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + + var slot_tracker = try SlotTracker.init( + allocator, + &processed_slot, + &confirmed_slot, + root_slot, + .{ .constants = root_consts, .state = root_state }, + ); defer slot_tracker.deinit(allocator); const slot1_hash = Hash{ .data = .{1} ** Hash.SIZE }; @@ -6810,8 +6895,13 @@ test "successful fork switch (switch_proof)" { try bhq.mut().insertGenesisHash(allocator, Hash.ZEROES, 0); } + var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + var slot_tracker = try SlotTracker.init( allocator, + &processed_slot, + &confirmed_slot, root_slot, .{ .constants = root_consts, .state = root_state }, ); diff --git a/src/replay/consensus/process_result.zig b/src/replay/consensus/process_result.zig index 917b510786..67dbd8d732 100644 --- a/src/replay/consensus/process_result.zig +++ b/src/replay/consensus/process_result.zig @@ -305,9 +305,15 @@ const TestReplayStateResources = struct { self.duplicate_slots_to_repair = DuplicateSlotsToRepair.empty; self.purge_repair_slot_counter = PurgeRepairSlotCounters.empty; + var latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + const root: std.atomic.Value(Slot) = .init(0); + self.slot_tracker = SlotTracker{ .slots = .empty, - .root = 0, + .latest_processed_slot = &latest_processed_slot, + .latest_confirmed_slot = &latest_confirmed_slot, + .root = root, }; self.ancestor_hashes_replay_update_channel = try sig diff --git a/src/replay/service.zig b/src/replay/service.zig index 6c67d19243..504287eb22 100644 --- a/src/replay/service.zig +++ b/src/replay/service.zig @@ -812,6 +812,9 @@ test trackNewSlots { &.{ .{ 0, 0 }, .{ 1, 0 }, .{ 2, 1 }, .{ 4, 1 }, .{ 6, 4 } }, &.{ 3, 5 }, ); + + try std.testing.expectEqual(0, processed_slot.get()); + try std.testing.expectEqual(0, confirmed_slot.get()); } fn expectSlotTracker( @@ -1261,6 +1264,8 @@ pub const DependencyStubs = struct { .replay_threads = 1, .stop_at_slot = null, + .latest_processed_slot = &latest_processed_slot, + .latest_confirmed_slot = &latest_confirmed_slot, }, .enabled); } @@ -1310,6 +1315,9 @@ pub const DependencyStubs = struct { const hard_forks = try bank_fields.hard_forks.clone(allocator); errdefer hard_forks.deinit(allocator); + var 
latest_processed_slot: replay.trackers.ForkChoiceProcessedSlot = .{}; + var latest_confirmed_slot: replay.trackers.OptimisticallyConfirmedSlot = .{}; + return try .init(.{ .allocator = allocator, .logger = .FOR_TESTS, @@ -1333,6 +1341,8 @@ pub const DependencyStubs = struct { .replay_threads = num_threads, .stop_at_slot = null, + .latest_processed_slot = &latest_processed_slot, + .latest_confirmed_slot = &latest_confirmed_slot, }, .enabled); } }; diff --git a/src/replay/trackers.zig b/src/replay/trackers.zig index 31539a5db3..d5e3cda2f1 100644 --- a/src/replay/trackers.zig +++ b/src/replay/trackers.zig @@ -7,6 +7,7 @@ const Allocator = std.mem.Allocator; const Slot = sig.core.Slot; const SlotConstants = sig.core.SlotConstants; const SlotState = sig.core.SlotState; +const Commitment = sig.rpc.methods.common.Commitment; pub const ForkChoiceProcessedSlot = struct { slot: std.atomic.Value(Slot) = .init(0), @@ -119,6 +120,14 @@ pub const SlotTracker = struct { return elem.toRef(); } + pub fn getSlotForCommitment(self: *const SlotTracker, commitment: Commitment) Slot { + return switch (commitment) { + .processed => self.latest_processed_slot.get(), + .confirmed => self.latest_confirmed_slot.get(), + .finalized => self.root.load(.monotonic), + }; + } + pub const GetOrPutResult = struct { found_existing: bool, reference: Reference, @@ -489,10 +498,18 @@ fn testDummySlotConstants(slot: Slot) SlotConstants { test "SlotTracker.prune removes all slots less than root" { const allocator = std.testing.allocator; const root_slot: Slot = 4; - var tracker: SlotTracker = try .init(allocator, root_slot, .{ - .constants = testDummySlotConstants(root_slot), - .state = .GENESIS, - }); + var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + var tracker: SlotTracker = try .init( + allocator, + &processed_slot, + &confirmed_slot, + root_slot, + .{ + .constants = testDummySlotConstants(root_slot), + .state = .GENESIS, + }, + ); defer tracker.deinit(allocator); // Add slots 1, 2, 3, 4, 5 @@ -513,6 +530,9 @@ test "SlotTracker.prune removes all slots less than root" { try std.testing.expect(!tracker.contains(1)); try std.testing.expect(!tracker.contains(2)); try std.testing.expect(!tracker.contains(3)); + + try std.testing.expectEqual(0, processed_slot.get()); + try std.testing.expectEqual(0, confirmed_slot.get()); } test "SlotTree: if no forks, root follows 32 behind latest" { diff --git a/src/rpc/methods.zig b/src/rpc/methods.zig index c03cf14971..a36da519b9 100644 --- a/src/rpc/methods.zig +++ b/src/rpc/methods.zig @@ -690,54 +690,22 @@ pub const common = struct { pub const SlotHookContext = struct { slot_tracker: *const sig.replay.trackers.SlotTracker, - fn getLatestProcessedSlot(self: @This()) !Slot { - const slot = self.slot_tracker.latest_processed_slot.get(); - if (slot == 0) { - return error.RpcNoProcessedSlot; - } - return slot; - } - - fn getLatestConfirmedSlot(self: @This()) !Slot { - const slot = self.slot_tracker.latest_confirmed_slot.get(); - if (slot == 0) { - return error.RpcNoConfirmedSlot; - } - return slot; - } - - fn getLatestFinalizedSlot(self: @This()) !Slot { - const slot = self.slot_tracker.root.load(.monotonic); - if (slot == 0) { - return error.RpcNoFinalizedSlot; - } - return slot; - } - fn getSlotImpl( self: @This(), config: common.CommitmentSlotConfig, ) !Slot { const commitment = config.commitment orelse .finalized; - - const slot = switch (commitment) { - .processed => try 
self.getLatestProcessedSlot(), - .confirmed => try self.getLatestConfirmedSlot(), - .finalized => try self.getLatestFinalizedSlot(), - }; - + const slot = self.slot_tracker.getSlotForCommitment(commitment); if (config.minContextSlot) |min_slot| { if (slot < min_slot) { return error.RpcMinContextSlotNotMet; } } - return slot; } pub fn getSlot(self: @This(), _: std.mem.Allocator, params: GetSlot) !GetSlot.Response { const config = params.config orelse common.CommitmentSlotConfig{}; - return self.getSlotImpl(config); } }; From 13fa71898dca61b364edc79503cd310f2bca130d Mon Sep 17 00:00:00 2001 From: prestonsn Date: Tue, 10 Feb 2026 18:48:53 +0000 Subject: [PATCH 04/10] fix(replay): move processed and confirmed counters into SlotTracker to avoid uafs --- src/cmd.zig | 14 --- src/consensus/replay_tower.zig | 5 - src/consensus/vote_listener.zig | 43 ++------- src/replay/consensus/cluster_sync.zig | 5 - src/replay/consensus/core.zig | 121 ++++++++---------------- src/replay/consensus/process_result.zig | 10 +- src/replay/service.zig | 15 +-- src/replay/trackers.zig | 19 ++-- 8 files changed, 64 insertions(+), 168 deletions(-) diff --git a/src/cmd.zig b/src/cmd.zig index 8944dd59d0..a53ec269ea 100644 --- a/src/cmd.zig +++ b/src/cmd.zig @@ -1413,9 +1413,6 @@ fn validator( else null; - var latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - var replay_service_state: ReplayAndConsensusServiceState = try .init(allocator, .{ .app_base = &app_base, .account_store = .{ .accounts_db_two = &new_db }, @@ -1427,8 +1424,6 @@ fn validator( .voting_enabled = voting_enabled, .vote_account_address = maybe_vote_pubkey, .stop_at_slot = cfg.stop_at_slot, - .latest_processed_slot = &latest_processed_slot, - .latest_confirmed_slot = &latest_confirmed_slot, }); defer replay_service_state.deinit(allocator); @@ -1599,9 +1594,6 @@ fn replayOffline( ); defer epoch_tracker.deinit(allocator); - var latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - var replay_service_state: ReplayAndConsensusServiceState = try .init(allocator, .{ .app_base = &app_base, .account_store = .{ .accounts_db_two = &new_db }, @@ -1613,8 +1605,6 @@ fn replayOffline( .voting_enabled = false, .vote_account_address = null, .stop_at_slot = cfg.stop_at_slot, - .latest_processed_slot = &latest_processed_slot, - .latest_confirmed_slot = &latest_confirmed_slot, }); defer replay_service_state.deinit(allocator); @@ -2321,8 +2311,6 @@ const ReplayAndConsensusServiceState = struct { voting_enabled: bool, vote_account_address: ?Pubkey, stop_at_slot: ?Slot, - latest_processed_slot: *sig.replay.trackers.ForkChoiceProcessedSlot, - latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot, }, ) !ReplayAndConsensusServiceState { var replay_state: replay.service.ReplayState = replay_state: { @@ -2379,8 +2367,6 @@ const ReplayAndConsensusServiceState = struct { .hard_forks = hard_forks, .replay_threads = params.replay_threads, .stop_at_slot = params.stop_at_slot, - .latest_processed_slot = params.latest_processed_slot, - .latest_confirmed_slot = params.latest_confirmed_slot, }, if (params.disable_consensus) .disabled else .enabled); }; errdefer replay_state.deinit(); diff --git a/src/consensus/replay_tower.zig b/src/consensus/replay_tower.zig index c53454d9ac..f446411d24 100644 --- a/src/consensus/replay_tower.zig +++ 
b/src/consensus/replay_tower.zig @@ -5448,13 +5448,8 @@ pub const TestFixture = struct { errdefer state.deinit(allocator); state.hash = .init(root.hash); - var latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - break :blk try .init( allocator, - &latest_processed_slot, - &latest_confirmed_slot, root.slot, .{ .constants = constants, .state = state }, ); diff --git a/src/consensus/vote_listener.zig b/src/consensus/vote_listener.zig index 4feabcc3e3..cd30105b89 100644 --- a/src/consensus/vote_listener.zig +++ b/src/consensus/vote_listener.zig @@ -24,7 +24,7 @@ const EpochTracker = sig.core.EpochTracker; const Logger = sig.trace.Logger("vote_listener"); pub const SlotDataProvider = struct { - slot_tracker: *const SlotTracker, + slot_tracker: *SlotTracker, epoch_tracker: *const EpochTracker, pub fn rootSlot(self: *const SlotDataProvider) Slot { @@ -317,7 +317,6 @@ pub const VoteCollector = struct { latest_vote_slot_per_validator: sig.utils.collections.PubkeyMap(Slot), last_process_root: sig.time.Instant, vote_processing_time: VoteProcessingTiming, - latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot, metrics: VoteListenerMetrics, pub fn deinit(self: *VoteCollector, allocator: std.mem.Allocator) void { @@ -332,7 +331,6 @@ pub const VoteCollector = struct { now: sig.time.Instant, root_slot: Slot, registry: *sig.prometheus.Registry(.{}), - latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot, ) !VoteCollector { return .{ .gossip_vote_receptor = .INIT, @@ -341,7 +339,6 @@ pub const VoteCollector = struct { .latest_vote_slot_per_validator = .empty, .last_process_root = now, .vote_processing_time = .ZEROES, - .latest_confirmed_slot = latest_confirmed_slot, .metrics = try .init(registry), }; } @@ -400,7 +397,6 @@ pub const VoteCollector = struct { gossip_vote_txs, &self.vote_processing_time, &self.latest_vote_slot_per_validator, - self.latest_confirmed_slot, self.metrics, ); defer allocator.free(confirmed_slots); @@ -424,7 +420,6 @@ fn listenAndConfirmVotes( gossip_vote_txs: []const vote_parser.ParsedVote, vote_processing_time: ?*VoteProcessingTiming, latest_vote_slot_per_validator: *sig.utils.collections.PubkeyMap(Slot), - latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot, metrics: VoteListenerMetrics, ) std.mem.Allocator.Error![]const ThresholdConfirmedSlot { var replay_votes_buffer: std.ArrayListUnmanaged(vote_parser.ParsedVote) = .empty; @@ -459,7 +454,6 @@ fn listenAndConfirmVotes( latest_vote_slot_per_validator, gossip_vote_txs, replay_votes, - latest_confirmed_slot, metrics, ); } @@ -474,7 +468,6 @@ fn filterAndConfirmWithNewVotes( latest_vote_slot_per_validator: *sig.utils.collections.PubkeyMap(Slot), gossip_vote_txs: []const vote_parser.ParsedVote, replayed_votes: []const vote_parser.ParsedVote, - latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot, metrics: VoteListenerMetrics, ) std.mem.Allocator.Error![]const ThresholdConfirmedSlot { const root_slot = slot_data_provider.rootSlot(); @@ -510,7 +503,6 @@ fn filterAndConfirmWithNewVotes( &new_optimistic_confirmed_slots, is_gossip, latest_vote_slot_per_validator, - latest_confirmed_slot, ); if (is_gossip) metrics.gossip_votes_processed.inc() @@ -775,7 +767,6 @@ fn trackNewVotesAndNotifyConfirmations( new_optimistic_confirmed_slots: *std.ArrayListUnmanaged(ThresholdConfirmedSlot), is_gossip_vote: bool, latest_vote_slot_per_validator: 
*sig.utils.collections.PubkeyMap(Slot), - latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot, ) std.mem.Allocator.Error!void { if (vote.isEmpty()) return; const root = slot_data_provider.rootSlot(); @@ -876,7 +867,7 @@ fn trackNewVotesAndNotifyConfirmations( .slot = slot, .hash = hash, }); - latest_confirmed_slot.update(slot); + slot_data_provider.slot_tracker.latest_confirmed_slot.update(slot); // Notify subscribers about new optimistic confirmation if (senders.bank_notification) |sender| { sender.send(.{ .optimistically_confirmed = slot }) catch |err| { @@ -969,13 +960,8 @@ test "trackNewVotesAndNotifyConfirmations filter" { var prng_state: std.Random.DefaultPrng = .init(std.testing.random_seed); const prng = prng_state.random(); - var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - var slot_tracker: SlotTracker = try .init( allocator, - &processed_slot, - &confirmed_slot, 0, try slotTrackerElementGenesis(allocator, .DEFAULT), ); @@ -1052,7 +1038,6 @@ test "trackNewVotesAndNotifyConfirmations filter" { &new_optimistic_confirmed_slots, is_gossip_vote, &latest_vote_slot_per_validator, - &confirmed_slot, ); } diff.sortAsc(); @@ -1102,14 +1087,13 @@ test "trackNewVotesAndNotifyConfirmations filter" { &new_optimistic_confirmed_slots, is_gossip_vote, &latest_vote_slot_per_validator, - &confirmed_slot, ); } diff.sortAsc(); try std.testing.expectEqualSlices(Slot, diff.map.keys(), &.{ 7, 8 }); // No stake delegated, so optimistic confirmation should not be reached. - try std.testing.expectEqual(0, confirmed_slot.get()); + try std.testing.expectEqual(0, slot_data_provider.slot_tracker.getSlotForCommitment(.confirmed)); } const ThresholdReachedResults = std.bit_set.IntegerBitSet(THRESHOLDS_TO_CHECK.len); @@ -1752,10 +1736,7 @@ test "simple usage" { var registry: sig.prometheus.Registry(.{}) = .init(allocator); defer registry.deinit(); - var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - - var slot_tracker: SlotTracker = try .init(allocator, &processed_slot, &confirmed_slot, 0, .{ + var slot_tracker: SlotTracker = try .init(allocator, 0, .{ .constants = .{ .parent_slot = 0, .parent_hash = .ZEROES, @@ -1803,7 +1784,6 @@ test "simple usage" { .EPOCH_ZERO, slot_data_provider.rootSlot(), ®istry, - &confirmed_slot, ); defer vote_collector.deinit(allocator); @@ -1817,8 +1797,8 @@ test "simple usage" { // Since no votes were sent, slot trackers should remain at their initialized state. // NOTE: processed slot is not used here, but required to construct SlotTracker. 
- try std.testing.expectEqual(0, processed_slot.get()); - try std.testing.expectEqual(0, confirmed_slot.get()); + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.processed)); + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } test "check trackers" { @@ -1842,14 +1822,11 @@ test "check trackers" { const root_slot: Slot = 0; - var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - var slot_tracker: SlotTracker = blk: { var state: sig.core.SlotState = .GENESIS; errdefer state.deinit(allocator); - break :blk try .init(allocator, &processed_slot, &confirmed_slot, root_slot, .{ + break :blk try .init(allocator, root_slot, .{ .constants = .{ .parent_slot = root_slot -| 1, .parent_hash = .ZEROES, @@ -1913,7 +1890,7 @@ test "check trackers" { defer replay_votes_channel.destroy(); var vote_collector: VoteCollector = - try .init(.EPOCH_ZERO, slot_data_provider.rootSlot(), &registry, &confirmed_slot); + try .init(.EPOCH_ZERO, slot_data_provider.rootSlot(), &registry); defer vote_collector.deinit(allocator); var expected_trackers: std.ArrayListUnmanaged(struct { Slot, TestSlotVoteTracker }) = .empty; @@ -2032,8 +2009,8 @@ test "check trackers" { // Votes were processed but no stake was delegated to validators, so // optimistic confirmation was not reached. - try std.testing.expectEqual(0, confirmed_slot.get()); - try std.testing.expectEqual(0, confirmed_slot.get()); + try std.testing.expectEqual(0, slot_data_provider.slot_tracker.getSlotForCommitment(.processed)); + try std.testing.expectEqual(0, slot_data_provider.slot_tracker.getSlotForCommitment(.confirmed)); } // tests for OptimisticConfirmationVerifier moved to optimistic_vote_verifier.zig diff --git a/src/replay/consensus/cluster_sync.zig b/src/replay/consensus/cluster_sync.zig index 22bfd880e7..ab70121b30 100644 --- a/src/replay/consensus/cluster_sync.zig +++ b/src/replay/consensus/cluster_sync.zig @@ -1484,13 +1484,8 @@ const TestData = struct { }), }; - var latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - var slot_tracker: SlotTracker = try .init( allocator, - &latest_processed_slot, - &latest_confirmed_slot, root_slot, try slot_infos[root_slot].toDummyElem(&slot_infos, random), ); diff --git a/src/replay/consensus/core.zig b/src/replay/consensus/core.zig index e1ed47ecf0..c40e2d7305 100644 --- a/src/replay/consensus/core.zig +++ b/src/replay/consensus/core.zig @@ -136,7 +136,7 @@ pub const TowerConsensus = struct { signing: sig.identity.SigningKeys, account_reader: AccountReader, ledger: *sig.ledger.Ledger, - slot_tracker: *const SlotTracker, + slot_tracker: *SlotTracker, /// Usually `.now()`.
now: sig.time.Instant, registry: *sig.prometheus.Registry(.{}), @@ -177,7 +177,7 @@ pub const TowerConsensus = struct { errdefer replay_tower.deinit(allocator); var vote_collector: sig.consensus.VoteCollector = - try .init(deps.now, root, deps.registry, deps.slot_tracker.latest_confirmed_slot); + try .init(deps.now, root, deps.registry); errdefer vote_collector.deinit(allocator); return .{ @@ -2509,8 +2509,8 @@ test "checkAndHandleNewRoot - missing slot" { .hash = Hash.initRandom(random), }; - var latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + const latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + const latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; const root: std.atomic.Value(Slot) = .init(0); // NOTE: TestFixture has its own SlotTracker as well. Unclear if that matters. @@ -2523,8 +2523,8 @@ test "checkAndHandleNewRoot - missing slot" { // Build a tracked slot set wrapped in RwMux var slot_tracker: SlotTracker = .{ .root = root, - .latest_processed_slot = &latest_processed_slot, - .latest_confirmed_slot = &latest_confirmed_slot, + .latest_processed_slot = latest_processed_slot, + .latest_confirmed_slot = latest_confirmed_slot, .slots = .empty, }; defer slot_tracker.deinit(testing.allocator); @@ -2568,8 +2568,8 @@ test "checkAndHandleNewRoot - missing slot" { try testing.expectError(error.MissingSlot, result); // Verify slot trackers remain at initial state after failure - try testing.expectEqual(0, slot_tracker.latest_processed_slot.get()); - try testing.expectEqual(0, slot_tracker.latest_confirmed_slot.get()); + try testing.expectEqual(0, slot_tracker.getSlotForCommitment(.processed)); + try testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } test "checkAndHandleNewRoot - missing hash" { @@ -2585,14 +2585,14 @@ test "checkAndHandleNewRoot - missing hash" { var fixture = try TestFixture.init(allocator, root); defer fixture.deinit(allocator); - var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + const processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + const confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; var slot_tracker2: SlotTracker = .{ .root = .init(root.slot), .slots = .empty, - .latest_processed_slot = &processed_slot, - .latest_confirmed_slot = &confirmed_slot, + .latest_processed_slot = processed_slot, + .latest_confirmed_slot = confirmed_slot, }; defer slot_tracker2.deinit(allocator); @@ -2639,8 +2639,8 @@ test "checkAndHandleNewRoot - missing hash" { try testing.expectError(error.MissingHash, result); // Verify slot trackers remain at initial state after failure - try testing.expectEqual(0, processed_slot.get()); - try testing.expectEqual(0, confirmed_slot.get()); + try testing.expectEqual(0, slot_tracker2.getSlotForCommitment(.processed)); + try testing.expectEqual(0, slot_tracker2.getSlotForCommitment(.confirmed)); } test "checkAndHandleNewRoot - empty slot tracker" { @@ -2657,14 +2657,14 @@ test "checkAndHandleNewRoot - empty slot tracker" { var fixture = try TestFixture.init(testing.allocator, root); defer fixture.deinit(testing.allocator); - var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + const processed_slot: 
sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + const confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; var slot_tracker3: SlotTracker = .{ .root = .init(root.slot), .slots = .empty, - .latest_processed_slot = &processed_slot, - .latest_confirmed_slot = &confirmed_slot, + .latest_processed_slot = processed_slot, + .latest_confirmed_slot = confirmed_slot, }; defer slot_tracker3.deinit(testing.allocator); @@ -2698,8 +2698,8 @@ test "checkAndHandleNewRoot - empty slot tracker" { try testing.expectError(error.EmptySlotTracker, result); // Verify slot trackers remain at initial state after failure - try testing.expectEqual(0, processed_slot.get()); - try testing.expectEqual(0, confirmed_slot.get()); + try testing.expectEqual(0, slot_tracker3.getSlotForCommitment(.processed)); + try testing.expectEqual(0, slot_tracker3.getSlotForCommitment(.confirmed)); } test "checkAndHandleNewRoot - success" { @@ -2727,14 +2727,14 @@ test "checkAndHandleNewRoot - success" { var fixture = try TestFixture.init(allocator, root); defer fixture.deinit(allocator); - var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + const processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + const confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; var slot_tracker4: SlotTracker = .{ .root = .init(root.slot), .slots = .empty, - .latest_processed_slot = &processed_slot, - .latest_confirmed_slot = &confirmed_slot, + .latest_processed_slot = processed_slot, + .latest_confirmed_slot = confirmed_slot, }; defer slot_tracker4.deinit(allocator); @@ -5095,10 +5095,8 @@ test "edge cases - gossip verified vote hashes" { const root_slot0 = slot_tracker.root.load(.monotonic); std.debug.assert(root_slot0 == 0); // assert initial root value - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - var vote_collector: sig.consensus.vote_listener.VoteCollector = - try .init(.EPOCH_ZERO, root_slot0, &registry, &confirmed_slot); + try .init(.EPOCH_ZERO, root_slot0, &registry); defer vote_collector.deinit(gpa); const root_slot0_hash = slot_tracker.getRoot().state.hash.readCopy().?; @@ -5251,7 +5249,7 @@ ); // Optimistic confirmation threshold not reached during this test. - try std.testing.expectEqual(0, confirmed_slot.get()); + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } // TODO: Re-implement tests for the new consolidated API @@ -5312,13 +5310,8 @@ test "vote on heaviest frozen descendant with no switch" { try bhq.mut().insertGenesisHash(allocator, root_state.hash.readCopy().?, 0); } - var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - var slot_tracker = try SlotTracker.init( allocator, - &processed_slot, - &confirmed_slot, root_slot, .{ .constants = root_consts, @@ -5457,10 +5450,10 @@ test "vote on heaviest frozen descendant with no switch" { try std.testing.expectEqual(slot1_hash, consensus.fork_choice.heaviestOverallSlot().hash); // 6.
Assert trackers - // processed_slot is updated to slot_1 when the tower votes on it - try std.testing.expectEqual(slot_1, processed_slot.get()); - // confirmed_slot remains at 0 (no optimistic confirmation votes processed) - try std.testing.expectEqual(0, confirmed_slot.get()); + // processed slot is updated to slot_1 when the tower votes on it + try std.testing.expectEqual(slot_1, slot_tracker.getSlotForCommitment(.processed)); + // confirmed slot remains at 0 (no optimistic confirmation votes processed) + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } // State setup @@ -5497,13 +5490,8 @@ test "vote accounts with landed votes populate bank stats" { try bhq.mut().insertGenesisHash(allocator, root_state.hash.readCopy().?, 0); } - var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - var slot_tracker = try SlotTracker.init( allocator, - &processed_slot, - &confirmed_slot, root_slot, .{ .constants = root_consts, @@ -5688,10 +5676,10 @@ test "vote accounts with landed votes populate bank stats" { try std.testing.expectEqual(slot_1, consensus.fork_choice.heaviestOverallSlot().slot); // Assert trackers - // processed_slot is updated to slot_1 when the tower votes on it - try std.testing.expectEqual(slot_1, processed_slot.get()); - // confirmed_slot remains at 0 (no optimistic confirmation votes processed) - try std.testing.expectEqual(0, confirmed_slot.get()); + // processed slot is updated to slot_1 when the tower votes on it + try std.testing.expectEqual(slot_1, slot_tracker.getSlotForCommitment(.processed)); + // confirmed slot remains at 0 (no optimistic confirmation votes processed) + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } // Test case: @@ -5796,13 +5784,8 @@ test "root advances after vote satisfies lockouts" { hashes[i] = Hash{ .data = .{@as(u8, @intCast(i % 256))} ** Hash.SIZE }; } - var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - var slot_tracker: SlotTracker = try .init( allocator, - &processed_slot, - &confirmed_slot, initial_root, .{ .constants = root_consts, .state = root_state }, ); @@ -6149,10 +6132,10 @@ test "root advances after vote satisfies lockouts" { } // Assert trackers - // processed_slot is updated to slot 33 when the tower votes on it - try std.testing.expectEqual(33, processed_slot.get()); - // confirmed_slot remains at 0 (no optimistic confirmation votes processed) - try std.testing.expectEqual(0, confirmed_slot.get()); + // processed slot is updated to slot 33 when the tower votes on it + try std.testing.expectEqual(33, slot_tracker.getSlotForCommitment(.processed)); + // confirmed slot remains at 0 (no optimistic confirmation votes processed) + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } // Test case: @@ -6214,13 +6197,8 @@ test "vote refresh when no new vote available" { try bhq.mut().insertGenesisHash(allocator, Hash.ZEROES, 0); } - var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - var slot_tracker = try SlotTracker.init( allocator, - &processed_slot, - &confirmed_slot, root_slot, .{ .constants = root_consts, .state = root_state }, ); @@ -6350,10 +6328,10 @@ test "vote refresh when no new vote available" { try std.testing.expectEqual(1, 
consensus.fork_choice.heaviestOverallSlot().slot); // Assert trackers - // processed_slot is updated to slot 1 when the tower votes on it - try std.testing.expectEqual(1, processed_slot.get()); - // confirmed_slot remains at 0 (no optimistic confirmation votes processed) - try std.testing.expectEqual(0, confirmed_slot.get()); + // processed slot is updated to slot 1 when the tower votes on it + try std.testing.expectEqual(1, slot_tracker.getSlotForCommitment(.processed)); + // confirmed slot remains at 0 (no optimistic confirmation votes processed) + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } // Test case: @@ -6419,13 +6397,8 @@ test "detect and mark duplicate confirmed fork" { try bhq.mut().insertGenesisHash(allocator, Hash.ZEROES, 0); } - var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - var slot_tracker = try SlotTracker.init( allocator, - &processed_slot, - &confirmed_slot, root_slot, .{ .constants = root_consts, .state = root_state }, ); @@ -6690,13 +6663,8 @@ test "detect and mark duplicate slot" { try bhq.mut().insertGenesisHash(allocator, Hash.ZEROES, 0); } - var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - var slot_tracker = try SlotTracker.init( allocator, - &processed_slot, - &confirmed_slot, root_slot, .{ .constants = root_consts, .state = root_state }, ); @@ -6895,13 +6863,8 @@ test "successful fork switch (switch_proof)" { try bhq.mut().insertGenesisHash(allocator, Hash.ZEROES, 0); } - var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - var slot_tracker = try SlotTracker.init( allocator, - &processed_slot, - &confirmed_slot, root_slot, .{ .constants = root_consts, .state = root_state }, ); diff --git a/src/replay/consensus/process_result.zig b/src/replay/consensus/process_result.zig index 67dbd8d732..475e509792 100644 --- a/src/replay/consensus/process_result.zig +++ b/src/replay/consensus/process_result.zig @@ -305,15 +305,11 @@ const TestReplayStateResources = struct { self.duplicate_slots_to_repair = DuplicateSlotsToRepair.empty; self.purge_repair_slot_counter = PurgeRepairSlotCounters.empty; - var latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - const root: std.atomic.Value(Slot) = .init(0); - self.slot_tracker = SlotTracker{ .slots = .empty, - .latest_processed_slot = &latest_processed_slot, - .latest_confirmed_slot = &latest_confirmed_slot, - .root = root, + .latest_processed_slot = .{}, + .latest_confirmed_slot = .{}, + .root = .init(0), }; self.ancestor_hashes_replay_update_channel = try sig diff --git a/src/replay/service.zig b/src/replay/service.zig index 504287eb22..cc18752568 100644 --- a/src/replay/service.zig +++ b/src/replay/service.zig @@ -162,8 +162,6 @@ pub const Dependencies = struct { hard_forks: sig.core.HardForks, replay_threads: u32, stop_at_slot: ?Slot, - latest_processed_slot: *replay.trackers.ForkChoiceProcessedSlot, - latest_confirmed_slot: *replay.trackers.OptimisticallyConfirmedSlot, }; pub const ConsensusStatus = enum { @@ -212,8 +210,6 @@ pub const ReplayState = struct { var slot_tracker: SlotTracker = try .init( deps.allocator, - deps.latest_processed_slot, - deps.latest_confirmed_slot, deps.root.slot, .{ 
.constants = deps.root.constants, .state = deps.root.state }, ); @@ -813,8 +809,8 @@ test trackNewSlots { &.{ 3, 5 }, ); - try std.testing.expectEqual(0, processed_slot.get()); - try std.testing.expectEqual(0, confirmed_slot.get()); + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.processed)); + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } fn expectSlotTracker( @@ -1264,8 +1260,6 @@ pub const DependencyStubs = struct { .replay_threads = 1, .stop_at_slot = null, - .latest_processed_slot = &latest_processed_slot, - .latest_confirmed_slot = &latest_confirmed_slot, }, .enabled); } @@ -1315,9 +1309,6 @@ pub const DependencyStubs = struct { const hard_forks = try bank_fields.hard_forks.clone(allocator); errdefer hard_forks.deinit(allocator); - var latest_processed_slot: replay.trackers.ForkChoiceProcessedSlot = .{}; - var latest_confirmed_slot: replay.trackers.OptimisticallyConfirmedSlot = .{}; - return try .init(.{ .allocator = allocator, .logger = .FOR_TESTS, @@ -1341,8 +1332,6 @@ pub const DependencyStubs = struct { .replay_threads = num_threads, .stop_at_slot = null, - .latest_processed_slot = &latest_processed_slot, - .latest_confirmed_slot = &latest_confirmed_slot, }, .enabled); } }; diff --git a/src/replay/trackers.zig b/src/replay/trackers.zig index d5e3cda2f1..1a0fd22d98 100644 --- a/src/replay/trackers.zig +++ b/src/replay/trackers.zig @@ -48,8 +48,8 @@ pub const OptimisticallyConfirmedSlot = struct { /// will end as soon as the items are removed. pub const SlotTracker = struct { slots: std.AutoArrayHashMapUnmanaged(Slot, *Element), - latest_processed_slot: *ForkChoiceProcessedSlot, - latest_confirmed_slot: *OptimisticallyConfirmedSlot, + latest_processed_slot: ForkChoiceProcessedSlot, + latest_confirmed_slot: OptimisticallyConfirmedSlot, root: std.atomic.Value(Slot), pub const Element = struct { @@ -71,8 +71,6 @@ pub const SlotTracker = struct { pub fn init( allocator: std.mem.Allocator, - latest_processed_slot: *ForkChoiceProcessedSlot, - latest_confirmed_slot: *OptimisticallyConfirmedSlot, root_slot: Slot, /// ownership is transferred to this function, except in the case of an error return slot_init: Element, @@ -80,8 +78,8 @@ pub const SlotTracker = struct { var self: SlotTracker = .{ .root = .init(root_slot), .slots = .empty, - .latest_processed_slot = latest_processed_slot, - .latest_confirmed_slot = latest_confirmed_slot, + .latest_processed_slot = .{}, + .latest_confirmed_slot = .{}, }; errdefer self.deinit(allocator); @@ -498,12 +496,8 @@ fn testDummySlotConstants(slot: Slot) SlotConstants { test "SlotTracker.prune removes all slots less than root" { const allocator = std.testing.allocator; const root_slot: Slot = 4; - var processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - var confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; var tracker: SlotTracker = try .init( allocator, - &processed_slot, - &confirmed_slot, root_slot, .{ .constants = testDummySlotConstants(root_slot), @@ -531,8 +525,9 @@ test "SlotTracker.prune removes all slots less than root" { try std.testing.expect(!tracker.contains(2)); try std.testing.expect(!tracker.contains(3)); - try std.testing.expectEqual(0, processed_slot.get()); - try std.testing.expectEqual(0, confirmed_slot.get()); + try std.testing.expectEqual(0, tracker.getSlotForCommitment(Commitment.processed)); + try std.testing.expectEqual(0, tracker.getSlotForCommitment(Commitment.confirmed)); + try std.testing.expectEqual(4, 
tracker.getSlotForCommitment(Commitment.finalized)); } test "SlotTree: if no forks, root follows 32 behind latest" { From ed89574f6e0485ee4836b1cdfdc5e57a3883ab14 Mon Sep 17 00:00:00 2001 From: prestonsn Date: Tue, 10 Feb 2026 19:35:24 +0000 Subject: [PATCH 05/10] fix(core): remove redundant assert from edge cases unit tests --- src/replay/consensus/core.zig | 1 - 1 file changed, 1 deletion(-) diff --git a/src/replay/consensus/core.zig b/src/replay/consensus/core.zig index c40e2d7305..ad8ccd57e8 100644 --- a/src/replay/consensus/core.zig +++ b/src/replay/consensus/core.zig @@ -4926,7 +4926,6 @@ test "edge cases - duplicate confirmed slot" { const root_slot0_hash = slot_tracker.getRoot().state.hash.readCopy().?; std.debug.assert(root_slot0_hash.eql(.ZEROES)); // assert initial root hash - std.debug.assert(root_slot0_hash.eql(.ZEROES)); // assert initial root hash // -- slot1 -- // const slot1: Slot = 1; From a6e49ca5d4d228d6271c801f4006803bd2e19e80 Mon Sep 17 00:00:00 2001 From: prestonsn Date: Tue, 10 Feb 2026 19:38:44 +0000 Subject: [PATCH 06/10] fix(methods): inline getSlotImpl into getSlot RPC callback --- src/rpc/methods.zig | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/rpc/methods.zig b/src/rpc/methods.zig index a36da519b9..75bbbe631f 100644 --- a/src/rpc/methods.zig +++ b/src/rpc/methods.zig @@ -690,10 +690,8 @@ pub const common = struct { pub const SlotHookContext = struct { slot_tracker: *const sig.replay.trackers.SlotTracker, - fn getSlotImpl( - self: @This(), - config: common.CommitmentSlotConfig, - ) !Slot { + pub fn getSlot(self: @This(), _: std.mem.Allocator, params: GetSlot) !GetSlot.Response { + const config = params.config orelse common.CommitmentSlotConfig{}; const commitment = config.commitment orelse .finalized; const slot = self.slot_tracker.getSlotForCommitment(commitment); if (config.minContextSlot) |min_slot| { @@ -703,9 +701,4 @@ pub const SlotHookContext = struct { } return slot; } - - pub fn getSlot(self: @This(), _: std.mem.Allocator, params: GetSlot) !GetSlot.Response { - const config = params.config orelse common.CommitmentSlotConfig{}; - return self.getSlotImpl(config); - } }; From 341b328831a2a05e24f1ff8a840ee783eec238a2 Mon Sep 17 00:00:00 2001 From: prestonsn Date: Tue, 10 Feb 2026 19:55:05 +0000 Subject: [PATCH 07/10] fix(trackers, methods): remove use of @This() in accordance with style guide --- src/replay/trackers.zig | 8 ++++---- src/rpc/methods.zig | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/replay/trackers.zig b/src/replay/trackers.zig index 1a0fd22d98..b5282c7000 100644 --- a/src/replay/trackers.zig +++ b/src/replay/trackers.zig @@ -15,11 +15,11 @@ pub const ForkChoiceProcessedSlot = struct { /// Set the current processed slot (heaviest fork tip). /// Uses store() because this can decrease when the fork choice /// switches to a different fork with a lower slot.
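/// (By contrast, OptimisticallyConfirmedSlot.update below uses fetchMax, so that counter only ever advances.)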
- pub fn set(self: *@This(), new_slot: Slot) void { + pub fn set(self: *ForkChoiceProcessedSlot, new_slot: Slot) void { self.slot.store(new_slot, .monotonic); } - pub fn get(self: *const @This()) Slot { + pub fn get(self: *const ForkChoiceProcessedSlot) Slot { return self.slot.load(.monotonic); } }; @@ -27,11 +27,11 @@ pub const ForkChoiceProcessedSlot = struct { pub const OptimisticallyConfirmedSlot = struct { slot: std.atomic.Value(Slot) = .init(0), - pub fn update(self: *@This(), new_slot: Slot) void { + pub fn update(self: *OptimisticallyConfirmedSlot, new_slot: Slot) void { _ = self.slot.fetchMax(new_slot, .monotonic); } - pub fn get(self: *const @This()) Slot { + pub fn get(self: *const OptimisticallyConfirmedSlot) Slot { return self.slot.load(.monotonic); } }; diff --git a/src/rpc/methods.zig b/src/rpc/methods.zig index 75bbbe631f..bae8f9b529 100644 --- a/src/rpc/methods.zig +++ b/src/rpc/methods.zig @@ -690,7 +690,7 @@ pub const common = struct { pub const SlotHookContext = struct { slot_tracker: *const sig.replay.trackers.SlotTracker, - pub fn getSlot(self: @This(), _: std.mem.Allocator, params: GetSlot) !GetSlot.Response { + pub fn getSlot(self: SlotHookContext, _: std.mem.Allocator, params: GetSlot) !GetSlot.Response { const config = params.config orelse common.CommitmentSlotConfig{}; const commitment = config.commitment orelse .finalized; const slot = self.slot_tracker.getSlotForCommitment(commitment); From 2c1518f130565130c1b0759f54da3ec81edba1ce Mon Sep 17 00:00:00 2001 From: prestonsn Date: Tue, 10 Feb 2026 23:24:31 +0000 Subject: [PATCH 08/10] refactor(rpc): simplify minContextSlot check in getSlot Condense nested if statements into single-line conditional return. --- src/rpc/methods.zig | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/rpc/methods.zig b/src/rpc/methods.zig index bae8f9b529..06ce4470b3 100644 --- a/src/rpc/methods.zig +++ b/src/rpc/methods.zig @@ -694,11 +694,7 @@ pub const SlotHookContext = struct { const config = params.config orelse common.CommitmentSlotConfig{}; const commitment = config.commitment orelse .finalized; const slot = self.slot_tracker.getSlotForCommitment(commitment); - if (config.minContextSlot) |min_slot| { - if (slot < min_slot) { - return error.RpcMinContextSlotNotMet; - } - } - return slot; + const min_slot = config.minContextSlot orelse return slot; + return if (slot >= min_slot) slot else error.RpcMinContextSlotNotMet; } }; From 0568f9e6ef8a6f550b6839029f437565004f80a7 Mon Sep 17 00:00:00 2001 From: prestonsn Date: Wed, 11 Feb 2026 16:22:18 +0000 Subject: [PATCH 09/10] fix(tests): wrap SlotTracker in RwMux for thread-safe access in checkAndHandleNewRoot test --- src/replay/consensus/core.zig | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/src/replay/consensus/core.zig b/src/replay/consensus/core.zig index ad8ccd57e8..db4ab0af2c 100644 --- a/src/replay/consensus/core.zig +++ b/src/replay/consensus/core.zig @@ -2727,16 +2727,17 @@ test "checkAndHandleNewRoot - success" { var fixture = try TestFixture.init(allocator, root); defer fixture.deinit(allocator); - const processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; - const confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; - - var slot_tracker4: SlotTracker = .{ - .root = .init(root.slot), + var slot_tracker4 = RwMux(SlotTracker).init(.{ + .root = std.atomic.Value(Slot).init(root.slot), .slots = .empty, - .latest_processed_slot = processed_slot, - .latest_confirmed_slot 
= confirmed_slot, - }; - defer slot_tracker4.deinit(allocator); + .latest_processed_slot = .{}, + .latest_confirmed_slot = .{}, + }); + defer { + const ptr, var lg = slot_tracker4.writeWithLock(); + defer lg.unlock(); + ptr.deinit(allocator); + } { var constants2 = try SlotConstants.genesis(allocator, .initRandom(random)); @@ -2756,11 +2757,13 @@ state2.hash = .init(hash2.hash); state3.hash = .init(hash3.hash); - try slot_tracker4.put(allocator, hash2.slot, .{ + const ptr, var lg = slot_tracker4.writeWithLock(); + defer lg.unlock(); + try ptr.put(allocator, hash2.slot, .{ .constants = constants2, .state = state2, }); - try slot_tracker4.put(allocator, hash3.slot, .{ + try ptr.put(allocator, hash3.slot, .{ .constants = constants3, .state = state3, }); } @@ -2813,8 +2816,13 @@ } try testing.expectEqual(1, fixture.progress.map.count()); - for (slot_tracker4.slots.keys()) |remaining_slots| { - try testing.expect(remaining_slots >= hash3.slot); + // Now that the write lock is released, we can acquire a read lock + { + const ptr, var lg = slot_tracker4.readWithLock(); + defer lg.unlock(); + for (ptr.slots.keys()) |remaining_slots| { + try testing.expect(remaining_slots >= hash3.slot); + } } try testing.expect(!fixture.progress.map.contains(hash1.slot)); } From 25cb22e58ab39ce15c1ac09eadec90fed0864eab Mon Sep 17 00:00:00 2001 From: prestonsn Date: Fri, 6 Feb 2026 00:14:36 +0000 Subject: [PATCH 10/10] feat(rpc): implement getBalance RPC method - Add getBalance handler with commitment level support in AccountsDB - Add getSlotForCommitment to SlotTracker for commitment-based slot resolution - Update registerRPCHooks signature to accept slot_tracker and snapshot_slot - Use snapshot slot as fallback when no live slot tracker is available --- src/accountsdb/db.zig | 60 +++++++++++++++++++++++++++++++++++++-- src/cmd.zig | 3 +- src/rpc/server/server.zig | 4 +-- 3 files changed, 62 insertions(+), 5 deletions(-) diff --git a/src/accountsdb/db.zig b/src/accountsdb/db.zig index 35c712198b..a437be7b2d 100644 --- a/src/accountsdb/db.zig +++ b/src/accountsdb/db.zig @@ -8,6 +8,7 @@ const tracy = @import("tracy"); const sysvar = sig.runtime.sysvar; const snapgen = sig.accounts_db.snapshot.data.generate; +const methods = sig.rpc.methods; const Resolution = @import("../benchmarks.zig").Resolution; @@ -2873,9 +2874,64 @@ pub const AccountsDB = struct { return slotSatisfiesMax(slot, max_slot) and slotSatisfiesMin(slot, min_slot); } - pub fn registerRPCHooks(self: *AccountsDB, rpc_hooks: *sig.rpc.Hooks) !void { + pub fn registerRPCHooks( + self: *AccountsDB, + rpc_hooks: *sig.rpc.Hooks, + slot_tracker: ?*const sig.replay.trackers.SlotTracker, + snapshot_slot: Slot, + ) !void { try rpc_hooks.set(self.allocator, struct { accountsdb: *AccountsDB, + slot_tracker: ?*const sig.replay.trackers.SlotTracker, + snapshot_slot: Slot, + + pub fn getBalance( + this: @This(), + _: std.mem.Allocator, + params: methods.GetBalance, + ) !methods.GetBalance.Response { + const config = params.config orelse methods.common.CommitmentSlotConfig{}; + + const commitment = config.commitment orelse .finalized; + const commitment_slot: Slot = if (this.slot_tracker) |tracker| + tracker.getSlotForCommitment(commitment) + else + this.snapshot_slot; + const max_slot: ?Slot = if (commitment == .processed) null else commitment_slot; + + // Check minContextSlot constraint.
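+ // As in getSlot above: a response resolved below the caller's minContextSlot would be stale, so reject it.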
+ if (config.minContextSlot) |min_slot| { + if (commitment_slot < min_slot) { + return error.RpcMinContextSlotNotMet; + } + } + + // Look up account + const result = this.accountsdb.getSlotAndAccountInSlotRangeWithReadLock( + ¶ms.pubkey, + config.minContextSlot, + max_slot, + ) catch return error.AccountsDbError; + + const lamports: u64 = if (result) |r| blk: { + const account, _, var account_lock = r; + defer account_lock.unlock(); + + break :blk switch (account) { + .file => |aif| aif.account_info.lamports, + .unrooted_map => |um| um.lamports, + }; + // TODO: is defaulting to 0 the correct behavior here? + } else 0; + + return .{ + .context = .{ + .slot = commitment_slot, + .apiVersion = "2.0.15", + }, + .value = lamports, + }; + } pub fn getAccountInfo( this: @This(), @@ -3098,7 +3154,7 @@ pub const AccountsDB = struct { return null; } - }{ .accountsdb = self }); + }{ .accountsdb = self, .slot_tracker = slot_tracker, .snapshot_slot = snapshot_slot }); } }; diff --git a/src/cmd.zig b/src/cmd.zig index a53ec269ea..7f3cb196cd 100644 --- a/src/cmd.zig +++ b/src/cmd.zig @@ -2054,9 +2054,10 @@ fn mockRpcServer(allocator: std.mem.Allocator, cfg: config.Cmd) !void { defer manifest.deinit(allocator); } + const snapshot_slot = if (snap_files.incremental_info) |inc| inc.slot else snap_files.full.slot; var rpc_hooks = sig.rpc.Hooks{}; defer rpc_hooks.deinit(allocator); - try accountsdb.registerRPCHooks(&rpc_hooks); + try accountsdb.registerRPCHooks(&rpc_hooks, null, snapshot_slot); var server_ctx = try sig.rpc.server.Context.init(.{ .allocator = allocator, diff --git a/src/rpc/server/server.zig b/src/rpc/server/server.zig index ce0c32ed41..1b3149c50f 100644 --- a/src/rpc/server/server.zig +++ b/src/rpc/server/server.zig @@ -285,7 +285,7 @@ test "serveSpawn getSnapshot" { var rpc_hooks = sig.rpc.Hooks{}; defer rpc_hooks.deinit(accountsdb.allocator); - try accountsdb.registerRPCHooks(&rpc_hooks); + try accountsdb.registerRPCHooks(&rpc_hooks, null, 0); const sock_addr = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, 0); var server_ctx = try Context.init(.{ @@ -440,7 +440,7 @@ test "serveSpawn getAccountInfo" { var rpc_hooks = sig.rpc.Hooks{}; defer rpc_hooks.deinit(allocator); - try accountsdb.registerRPCHooks(&rpc_hooks); + try accountsdb.registerRPCHooks(&rpc_hooks, null, expected_slot); const test_sock_addr = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, 0); var server_ctx = try Context.init(.{