Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 58 additions & 2 deletions src/accountsdb/db.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const tracy = @import("tracy");

const sysvar = sig.runtime.sysvar;
const snapgen = sig.accounts_db.snapshot.data.generate;
const methods = sig.rpc.methods;

const Resolution = @import("../benchmarks.zig").Resolution;

Expand Down Expand Up @@ -2873,9 +2874,64 @@ pub const AccountsDB = struct {
return slotSatisfiesMax(slot, max_slot) and slotSatisfiesMin(slot, min_slot);
}

pub fn registerRPCHooks(self: *AccountsDB, rpc_hooks: *sig.rpc.Hooks) !void {
pub fn registerRPCHooks(
self: *AccountsDB,
rpc_hooks: *sig.rpc.Hooks,
slot_tracker: ?*const sig.replay.trackers.SlotTracker,
snapshot_slot: Slot,
) !void {
try rpc_hooks.set(self.allocator, struct {
accountsdb: *AccountsDB,
slot_tracker: ?*const sig.replay.trackers.SlotTracker,
snapshot_slot: Slot,

pub fn getBalance(
this: @This(),
_: std.mem.Allocator,
params: methods.GetBalance,
) !methods.GetBalance.Response {
const config = params.config orelse methods.common.CommitmentSlotConfig{};

const commitment = config.commitment orelse .finalized;
const commitment_slot: Slot = if (this.slot_tracker) |tracker|
tracker.getSlotForCommitment(commitment)
else
this.snapshot_slot;
const max_slot: ?Slot = if (commitment == .processed) null else commitment_slot;

// Check minContextSlot constraint.
if (config.minContextSlot) |min_slot| {
if (commitment_slot < min_slot) {
return error.RpcMinContextSlotNotMet;
}
}

// Look up account
const result = this.accountsdb.getSlotAndAccountInSlotRangeWithReadLock(
&params.pubkey,
config.minContextSlot,
max_slot,
) catch return error.AccountsDbError;

const lamports: u64 = if (result) |r| blk: {
const account, _, var account_lock = r;
defer account_lock.unlock();

break :blk switch (account) {
.file => |aif| aif.account_info.lamports,
.unrooted_map => |um| um.lamports,
};
// TODO: is defaulting to 0 the correct behavior here?
} else 0;

return .{
.context = .{
.slot = commitment_slot,
.apiVersion = "2.0.15",
},
.value = lamports,
};
}

pub fn getAccountInfo(
this: @This(),
Expand Down Expand Up @@ -3098,7 +3154,7 @@ pub const AccountsDB = struct {

return null;
}
}{ .accountsdb = self });
}{ .accountsdb = self, .slot_tracker = slot_tracker, .snapshot_slot = snapshot_slot });
}
};

Expand Down
7 changes: 6 additions & 1 deletion src/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1427,6 +1427,10 @@ fn validator(
});
defer replay_service_state.deinit(allocator);

try app_base.rpc_hooks.set(allocator, sig.rpc.methods.SlotHookContext{
.slot_tracker = &replay_service_state.replay_state.slot_tracker,
});

