diff --git a/src/accountsdb/db.zig b/src/accountsdb/db.zig index 35c712198b..a437be7b2d 100644 --- a/src/accountsdb/db.zig +++ b/src/accountsdb/db.zig @@ -8,6 +8,7 @@ const tracy = @import("tracy"); const sysvar = sig.runtime.sysvar; const snapgen = sig.accounts_db.snapshot.data.generate; +const methods = sig.rpc.methods; const Resolution = @import("../benchmarks.zig").Resolution; @@ -2873,9 +2874,64 @@ pub const AccountsDB = struct { return slotSatisfiesMax(slot, max_slot) and slotSatisfiesMin(slot, min_slot); } - pub fn registerRPCHooks(self: *AccountsDB, rpc_hooks: *sig.rpc.Hooks) !void { + pub fn registerRPCHooks( + self: *AccountsDB, + rpc_hooks: *sig.rpc.Hooks, + slot_tracker: ?*const sig.replay.trackers.SlotTracker, + snapshot_slot: Slot, + ) !void { try rpc_hooks.set(self.allocator, struct { accountsdb: *AccountsDB, + slot_tracker: ?*const sig.replay.trackers.SlotTracker, + snapshot_slot: Slot, + + pub fn getBalance( + this: @This(), + _: std.mem.Allocator, + params: methods.GetBalance, + ) !methods.GetBalance.Response { + const config = params.config orelse methods.common.CommitmentSlotConfig{}; + + const commitment = config.commitment orelse .finalized; + const commitment_slot: Slot = if (this.slot_tracker) |tracker| + tracker.getSlotForCommitment(commitment) + else + this.snapshot_slot; + const max_slot: ?Slot = if (commitment == .processed) null else commitment_slot; + + // Check minContextSlot constraint. + if (config.minContextSlot) |min_slot| { + if (commitment_slot < min_slot) { + return error.RpcMinContextSlotNotMet; + } + } + + // Look up account + const result = this.accountsdb.getSlotAndAccountInSlotRangeWithReadLock( + &params.pubkey, + config.minContextSlot, + max_slot, + ) catch return error.AccountsDbError; + + const lamports: u64 = if (result) |r| blk: { + const account, _, var account_lock = r; + defer account_lock.unlock(); + + break :blk switch (account) { + .file => |aif| aif.account_info.lamports, + .unrooted_map => |um| um.lamports, + }; + // TODO: is defaulting to 0 the correct behavior here?
+ } else 0; + + return .{ + .context = .{ + .slot = commitment_slot, + .apiVersion = "2.0.15", + }, + .value = lamports, + }; + } pub fn getAccountInfo( this: @This(), @@ -3098,7 +3154,7 @@ pub const AccountsDB = struct { return null; } - }{ .accountsdb = self }); + }{ .accountsdb = self, .slot_tracker = slot_tracker, .snapshot_slot = snapshot_slot }); } }; diff --git a/src/cmd.zig b/src/cmd.zig index 696de69435..7f3cb196cd 100644 --- a/src/cmd.zig +++ b/src/cmd.zig @@ -1427,6 +1427,10 @@ fn validator( }); defer replay_service_state.deinit(allocator); + try app_base.rpc_hooks.set(allocator, sig.rpc.methods.SlotHookContext{ + .slot_tracker = &replay_service_state.replay_state.slot_tracker, + }); + const replay_thread = try replay_service_state.spawnService( &app_base, if (maybe_vote_sockets) |*vs| vs else null, @@ -2050,9 +2054,10 @@ fn mockRpcServer(allocator: std.mem.Allocator, cfg: config.Cmd) !void { defer manifest.deinit(allocator); } + const snapshot_slot = if (snap_files.incremental_info) |inc| inc.slot else snap_files.full.slot; var rpc_hooks = sig.rpc.Hooks{}; defer rpc_hooks.deinit(allocator); - try accountsdb.registerRPCHooks(&rpc_hooks); + try accountsdb.registerRPCHooks(&rpc_hooks, null, snapshot_slot); var server_ctx = try sig.rpc.server.Context.init(.{ .allocator = allocator, diff --git a/src/consensus/replay_tower.zig b/src/consensus/replay_tower.zig index af98443b64..f446411d24 100644 --- a/src/consensus/replay_tower.zig +++ b/src/consensus/replay_tower.zig @@ -5448,10 +5448,11 @@ pub const TestFixture = struct { errdefer state.deinit(allocator); state.hash = .init(root.hash); - break :blk try .init(allocator, root.slot, .{ - .constants = constants, - .state = state, - }); + break :blk try .init( + allocator, + root.slot, + .{ .constants = constants, .state = state }, + ); }; errdefer slot_tracker.deinit(allocator); diff --git a/src/consensus/vote_listener.zig b/src/consensus/vote_listener.zig index 5a491faa51..cd30105b89 100644 --- a/src/consensus/vote_listener.zig +++ b/src/consensus/vote_listener.zig @@ -24,11 +24,11 @@ const EpochTracker = sig.core.EpochTracker; const Logger = sig.trace.Logger("vote_listener"); pub const SlotDataProvider = struct { - slot_tracker: *const SlotTracker, + slot_tracker: *SlotTracker, epoch_tracker: *const EpochTracker, pub fn rootSlot(self: *const SlotDataProvider) Slot { - return self.slot_tracker.root; + return self.slot_tracker.root.load(.monotonic); } fn getSlotHash(self: *const SlotDataProvider, slot: Slot) ?Hash { @@ -867,6 +867,7 @@ fn trackNewVotesAndNotifyConfirmations( .slot = slot, .hash = hash, }); + slot_data_provider.slot_tracker.latest_confirmed_slot.update(slot); // Notify subscribers about new optimistic confirmation if (senders.bank_notification) |sender| { sender.send(.{ .optimistically_confirmed = slot }) catch |err| { @@ -1090,6 +1091,9 @@ test "trackNewVotesAndNotifyConfirmations filter" { } diff.sortAsc(); try std.testing.expectEqualSlices(Slot, diff.map.keys(), &.{ 7, 8 }); + + // No stake delegated, so optimistic confirmation should not be reached. + try std.testing.expectEqual(0, slot_data_provider.slot_tracker.getSlotForCommitment(.confirmed)); } const ThresholdReachedResults = std.bit_set.IntegerBitSet(THRESHOLDS_TO_CHECK.len); @@ -1790,6 +1794,11 @@ test "simple usage" { .ledger = &ledger, .gossip_votes = null, }); + + // Since no votes were sent, slot trackers should remain at their initialized state. + // NOTE: processed slot is not used here, but required to construct SlotTracker. 
+ try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.processed)); + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } test "check trackers" { @@ -1997,6 +2006,11 @@ test "check trackers" { expected_trackers.items, actual_trackers.items, ); + + // Votes were processed but no stake was delegated to validators, so + // optimistic confirmation was not reached. + try std.testing.expectEqual(0, slot_data_provider.slot_tracker.getSlotForCommitment(.processed)); + try std.testing.expectEqual(0, slot_data_provider.slot_tracker.getSlotForCommitment(.confirmed)); } // tests for OptimisticConfirmationVerifier moved to optimistic_vote_verifier.zig diff --git a/src/replay/consensus/cluster_sync.zig b/src/replay/consensus/cluster_sync.zig index 27290ef3ef..ab70121b30 100644 --- a/src/replay/consensus/cluster_sync.zig +++ b/src/replay/consensus/cluster_sync.zig @@ -492,7 +492,7 @@ fn processAncestorHashesDuplicateSlots( slot_tracker: *const SlotTracker, duplicate_slots_to_repair: *SlotData.DuplicateSlotsToRepair, ) !void { - const root = slot_tracker.root; + const root = slot_tracker.root.load(.monotonic); while (ancestor_duplicate_slots_receiver.tryReceive()) |ancestor_dupe_slot_to_repair| { const request_type = ancestor_dupe_slot_to_repair.request_type; @@ -560,7 +560,7 @@ fn processDuplicateConfirmedSlots( ancestor_hashes_replay_update_sender: *sig.sync.Channel(AncestorHashesReplayUpdate), purge_repair_slot_counter: *SlotData.PurgeRepairSlotCounters, ) !void { - const root = slot_tracker.root; + const root = slot_tracker.root.load(.monotonic); for (duplicate_confirmed_slots_received) |new_duplicate_confirmed_slot| { const confirmed_slot, const duplicate_confirmed_hash = new_duplicate_confirmed_slot.tuple(); if (confirmed_slot <= root) { @@ -642,7 +642,7 @@ fn processPrunedButPopularForks( slot_tracker: *const SlotTracker, ancestor_hashes_replay_update_sender: *sig.sync.Channel(AncestorHashesReplayUpdate), ) !void { - const root = slot_tracker.root; + const root = slot_tracker.root.load(.monotonic); while (pruned_but_popular_forks_receiver.tryReceive()) |new_popular_pruned_slot| { if (new_popular_pruned_slot <= root) { continue; } @@ -700,7 +700,7 @@ fn processDuplicateSlots( }); } - break :blk .{ slot_tracker.root, slots_hashes }; + break :blk .{ slot_tracker.root.load(.monotonic), slots_hashes }; }; for (new_duplicate_slots.constSlice(), slots_hashes.constSlice()) |duplicate_slot, slot_hash| { // WindowService should only send the signal once per slot @@ -1564,7 +1564,7 @@ test "apply state changes" { // MarkSlotDuplicate should mark progress map and remove // the slot from fork choice - const duplicate_slot = slot_tracker.root + 1; + const duplicate_slot = slot_tracker.root.load(.monotonic) + 1; const duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?; // AKA: `ResultingStateChange::MarkSlotDuplicate` in agave try heaviest_subtree_fork_choice.markForkInvalidCandidate(allocator, &.{ @@ -1624,7 +1624,7 @@ test "apply state changes slot frozen" { var ledger = try ledger_tests.initTestLedger(allocator, @src(), .FOR_TESTS); defer ledger.deinit(); - const duplicate_slot = slot_tracker.root + 1; + const duplicate_slot = slot_tracker.root.load(.monotonic) + 1; const duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?; // Simulate ReplayStage freezing a Slot with the given hash. @@ -1660,9 +1660,10 @@ test "apply state changes slot frozen" { // version in blockstore.
const new_slot_hash: Hash = .initRandom(random); const root_slot_hash: sig.core.hash.SlotAndHash = rsh: { - const root_slot_info = slot_tracker.get(slot_tracker.root).?; + const root_slot = slot_tracker.root.load(.monotonic); + const root_slot_info = slot_tracker.get(root_slot).?; break :rsh .{ - .slot = slot_tracker.root, + .slot = root_slot, .hash = root_slot_info.state.hash.readCopy().?, }; }; @@ -1712,7 +1713,7 @@ test "apply state changes duplicate confirmed matches frozen" { var ledger = try ledger_tests.initTestLedger(allocator, @src(), .FOR_TESTS); defer ledger.deinit(); - const duplicate_slot = slot_tracker.root + 1; + const duplicate_slot = slot_tracker.root.load(.monotonic) + 1; const our_duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?; var duplicate_slots_to_repair: SlotData.DuplicateSlotsToRepair = .empty; @@ -1808,7 +1809,7 @@ test "apply state changes slot frozen and duplicate confirmed matches frozen" { var purge_repair_slot_counter: SlotData.PurgeRepairSlotCounters = .empty; defer purge_repair_slot_counter.deinit(allocator); - const duplicate_slot = slot_tracker.root + 1; + const duplicate_slot = slot_tracker.root.load(.monotonic) + 1; const our_duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?; // Setup and check the state that is about to change. diff --git a/src/replay/consensus/core.zig b/src/replay/consensus/core.zig index 35aee30a00..db4ab0af2c 100644 --- a/src/replay/consensus/core.zig +++ b/src/replay/consensus/core.zig @@ -136,7 +136,7 @@ pub const TowerConsensus = struct { signing: sig.identity.SigningKeys, account_reader: AccountReader, ledger: *sig.ledger.Ledger, - slot_tracker: *const SlotTracker, + slot_tracker: *SlotTracker, /// Usually `.now()`. now: sig.time.Instant, registry: *sig.prometheus.Registry(.{}), @@ -153,7 +153,8 @@ pub const TowerConsensus = struct { ); errdefer fork_choice.deinit(allocator); - const root_ref = deps.slot_tracker.get(deps.slot_tracker.root).?; + const root = deps.slot_tracker.root.load(.monotonic); + const root_ref = deps.slot_tracker.get(root).?; const root_ancestors = &root_ref.constants.ancestors; var tower: Tower = if (deps.identity.vote_account) |vote_account_address| @@ -164,8 +165,8 @@ pub const TowerConsensus = struct { vote_account_address, ) else - .{ .root = deps.slot_tracker.root }; - tower.setRoot(deps.slot_tracker.root); + .{ .root = root }; + tower.setRoot(root); const replay_tower: ReplayTower = try .init( .from(deps.logger), @@ -176,7 +177,7 @@ pub const TowerConsensus = struct { errdefer replay_tower.deinit(allocator); var vote_collector: sig.consensus.VoteCollector = - try .init(deps.now, deps.slot_tracker.root, deps.registry); + try .init(deps.now, root, deps.registry); errdefer vote_collector.deinit(allocator); return .{ @@ -205,7 +206,7 @@ pub const TowerConsensus = struct { ) !HeaviestSubtreeForkChoice { const root_slot, const root_hash = blk: { const root = slot_tracker.getRoot(); - const root_slot = slot_tracker.root; + const root_slot = slot_tracker.root.load(.monotonic); const root_hash = root.state.hash.readCopy(); break :blk .{ root_slot, root_hash.? }; }; @@ -571,7 +572,7 @@ pub const TowerConsensus = struct { } // Update cluster with the duplicate confirmation status. 
// Analogous to [ReplayStage::mark_slots_duplicate_confirmed](https://github.com/anza-xyz/agave/blob/47c0383f2301e5a739543c1af9992ae182b7e06c/core/src/replay_stage.rs#L3876) - const root_slot = slot_tracker.root; + const root_slot = slot_tracker.root.load(.monotonic); for (duplicate_confirmed_forks.items) |duplicate_confirmed_fork| { const slot, const frozen_hash = duplicate_confirmed_fork.tuple(); try self.handleDuplicateConfirmedFork( @@ -657,6 +658,12 @@ pub const TowerConsensus = struct { slot_leaders, vote_sockets, ); + + // Update the latest processed slot to the bank being voted on. + // This matches Agave's behavior: the processed slot is updated inside + // handle_votable_bank(), which is only called when vote_bank.is_some(). + // See: https://github.com/anza-xyz/agave/blob/5e900421520a10933642d5e9a21e191a70f9b125/core/src/replay_stage.rs#L2683 + slot_tracker.latest_processed_slot.set(voted.slot); } // Reset onto a fork @@ -1598,7 +1605,7 @@ fn checkAndHandleNewRoot( // Audit: The rest of the code maps to Self::handle_new_root in Agave. // Update the slot tracker. // Set new root. - slot_tracker.root = new_root; + slot_tracker.root.store(new_root, .monotonic); // Prune non rooted slots slot_tracker.pruneNonRooted(allocator); @@ -2497,17 +2504,27 @@ test "checkAndHandleNewRoot - missing slot" { var prng = std.Random.DefaultPrng.init(std.testing.random_seed); const random = prng.random(); - const root = SlotAndHash{ + const root_slot_and_hash = SlotAndHash{ .slot = 0, .hash = Hash.initRandom(random), }; - var fixture = try TestFixture.init(allocator, root); + const latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + const latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + const root: std.atomic.Value(Slot) = .init(0); + + // NOTE: TestFixture has its own SlotTracker as well. Unclear if that matters. 
+ var fixture = try TestFixture.init( + allocator, + root_slot_and_hash, + ); defer fixture.deinit(allocator); // Build a tracked slot set wrapped in RwMux var slot_tracker: SlotTracker = .{ - .root = root.slot, + .root = root, + .latest_processed_slot = latest_processed_slot, + .latest_confirmed_slot = latest_confirmed_slot, .slots = .empty, }; defer slot_tracker.deinit(testing.allocator); @@ -2515,7 +2532,7 @@ test "checkAndHandleNewRoot - missing slot" { { const constants: SlotConstants = try .genesis(allocator, .initRandom(random)); errdefer constants.deinit(allocator); - try slot_tracker.put(testing.allocator, root.slot, .{ + try slot_tracker.put(testing.allocator, root_slot_and_hash.slot, .{ .constants = constants, .state = .GENESIS, }); @@ -2549,6 +2566,10 @@ test "checkAndHandleNewRoot - missing slot" { ); try testing.expectError(error.MissingSlot, result); + + // Verify slot trackers remain at initial state after failure + try testing.expectEqual(0, slot_tracker.getSlotForCommitment(.processed)); + try testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } test "checkAndHandleNewRoot - missing hash" { @@ -2564,12 +2585,17 @@ test "checkAndHandleNewRoot - missing hash" { var fixture = try TestFixture.init(allocator, root); defer fixture.deinit(allocator); - var slot_tracker2 = RwMux(SlotTracker).init(.{ .root = root.slot, .slots = .empty }); - defer { - const ptr, var lg = slot_tracker2.writeWithLock(); - defer lg.unlock(); - ptr.deinit(allocator); - } + const processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + const confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + + var slot_tracker2: SlotTracker = .{ + .root = .init(root.slot), + .slots = .empty, + .latest_processed_slot = processed_slot, + .latest_confirmed_slot = confirmed_slot, + }; + defer slot_tracker2.deinit(allocator); + { const constants = try SlotConstants.genesis(allocator, .initRandom(random)); errdefer constants.deinit(allocator); @@ -2577,9 +2603,7 @@ test "checkAndHandleNewRoot - missing hash" { var state: SlotState = .GENESIS; errdefer state.deinit(allocator); - const ptr, var lg = slot_tracker2.writeWithLock(); - defer lg.unlock(); - try ptr.put(allocator, root.slot, .{ + try slot_tracker2.put(allocator, root.slot, .{ .constants = constants, .state = state, }); @@ -2600,12 +2624,10 @@ test "checkAndHandleNewRoot - missing hash" { defer epoch_tracker.deinit(allocator); // Try to check a slot that doesn't exist in the tracker - const slot_tracker2_ptr, var slot_tracker2_lg = slot_tracker2.writeWithLock(); - defer slot_tracker2_lg.unlock(); const result = checkAndHandleNewRoot( allocator, test_state.resultWriter(), - slot_tracker2_ptr, + &slot_tracker2, &fixture.progress, &fixture.fork_choice, &epoch_tracker, @@ -2615,6 +2637,10 @@ test "checkAndHandleNewRoot - missing hash" { ); try testing.expectError(error.MissingHash, result); + + // Verify slot trackers remain at initial state after failure + try testing.expectEqual(0, slot_tracker2.getSlotForCommitment(.processed)); + try testing.expectEqual(0, slot_tracker2.getSlotForCommitment(.confirmed)); } test "checkAndHandleNewRoot - empty slot tracker" { @@ -2631,13 +2657,17 @@ test "checkAndHandleNewRoot - empty slot tracker" { var fixture = try TestFixture.init(testing.allocator, root); defer fixture.deinit(testing.allocator); - const slot_tracker_val3: SlotTracker = SlotTracker{ .root = root.slot, .slots = .{} }; - var slot_tracker3 = RwMux(SlotTracker).init(slot_tracker_val3); - defer { - const ptr, var lg = 
slot_tracker3.writeWithLock(); - defer lg.unlock(); - ptr.deinit(testing.allocator); - } + const processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{}; + const confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{}; + + var slot_tracker3: SlotTracker = .{ + .root = .init(root.slot), + .slots = .empty, + .latest_processed_slot = processed_slot, + .latest_confirmed_slot = confirmed_slot, + }; + defer slot_tracker3.deinit(testing.allocator); + var registry = sig.prometheus.Registry(.{}).init(testing.allocator); defer registry.deinit(); @@ -2653,12 +2683,10 @@ test "checkAndHandleNewRoot - empty slot tracker" { defer epoch_tracker.deinit(allocator); // Try to check a slot that doesn't exist in the tracker - const slot_tracker3_ptr, var slot_tracker3_lg = slot_tracker3.writeWithLock(); - defer slot_tracker3_lg.unlock(); const result = checkAndHandleNewRoot( testing.allocator, test_state.resultWriter(), - slot_tracker3_ptr, + &slot_tracker3, &fixture.progress, &fixture.fork_choice, &epoch_tracker, ); try testing.expectError(error.EmptySlotTracker, result); + + // Verify slot trackers remain at initial state after failure + try testing.expectEqual(0, slot_tracker3.getSlotForCommitment(.processed)); + try testing.expectEqual(0, slot_tracker3.getSlotForCommitment(.confirmed)); } test "checkAndHandleNewRoot - success" { @@ -2695,7 +2727,12 @@ test "checkAndHandleNewRoot - success" { var fixture = try TestFixture.init(allocator, root); defer fixture.deinit(allocator); - var slot_tracker4 = RwMux(SlotTracker).init(.{ .root = root.slot, .slots = .empty }); + var slot_tracker4 = RwMux(SlotTracker).init(.{ + .root = std.atomic.Value(Slot).init(root.slot), + .slots = .empty, + .latest_processed_slot = .{}, + .latest_confirmed_slot = .{}, + }); defer { const ptr, var lg = slot_tracker4.writeWithLock(); defer lg.unlock(); @@ -4723,12 +4760,10 @@ test "edge cases - duplicate slot" { const slot_tracker = &replay_state.slot_tracker; const progress_map = &replay_state.progress_map; - std.debug.assert(slot_tracker.root == 0); + const root_slot0 = slot_tracker.root.load(.monotonic); + std.debug.assert(root_slot0 == 0); - const root_slot0 = slot_tracker.root; const root_slot0_hash = slot_tracker.getRoot().state.hash.readCopy().?; - - std.debug.assert(root_slot0 == 0); // assert initial root value std.debug.assert(root_slot0_hash.eql(.ZEROES)); // assert initial root hash // -- slot1 -- // @@ -4894,12 +4929,10 @@ test "edge cases - duplicate confirmed slot" { const slot_tracker = &replay_state.slot_tracker; const progress_map = &replay_state.progress_map; - std.debug.assert(slot_tracker.root == 0); + const root_slot0 = slot_tracker.root.load(.monotonic); + std.debug.assert(root_slot0 == 0); - const root_slot0 = slot_tracker.root; const root_slot0_hash = slot_tracker.getRoot().state.hash.readCopy().?; - - std.debug.assert(root_slot0 == 0); // assert initial root value std.debug.assert(root_slot0_hash.eql(.ZEROES)); // assert initial root hash // -- slot1 -- // @@ -5066,16 +5099,14 @@ test "edge cases - gossip verified vote hashes" { const slot_tracker = &replay_state.slot_tracker; const progress_map = &replay_state.progress_map; - std.debug.assert(slot_tracker.root == 0); + const root_slot0 = slot_tracker.root.load(.monotonic); + std.debug.assert(root_slot0 == 0); // assert initial root value var vote_collector: sig.consensus.vote_listener.VoteCollector = - try .init(.EPOCH_ZERO, slot_tracker.root, &registry); + try 
.init(.EPOCH_ZERO, root_slot0, &registry); defer vote_collector.deinit(gpa); - const root_slot0 = slot_tracker.root; const root_slot0_hash = slot_tracker.getRoot().state.hash.readCopy().?; - - std.debug.assert(root_slot0 == 0); // assert initial root value std.debug.assert(root_slot0_hash.eql(.ZEROES)); // assert initial root hash // -- slot1 -- // @@ -5223,6 +5254,9 @@ test "edge cases - gossip verified vote hashes" { &.{ pk1, pk2 }, tower_consensus.latest_validator_votes.max_gossip_frozen_votes.keys(), ); + + // Optimistic confirmation threshold not reached during this test. + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } // TODO: Re-implement tests for the new consolidated API @@ -5421,6 +5455,12 @@ test "vote on heaviest frozen descendant with no switch" { // 5. Assert forkchoice try std.testing.expectEqual(slot_1, consensus.fork_choice.heaviestOverallSlot().slot); try std.testing.expectEqual(slot1_hash, consensus.fork_choice.heaviestOverallSlot().hash); + + // 6. Assert trackers + // processed slot is updated to slot_1 when the tower votes on it + try std.testing.expectEqual(slot_1, slot_tracker.getSlotForCommitment(.processed)); + // confirmed slot remains at 0 (no optimistic confirmation votes processed) + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } // State setup @@ -5641,6 +5681,12 @@ test "vote accounts with landed votes populate bank stats" { // } try std.testing.expectEqual(6, stats1.lockout_intervals.map.get(5).?.items.len); try std.testing.expectEqual(slot_1, consensus.fork_choice.heaviestOverallSlot().slot); + + // Assert trackers + // processed slot is updated to slot_1 when the tower votes on it + try std.testing.expectEqual(slot_1, slot_tracker.getSlotForCommitment(.processed)); + // confirmed slot remains at 0 (no optimistic confirmation votes processed) + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } // Test case: @@ -5745,10 +5791,11 @@ test "root advances after vote satisfies lockouts" { hashes[i] = Hash{ .data = .{@as(u8, @intCast(i % 256))} ** Hash.SIZE }; } - var slot_tracker: SlotTracker = try .init(allocator, initial_root, .{ - .constants = root_consts, - .state = root_state, - }); + var slot_tracker: SlotTracker = try .init( + allocator, + initial_root, + .{ .constants = root_consts, .state = root_state }, + ); defer slot_tracker.deinit(allocator); var prng = std.Random.DefaultPrng.init(12345); @@ -5986,7 +6033,7 @@ test "root advances after vote satisfies lockouts" { consensus.replay_tower.tower.votes.len, ); - try std.testing.expectEqual(1, slot_tracker.root); + try std.testing.expectEqual(1, slot_tracker.root.load(.monotonic)); // No longer tracking slot 0 try std.testing.expect(!slot_tracker.contains(0)); // Still tracking slot 1 @@ -6073,7 +6120,7 @@ test "root advances after vote satisfies lockouts" { const last_voted = consensus.replay_tower.tower.lastVotedSlot(); try std.testing.expectEqual(33, last_voted); - try std.testing.expectEqual(2, slot_tracker.root); + try std.testing.expectEqual(2, slot_tracker.root.load(.monotonic)); // No longer tracking slot 0 try std.testing.expect(!slot_tracker.contains(0)); // No longer tracking slot 1 @@ -6090,6 +6137,12 @@ test "root advances after vote satisfies lockouts" { try std.testing.expect(progress.map.get(2) != null); try std.testing.expectEqual(33, consensus.fork_choice.heaviestOverallSlot().slot); } + + // Assert trackers + // processed slot is updated to slot 33 when the tower votes on it + try 
std.testing.expectEqual(33, slot_tracker.getSlotForCommitment(.processed)); + // confirmed slot remains at 0 (no optimistic confirmation votes processed) + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } // Test case: @@ -6151,10 +6204,11 @@ test "vote refresh when no new vote available" { try bhq.mut().insertGenesisHash(allocator, Hash.ZEROES, 0); } - var slot_tracker = try SlotTracker.init(allocator, root_slot, .{ - .constants = root_consts, - .state = root_state, - }); + var slot_tracker = try SlotTracker.init( + allocator, + root_slot, + .{ .constants = root_consts, .state = root_state }, + ); defer slot_tracker.deinit(allocator); const slot1_hash = Hash{ .data = .{1} ** Hash.SIZE }; @@ -6279,6 +6333,12 @@ test "vote refresh when no new vote available" { try std.testing.expectEqual(1, consensus.replay_tower.tower.votes.len); try std.testing.expectEqual(1, consensus.fork_choice.heaviestOverallSlot().slot); + + // Assert trackers + // processed slot is updated to slot 1 when the tower votes on it + try std.testing.expectEqual(1, slot_tracker.getSlotForCommitment(.processed)); + // confirmed slot remains at 0 (no optimistic confirmation votes processed) + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } // Test case: @@ -6344,10 +6404,11 @@ test "detect and mark duplicate confirmed fork" { try bhq.mut().insertGenesisHash(allocator, Hash.ZEROES, 0); } - var slot_tracker = try SlotTracker.init(allocator, root_slot, .{ - .constants = root_consts, - .state = root_state, - }); + var slot_tracker = try SlotTracker.init( + allocator, + root_slot, + .{ .constants = root_consts, .state = root_state }, + ); defer slot_tracker.deinit(allocator); // Add frozen slot 1 @@ -6609,10 +6670,11 @@ test "detect and mark duplicate slot" { try bhq.mut().insertGenesisHash(allocator, Hash.ZEROES, 0); } - var slot_tracker = try SlotTracker.init(allocator, root_slot, .{ - .constants = root_consts, - .state = root_state, - }); + var slot_tracker = try SlotTracker.init( + allocator, + root_slot, + .{ .constants = root_consts, .state = root_state }, + ); defer slot_tracker.deinit(allocator); const slot1_hash = Hash{ .data = .{1} ** Hash.SIZE }; diff --git a/src/replay/consensus/process_result.zig b/src/replay/consensus/process_result.zig index 9c932be7cd..475e509792 100644 --- a/src/replay/consensus/process_result.zig +++ b/src/replay/consensus/process_result.zig @@ -117,7 +117,7 @@ fn markDeadSlot( params.allocator, .from(params.logger), dead_slot, - params.slot_tracker.root, + params.slot_tracker.root.load(.monotonic), params.duplicate_slots_to_repair, ancestor_hashes_replay_update_sender, dead_state, @@ -148,7 +148,7 @@ fn markDeadSlot( params.allocator, .from(params.logger), dead_slot, - params.slot_tracker.root, + params.slot_tracker.root.load(.monotonic), params.duplicate_slots_tracker, params.fork_choice, duplicate_state, @@ -194,7 +194,7 @@ fn updateConsensusForFrozenSlot(params: ProcessResultParams, slot: Slot) !void { params.allocator, .from(params.logger), slot, - params.slot_tracker.root, + params.slot_tracker.root.load(.monotonic), params.ledger.resultWriter(), params.fork_choice, params.duplicate_slots_to_repair, @@ -307,7 +307,9 @@ const TestReplayStateResources = struct { self.slot_tracker = SlotTracker{ .slots = .empty, - .root = 0, + .latest_processed_slot = .{}, + .latest_confirmed_slot = .{}, + .root = .init(0), }; self.ancestor_hashes_replay_update_channel = try sig diff --git a/src/replay/service.zig b/src/replay/service.zig index 
8c1ec98674..cc18752568 100644 --- a/src/replay/service.zig +++ b/src/replay/service.zig @@ -208,10 +208,11 @@ pub const ReplayState = struct { const zone = tracy.Zone.init(@src(), .{ .name = "ReplayState init" }); defer zone.deinit(); - var slot_tracker: SlotTracker = try .init(deps.allocator, deps.root.slot, .{ - .constants = deps.root.constants, - .state = deps.root.state, - }); + var slot_tracker: SlotTracker = try .init( + deps.allocator, + deps.root.slot, + .{ .constants = deps.root.constants, .state = deps.root.state }, + ); errdefer slot_tracker.deinit(deps.allocator); errdefer { // do not free the root slot data parameter, we don't own it unless the function returns successfully @@ -324,7 +325,7 @@ pub fn trackNewSlots( var zone = tracy.Zone.init(@src(), .{ .name = "trackNewSlots" }); defer zone.deinit(); - const root = slot_tracker.root; + const root = slot_tracker.root.load(.monotonic); var frozen_slots = try slot_tracker.frozenSlots(allocator); defer frozen_slots.deinit(allocator); @@ -584,11 +585,25 @@ fn freezeCompletedSlots(state: *ReplayState, results: []const ReplayResult) !boo /// bypass the tower bft consensus protocol, simply rooting slots with SlotTree.reRoot fn bypassConsensus(state: *ReplayState) !void { + // NOTE: Processed slot semantics differ from Agave when Sig is in bypass-consensus mode. + // In bypass mode, `latest_processed_slot` is set to the highest slot among all fork + // leaves (SlotTree.tip()). + // + // This differs from Agave's behavior: the processed slot is only updated + // when `vote_bank.is_some()` (i.e., when the validator has selected a bank + // to vote on after passing all consensus checks like lockout, threshold, and + // switch proof). If the validator is locked out or fails + // threshold checks, the processed slot is NOT updated and can go stale. + // See: https://github.com/anza-xyz/agave/blob/5e900421520a10933642d5e9a21e191a70f9b125/core/src/replay_stage.rs#L2683 + // + // TowerConsensus implements Agave's processed slot semantics when consensus is enabled. 
+ state.slot_tracker.latest_processed_slot.set(state.slot_tree.tip()); + if (state.slot_tree.reRoot(state.allocator)) |new_root| { const slot_tracker = &state.slot_tracker; state.logger.info().logf("rooting slot with SlotTree.reRoot: {}", .{new_root}); - slot_tracker.root = new_root; + slot_tracker.root.store(new_root, .monotonic); slot_tracker.pruneNonRooted(state.allocator); try state.status_cache.addRoot(state.allocator, new_root); @@ -793,6 +808,9 @@ test trackNewSlots { &.{ .{ 0, 0 }, .{ 1, 0 }, .{ 2, 1 }, .{ 4, 1 }, .{ 6, 4 } }, &.{ 3, 5 }, ); + + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.processed)); + try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed)); } fn expectSlotTracker( @@ -916,7 +934,7 @@ test "advance calls consensus.process with empty replay results" { try advanceReplay(&replay_state, try registry.initStruct(Metrics), null); // No slots were replayed - try std.testing.expectEqual(0, replay_state.slot_tracker.root); + try std.testing.expectEqual(0, replay_state.slot_tracker.root.load(.monotonic)); } test "Execute testnet block single threaded" { diff --git a/src/replay/trackers.zig b/src/replay/trackers.zig index c7e2612444..b5282c7000 100644 --- a/src/replay/trackers.zig +++ b/src/replay/trackers.zig @@ -7,6 +7,34 @@ const Allocator = std.mem.Allocator; const Slot = sig.core.Slot; const SlotConstants = sig.core.SlotConstants; const SlotState = sig.core.SlotState; +const Commitment = sig.rpc.methods.common.Commitment; + +pub const ForkChoiceProcessedSlot = struct { + slot: std.atomic.Value(Slot) = .init(0), + + /// Set the current processed slot (heaviest fork tip). + /// Uses store() because this can decrease when the fork choice + /// switches to a different fork with a lower slot. + pub fn set(self: *ForkChoiceProcessedSlot, new_slot: Slot) void { + self.slot.store(new_slot, .monotonic); + } + + pub fn get(self: *const ForkChoiceProcessedSlot) Slot { + return self.slot.load(.monotonic); + } +}; + +pub const OptimisticallyConfirmedSlot = struct { + slot: std.atomic.Value(Slot) = .init(0), + + pub fn update(self: *OptimisticallyConfirmedSlot, new_slot: Slot) void { + _ = self.slot.fetchMax(new_slot, .monotonic); + } + + pub fn get(self: *const OptimisticallyConfirmedSlot) Slot { + return self.slot.load(.monotonic); + } +}; /// Central registry that tracks high-level info about slots and how they fork. /// @@ -20,7 +48,9 @@ const SlotState = sig.core.SlotState; /// will end as soon as the items are removed. 
pub const SlotTracker = struct { slots: std.AutoArrayHashMapUnmanaged(Slot, *Element), - root: Slot, + latest_processed_slot: ForkChoiceProcessedSlot, + latest_confirmed_slot: OptimisticallyConfirmedSlot, + root: std.atomic.Value(Slot), pub const Element = struct { constants: SlotConstants, @@ -46,8 +76,10 @@ pub const SlotTracker = struct { slot_init: Element, ) std.mem.Allocator.Error!SlotTracker { var self: SlotTracker = .{ - .root = root_slot, + .root = .init(root_slot), .slots = .empty, + .latest_processed_slot = .{}, + .latest_confirmed_slot = .{}, }; errdefer self.deinit(allocator); @@ -86,6 +118,14 @@ pub const SlotTracker = struct { return elem.toRef(); } + pub fn getSlotForCommitment(self: *const SlotTracker, commitment: Commitment) Slot { + return switch (commitment) { + .processed => self.latest_processed_slot.get(), + .confirmed => self.latest_confirmed_slot.get(), + .finalized => self.root.load(.monotonic), + }; + } + pub const GetOrPutResult = struct { found_existing: bool, reference: Reference, @@ -114,7 +154,7 @@ pub const SlotTracker = struct { } pub fn getRoot(self: *const SlotTracker) Reference { - return self.get(self.root).?; // root slot's bank must exist + return self.get(self.root.load(.monotonic)).?; // root slot's bank must exist } pub fn contains(self: *const SlotTracker, slot: Slot) bool { @@ -188,8 +228,9 @@ pub const SlotTracker = struct { var slice = self.slots.entries.slice(); var index: usize = 0; + const root = self.root.load(.monotonic); while (index < slice.len) { - if (slice.items(.key)[index] < self.root) { + if (slice.items(.key)[index] < root) { const element = slice.items(.value)[index]; element.state.deinit(allocator); element.constants.deinit(allocator); @@ -212,6 +253,16 @@ pub const SlotTree = struct { const List = std.ArrayListUnmanaged; const min_age = 32; + /// Returns the highest slot among all fork tips (leaves). + /// In bypass mode (without ForkChoice), this represents the "processed" slot. 
+ pub fn tip(self: *const SlotTree) Slot { + var max_slot: Slot = self.root.slot; + for (self.leaves.items) |leaf| { + max_slot = @max(max_slot, leaf.slot); + } + return max_slot; + } + pub fn deinit(const_self: SlotTree, allocator: Allocator) void { var self = const_self; self.root.destroyRecursivelyDownstream(allocator); @@ -445,10 +496,14 @@ fn testDummySlotConstants(slot: Slot) SlotConstants { test "SlotTracker.prune removes all slots less than root" { const allocator = std.testing.allocator; const root_slot: Slot = 4; - var tracker: SlotTracker = try .init(allocator, root_slot, .{ - .constants = testDummySlotConstants(root_slot), - .state = .GENESIS, - }); + var tracker: SlotTracker = try .init( + allocator, + root_slot, + .{ + .constants = testDummySlotConstants(root_slot), + .state = .GENESIS, + }, + ); defer tracker.deinit(allocator); // Add slots 1, 2, 3, 4, 5 @@ -469,6 +524,10 @@ test "SlotTracker.prune removes all slots less than root" { try std.testing.expect(!tracker.contains(1)); try std.testing.expect(!tracker.contains(2)); try std.testing.expect(!tracker.contains(3)); + + try std.testing.expectEqual(0, tracker.getSlotForCommitment(Commitment.processed)); + try std.testing.expectEqual(0, tracker.getSlotForCommitment(Commitment.confirmed)); + try std.testing.expectEqual(4, tracker.getSlotForCommitment(Commitment.finalized)); } test "SlotTree: if no forks, root follows 32 behind latest" { diff --git a/src/rpc/methods.zig b/src/rpc/methods.zig index 4ce6bdfe43..06ce4470b3 100644 --- a/src/rpc/methods.zig +++ b/src/rpc/methods.zig @@ -686,3 +686,15 @@ pub const common = struct { shredVersion: ?u16 = null, }; }; + +pub const SlotHookContext = struct { + slot_tracker: *const sig.replay.trackers.SlotTracker, + + pub fn getSlot(self: SlotHookContext, _: std.mem.Allocator, params: GetSlot) !GetSlot.Response { + const config = params.config orelse common.CommitmentSlotConfig{}; + const commitment = config.commitment orelse .finalized; + const slot = self.slot_tracker.getSlotForCommitment(commitment); + const min_slot = config.minContextSlot orelse return slot; + return if (slot >= min_slot) slot else error.RpcMinContextSlotNotMet; + } +}; diff --git a/src/rpc/server/server.zig b/src/rpc/server/server.zig index ce0c32ed41..1b3149c50f 100644 --- a/src/rpc/server/server.zig +++ b/src/rpc/server/server.zig @@ -285,7 +285,7 @@ test "serveSpawn getSnapshot" { var rpc_hooks = sig.rpc.Hooks{}; defer rpc_hooks.deinit(accountsdb.allocator); - try accountsdb.registerRPCHooks(&rpc_hooks); + try accountsdb.registerRPCHooks(&rpc_hooks, null, 0); const sock_addr = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, 0); var server_ctx = try Context.init(.{ @@ -440,7 +440,7 @@ test "serveSpawn getAccountInfo" { var rpc_hooks = sig.rpc.Hooks{}; defer rpc_hooks.deinit(allocator); - try accountsdb.registerRPCHooks(&rpc_hooks); + try accountsdb.registerRPCHooks(&rpc_hooks, null, expected_slot); const test_sock_addr = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, 0); var server_ctx = try Context.init(.{