const replay_thread = try replay_service_state.spawnService(
&app_base,
if (maybe_vote_sockets) |*vs| vs else null,
Expand Down Expand Up @@ -2050,9 +2054,10 @@ fn mockRpcServer(allocator: std.mem.Allocator, cfg: config.Cmd) !void {
defer manifest.deinit(allocator);
}

const snapshot_slot = if (snap_files.incremental_info) |inc| inc.slot else snap_files.full.slot;
var rpc_hooks = sig.rpc.Hooks{};
defer rpc_hooks.deinit(allocator);
try accountsdb.registerRPCHooks(&rpc_hooks);
try accountsdb.registerRPCHooks(&rpc_hooks, null, snapshot_slot);

var server_ctx = try sig.rpc.server.Context.init(.{
.allocator = allocator,
Expand Down
9 changes: 5 additions & 4 deletions src/consensus/replay_tower.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5448,10 +5448,11 @@ pub const TestFixture = struct {
errdefer state.deinit(allocator);
state.hash = .init(root.hash);

break :blk try .init(allocator, root.slot, .{
.constants = constants,
.state = state,
});
break :blk try .init(
allocator,
root.slot,
.{ .constants = constants, .state = state },
);
};
errdefer slot_tracker.deinit(allocator);

Expand Down
18 changes: 16 additions & 2 deletions src/consensus/vote_listener.zig
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ const EpochTracker = sig.core.EpochTracker;
const Logger = sig.trace.Logger("vote_listener");

pub const SlotDataProvider = struct {
slot_tracker: *const SlotTracker,
slot_tracker: *SlotTracker,
epoch_tracker: *const EpochTracker,

pub fn rootSlot(self: *const SlotDataProvider) Slot {
return self.slot_tracker.root;
return self.slot_tracker.root.load(.monotonic);
}

fn getSlotHash(self: *const SlotDataProvider, slot: Slot) ?Hash {
Expand Down Expand Up @@ -867,6 +867,7 @@ fn trackNewVotesAndNotifyConfirmations(
.slot = slot,
.hash = hash,
});
slot_data_provider.slot_tracker.latest_confirmed_slot.update(slot);
// Notify subscribers about new optimistic confirmation
if (senders.bank_notification) |sender| {
sender.send(.{ .optimistically_confirmed = slot }) catch |err| {
Expand Down Expand Up @@ -1090,6 +1091,9 @@ test "trackNewVotesAndNotifyConfirmations filter" {
}
diff.sortAsc();
try std.testing.expectEqualSlices(Slot, diff.map.keys(), &.{ 7, 8 });

// No stake delegated, so optimistic confirmation should not be reached.
try std.testing.expectEqual(0, slot_data_provider.slot_tracker.getSlotForCommitment(.confirmed));
}

const ThresholdReachedResults = std.bit_set.IntegerBitSet(THRESHOLDS_TO_CHECK.len);
Expand Down Expand Up @@ -1790,6 +1794,11 @@ test "simple usage" {
.ledger = &ledger,
.gossip_votes = null,
});

// Since no votes were sent, slot trackers should remain at their initialized state.
// NOTE: processed slot is not used here, but required to construct SlotTracker.
try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.processed));
try std.testing.expectEqual(0, slot_tracker.getSlotForCommitment(.confirmed));
}

test "check trackers" {
Expand Down Expand Up @@ -1997,6 +2006,11 @@ test "check trackers" {
expected_trackers.items,
actual_trackers.items,
);

// Votes were processed but no stake was delegated to validators, so
// optimisitic confirmation was not reached.
try std.testing.expectEqual(0, slot_data_provider.slot_tracker.getSlotForCommitment(.processed));
try std.testing.expectEqual(0, slot_data_provider.slot_tracker.getSlotForCommitment(.confirmed));
}

// tests for OptimisticConfirmationVerifier moved to optimistic_vote_verifier.zig
Expand Down
21 changes: 11 additions & 10 deletions src/replay/consensus/cluster_sync.zig
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ fn processAncestorHashesDuplicateSlots(
slot_tracker: *const SlotTracker,
duplicate_slots_to_repair: *SlotData.DuplicateSlotsToRepair,
) !void {
const root = slot_tracker.root;
const root = slot_tracker.root.load(.monotonic);

while (ancestor_duplicate_slots_receiver.tryReceive()) |ancestor_dupe_slot_to_repair| {
const request_type = ancestor_dupe_slot_to_repair.request_type;
Expand Down Expand Up @@ -560,7 +560,7 @@ fn processDuplicateConfirmedSlots(
ancestor_hashes_replay_update_sender: *sig.sync.Channel(AncestorHashesReplayUpdate),
purge_repair_slot_counter: *SlotData.PurgeRepairSlotCounters,
) !void {
const root = slot_tracker.root;
const root = slot_tracker.root.load(.monotonic);
for (duplicate_confirmed_slots_received) |new_duplicate_confirmed_slot| {
const confirmed_slot, const duplicate_confirmed_hash = new_duplicate_confirmed_slot.tuple();
if (confirmed_slot <= root) {
Expand Down Expand Up @@ -642,7 +642,7 @@ fn processPrunedButPopularForks(
slot_tracker: *const SlotTracker,
ancestor_hashes_replay_update_sender: *sig.sync.Channel(AncestorHashesReplayUpdate),
) !void {
const root = slot_tracker.root;
const root = slot_tracker.root.load(.monotonic);
while (pruned_but_popular_forks_receiver.tryReceive()) |new_popular_pruned_slot| {
if (new_popular_pruned_slot <= root) {
continue;
Expand Down Expand Up @@ -700,7 +700,7 @@ fn processDuplicateSlots(
});
}

break :blk .{ slot_tracker.root, slots_hashes };
break :blk .{ slot_tracker.root.load(.monotonic), slots_hashes };
};
for (new_duplicate_slots.constSlice(), slots_hashes.constSlice()) |duplicate_slot, slot_hash| {
// WindowService should only send the signal once per slot
Expand Down Expand Up @@ -1564,7 +1564,7 @@ test "apply state changes" {

// MarkSlotDuplicate should mark progress map and remove
// the slot from fork choice
const duplicate_slot = slot_tracker.root + 1;
const duplicate_slot = slot_tracker.root.load(.monotonic) + 1;
const duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?;
// AKA: `ResultingStateChange::MarkSlotDuplicate` in agave
try heaviest_subtree_fork_choice.markForkInvalidCandidate(allocator, &.{
Expand Down Expand Up @@ -1624,7 +1624,7 @@ test "apply state changes slot frozen" {
var ledger = try ledger_tests.initTestLedger(allocator, @src(), .FOR_TESTS);
defer ledger.deinit();

const duplicate_slot = slot_tracker.root + 1;
const duplicate_slot = slot_tracker.root.load(.monotonic) + 1;
const duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?;

// Simulate ReplayStage freezing a Slot with the given hash.
Expand Down Expand Up @@ -1660,9 +1660,10 @@ test "apply state changes slot frozen" {
// version in blockstore.
const new_slot_hash: Hash = .initRandom(random);
const root_slot_hash: sig.core.hash.SlotAndHash = rsh: {
const root_slot_info = slot_tracker.get(slot_tracker.root).?;
const root_slot = slot_tracker.root.load(.monotonic);
const root_slot_info = slot_tracker.get(root_slot).?;
break :rsh .{
.slot = slot_tracker.root,
.slot = root_slot,
.hash = root_slot_info.state.hash.readCopy().?,
};
};
Expand Down Expand Up @@ -1712,7 +1713,7 @@ test "apply state changes duplicate confirmed matches frozen" {
var ledger = try ledger_tests.initTestLedger(allocator, @src(), .FOR_TESTS);
defer ledger.deinit();

const duplicate_slot = slot_tracker.root + 1;
const duplicate_slot = slot_tracker.root.load(.monotonic) + 1;
const our_duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?;

var duplicate_slots_to_repair: SlotData.DuplicateSlotsToRepair = .empty;
Expand Down Expand Up @@ -1808,7 +1809,7 @@ test "apply state changes slot frozen and duplicate confirmed matches frozen" {
var purge_repair_slot_counter: SlotData.PurgeRepairSlotCounters = .empty;
defer purge_repair_slot_counter.deinit(allocator);

const duplicate_slot = slot_tracker.root + 1;
const duplicate_slot = slot_tracker.root.load(.monotonic) + 1;
const our_duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?;

// Setup and check the state that is about to change.
Expand Down
Loading