Skip to content

Commit a29f437

Browse files
committed
feat(rpc): WIP implement getSlot
1 parent 1d3c9bc commit a29f437

File tree

8 files changed

+164
-47
lines changed

8 files changed

+164
-47
lines changed

src/cmd.zig

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1314,6 +1314,9 @@ fn validator(
13141314
else
13151315
null;
13161316

1317+
var latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{};
1318+
var latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{};
1319+
13171320
var replay_service_state: ReplayAndConsensusServiceState = try .init(allocator, .{
13181321
.app_base = &app_base,
13191322
.account_store = .{ .accounts_db_two = &new_db },
@@ -1325,9 +1328,15 @@ fn validator(
13251328
.voting_enabled = voting_enabled,
13261329
.vote_account_address = maybe_vote_pubkey,
13271330
.stop_at_slot = cfg.stop_at_slot,
1331+
.latest_processed_slot = &latest_processed_slot,
1332+
.latest_confirmed_slot = &latest_confirmed_slot,
13281333
});
13291334
defer replay_service_state.deinit(allocator);
13301335

1336+
try app_base.rpc_hooks.set(allocator, sig.rpc.methods.SlotHookContext{
1337+
.slot_tracker = &replay_service_state.replay_state.slot_tracker,
1338+
});
1339+
13311340
const replay_thread = try replay_service_state.spawnService(
13321341
&app_base,
13331342
if (maybe_vote_sockets) |*vs| vs else null,
@@ -1529,6 +1538,9 @@ fn replayOffline(
15291538
});
15301539
}
15311540

1541+
var latest_processed_slot: sig.replay.trackers.ForkChoiceProcessedSlot = .{};
1542+
var latest_confirmed_slot: sig.replay.trackers.OptimisticallyConfirmedSlot = .{};
1543+
15321544
var replay_service_state: ReplayAndConsensusServiceState = try .init(allocator, .{
15331545
.app_base = &app_base,
15341546
.account_store = .{ .accounts_db_two = &new_db },
@@ -1540,6 +1552,8 @@ fn replayOffline(
15401552
.voting_enabled = false,
15411553
.vote_account_address = null,
15421554
.stop_at_slot = cfg.stop_at_slot,
1555+
.latest_processed_slot = &latest_processed_slot,
1556+
.latest_confirmed_slot = &latest_confirmed_slot,
15431557
});
15441558
defer replay_service_state.deinit(allocator);
15451559

@@ -2240,6 +2254,8 @@ const ReplayAndConsensusServiceState = struct {
22402254
voting_enabled: bool,
22412255
vote_account_address: ?Pubkey,
22422256
stop_at_slot: ?Slot,
2257+
latest_processed_slot: *sig.replay.trackers.ForkChoiceProcessedSlot,
2258+
latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot,
22432259
},
22442260
) !ReplayAndConsensusServiceState {
22452261
var replay_state: replay.service.ReplayState = replay_state: {
@@ -2310,6 +2326,8 @@ const ReplayAndConsensusServiceState = struct {
23102326
.hard_forks = hard_forks,
23112327
.replay_threads = params.replay_threads,
23122328
.stop_at_slot = params.stop_at_slot,
2329+
.latest_processed_slot = params.latest_processed_slot,
2330+
.latest_confirmed_slot = params.latest_confirmed_slot,
23132331
}, if (params.disable_consensus) .disabled else .enabled);
23142332
};
23152333
errdefer replay_state.deinit();

src/consensus/vote_listener.zig

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub const SlotDataProvider = struct {
2828
epoch_tracker: *const EpochTracker,
2929

3030
pub fn rootSlot(self: *const SlotDataProvider) Slot {
31-
return self.slot_tracker.root;
31+
return self.slot_tracker.root.load(.monotonic);
3232
}
3333

3434
fn getSlotHash(self: *const SlotDataProvider, slot: Slot) ?Hash {
@@ -317,6 +317,7 @@ pub const VoteCollector = struct {
317317
latest_vote_slot_per_validator: sig.utils.collections.PubkeyMap(Slot),
318318
last_process_root: sig.time.Instant,
319319
vote_processing_time: VoteProcessingTiming,
320+
latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot,
320321
metrics: VoteListenerMetrics,
321322

322323
pub fn deinit(self: *VoteCollector, allocator: std.mem.Allocator) void {
@@ -331,6 +332,7 @@ pub const VoteCollector = struct {
331332
now: sig.time.Instant,
332333
root_slot: Slot,
333334
registry: *sig.prometheus.Registry(.{}),
335+
latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot,
334336
) !VoteCollector {
335337
return .{
336338
.gossip_vote_receptor = .INIT,
@@ -339,6 +341,7 @@ pub const VoteCollector = struct {
339341
.latest_vote_slot_per_validator = .empty,
340342
.last_process_root = now,
341343
.vote_processing_time = .ZEROES,
344+
.latest_confirmed_slot = latest_confirmed_slot,
342345
.metrics = try .init(registry),
343346
};
344347
}
@@ -397,6 +400,7 @@ pub const VoteCollector = struct {
397400
gossip_vote_txs,
398401
&self.vote_processing_time,
399402
&self.latest_vote_slot_per_validator,
403+
self.latest_confirmed_slot,
400404
self.metrics,
401405
);
402406
defer allocator.free(confirmed_slots);
@@ -420,6 +424,7 @@ fn listenAndConfirmVotes(
420424
gossip_vote_txs: []const vote_parser.ParsedVote,
421425
vote_processing_time: ?*VoteProcessingTiming,
422426
latest_vote_slot_per_validator: *sig.utils.collections.PubkeyMap(Slot),
427+
latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot,
423428
metrics: VoteListenerMetrics,
424429
) std.mem.Allocator.Error![]const ThresholdConfirmedSlot {
425430
var replay_votes_buffer: std.ArrayListUnmanaged(vote_parser.ParsedVote) = .empty;
@@ -454,6 +459,7 @@ fn listenAndConfirmVotes(
454459
latest_vote_slot_per_validator,
455460
gossip_vote_txs,
456461
replay_votes,
462+
latest_confirmed_slot,
457463
metrics,
458464
);
459465
}
@@ -468,6 +474,7 @@ fn filterAndConfirmWithNewVotes(
468474
latest_vote_slot_per_validator: *sig.utils.collections.PubkeyMap(Slot),
469475
gossip_vote_txs: []const vote_parser.ParsedVote,
470476
replayed_votes: []const vote_parser.ParsedVote,
477+
latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot,
471478
metrics: VoteListenerMetrics,
472479
) std.mem.Allocator.Error![]const ThresholdConfirmedSlot {
473480
const root_slot = slot_data_provider.rootSlot();
@@ -503,6 +510,7 @@ fn filterAndConfirmWithNewVotes(
503510
&new_optimistic_confirmed_slots,
504511
is_gossip,
505512
latest_vote_slot_per_validator,
513+
latest_confirmed_slot,
506514
);
507515
if (is_gossip)
508516
metrics.gossip_votes_processed.inc()
@@ -767,6 +775,7 @@ fn trackNewVotesAndNotifyConfirmations(
767775
new_optimistic_confirmed_slots: *std.ArrayListUnmanaged(ThresholdConfirmedSlot),
768776
is_gossip_vote: bool,
769777
latest_vote_slot_per_validator: *sig.utils.collections.PubkeyMap(Slot),
778+
latest_confirmed_slot: *sig.replay.trackers.OptimisticallyConfirmedSlot,
770779
) std.mem.Allocator.Error!void {
771780
if (vote.isEmpty()) return;
772781
const root = slot_data_provider.rootSlot();
@@ -867,6 +876,7 @@ fn trackNewVotesAndNotifyConfirmations(
867876
.slot = slot,
868877
.hash = hash,
869878
});
879+
latest_confirmed_slot.update(slot);
870880
// Notify subscribers about new optimistic confirmation
871881
if (senders.bank_notification) |sender| {
872882
sender.send(.{ .optimistically_confirmed = slot }) catch |err| {

src/replay/consensus/cluster_sync.zig

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ fn processAncestorHashesDuplicateSlots(
492492
slot_tracker: *const SlotTracker,
493493
duplicate_slots_to_repair: *SlotData.DuplicateSlotsToRepair,
494494
) !void {
495-
const root = slot_tracker.root;
495+
const root = slot_tracker.root.load(.monotonic);
496496

497497
while (ancestor_duplicate_slots_receiver.tryReceive()) |ancestor_dupe_slot_to_repair| {
498498
const request_type = ancestor_dupe_slot_to_repair.request_type;
@@ -560,7 +560,7 @@ fn processDuplicateConfirmedSlots(
560560
ancestor_hashes_replay_update_sender: *sig.sync.Channel(AncestorHashesReplayUpdate),
561561
purge_repair_slot_counter: *SlotData.PurgeRepairSlotCounters,
562562
) !void {
563-
const root = slot_tracker.root;
563+
const root = slot_tracker.root.load(.monotonic);
564564
for (duplicate_confirmed_slots_received) |new_duplicate_confirmed_slot| {
565565
const confirmed_slot, const duplicate_confirmed_hash = new_duplicate_confirmed_slot.tuple();
566566
if (confirmed_slot <= root) {
@@ -642,7 +642,7 @@ fn processPrunedButPopularForks(
642642
slot_tracker: *const SlotTracker,
643643
ancestor_hashes_replay_update_sender: *sig.sync.Channel(AncestorHashesReplayUpdate),
644644
) !void {
645-
const root = slot_tracker.root;
645+
const root = slot_tracker.root.load(.monotonic);
646646
while (pruned_but_popular_forks_receiver.tryReceive()) |new_popular_pruned_slot| {
647647
if (new_popular_pruned_slot <= root) {
648648
continue;
@@ -700,7 +700,7 @@ fn processDuplicateSlots(
700700
});
701701
}
702702

703-
break :blk .{ slot_tracker.root, slots_hashes };
703+
break :blk .{ slot_tracker.root.load(.monotonic), slots_hashes };
704704
};
705705
for (new_duplicate_slots.constSlice(), slots_hashes.constSlice()) |duplicate_slot, slot_hash| {
706706
// WindowService should only send the signal once per slot
@@ -1563,7 +1563,7 @@ test "apply state changes" {
15631563

15641564
// MarkSlotDuplicate should mark progress map and remove
15651565
// the slot from fork choice
1566-
const duplicate_slot = slot_tracker.root + 1;
1566+
const duplicate_slot = slot_tracker.root.load(.monotonic) + 1;
15671567
const duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?;
15681568
// AKA: `ResultingStateChange::MarkSlotDuplicate` in agave
15691569
try heaviest_subtree_fork_choice.markForkInvalidCandidate(allocator, &.{
@@ -1623,7 +1623,7 @@ test "apply state changes slot frozen" {
16231623
var ledger = try ledger_tests.initTestLedger(allocator, @src(), .FOR_TESTS);
16241624
defer ledger.deinit();
16251625

1626-
const duplicate_slot = slot_tracker.root + 1;
1626+
const duplicate_slot = slot_tracker.root.load(.monotonic) + 1;
16271627
const duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?;
16281628

16291629
// Simulate ReplayStage freezing a Slot with the given hash.
@@ -1659,9 +1659,10 @@ test "apply state changes slot frozen" {
16591659
// version in blockstore.
16601660
const new_slot_hash: Hash = .initRandom(random);
16611661
const root_slot_hash: sig.core.hash.SlotAndHash = rsh: {
1662-
const root_slot_info = slot_tracker.get(slot_tracker.root).?;
1662+
const root_slot = slot_tracker.root.load(.monotonic);
1663+
const root_slot_info = slot_tracker.get(root_slot).?;
16631664
break :rsh .{
1664-
.slot = slot_tracker.root,
1665+
.slot = root_slot,
16651666
.hash = root_slot_info.state.hash.readCopy().?,
16661667
};
16671668
};
@@ -1711,7 +1712,7 @@ test "apply state changes duplicate confirmed matches frozen" {
17111712
var ledger = try ledger_tests.initTestLedger(allocator, @src(), .FOR_TESTS);
17121713
defer ledger.deinit();
17131714

1714-
const duplicate_slot = slot_tracker.root + 1;
1715+
const duplicate_slot = slot_tracker.root.load(.monotonic) + 1;
17151716
const our_duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?;
17161717

17171718
var duplicate_slots_to_repair: SlotData.DuplicateSlotsToRepair = .empty;
@@ -1807,7 +1808,7 @@ test "apply state changes slot frozen and duplicate confirmed matches frozen" {
18071808
var purge_repair_slot_counter: SlotData.PurgeRepairSlotCounters = .empty;
18081809
defer purge_repair_slot_counter.deinit(allocator);
18091810

1810-
const duplicate_slot = slot_tracker.root + 1;
1811+
const duplicate_slot = slot_tracker.root.load(.monotonic) + 1;
18111812
const our_duplicate_slot_hash = slot_tracker.get(duplicate_slot).?.state.hash.readCopy().?;
18121813

18131814
// Setup and check the state that is about to change.

src/replay/consensus/core.zig

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ pub const TowerConsensus = struct {
155155
);
156156
errdefer fork_choice.deinit(allocator);
157157

158-
const root_ref = deps.slot_tracker.get(deps.slot_tracker.root).?;
158+
const root = deps.slot_tracker.root.load(.monotonic);
159+
const root_ref = deps.slot_tracker.get(root).?;
159160
const root_ancestors = &root_ref.constants.ancestors;
160161

161162
var tower: Tower = if (deps.identity.vote_account) |vote_account_address|
@@ -166,8 +167,8 @@ pub const TowerConsensus = struct {
166167
vote_account_address,
167168
)
168169
else
169-
.{ .root = deps.slot_tracker.root };
170-
tower.setRoot(deps.slot_tracker.root);
170+
.{ .root = root };
171+
tower.setRoot(root);
171172

172173
const replay_tower: ReplayTower = try .init(
173174
.from(deps.logger),
@@ -178,7 +179,7 @@ pub const TowerConsensus = struct {
178179
errdefer replay_tower.deinit(allocator);
179180

180181
var vote_collector: sig.consensus.VoteCollector =
181-
try .init(deps.now, deps.slot_tracker.root, deps.registry);
182+
try .init(deps.now, root, deps.registry, deps.slot_tracker.latest_confirmed_slot);
182183
errdefer vote_collector.deinit(allocator);
183184

184185
return .{
@@ -207,7 +208,7 @@ pub const TowerConsensus = struct {
207208
) !HeaviestSubtreeForkChoice {
208209
const root_slot, const root_hash = blk: {
209210
const root = slot_tracker.getRoot();
210-
const root_slot = slot_tracker.root;
211+
const root_slot = slot_tracker.root.load(.monotonic);
211212
const root_hash = root.state.hash.readCopy();
212213
break :blk .{ root_slot, root_hash.? };
213214
};
@@ -586,7 +587,7 @@ pub const TowerConsensus = struct {
586587
}
587588
// Update cluster with the duplicate confirmation status.
588589
// Analogous to [ReplayStage::mark_slots_duplicate_confirmed](https://github.com/anza-xyz/agave/blob/47c0383f2301e5a739543c1af9992ae182b7e06c/core/src/replay_stage.rs#L3876)
589-
const root_slot = slot_tracker.root;
590+
const root_slot = slot_tracker.root.load(.monotonic);
590591
for (duplicate_confirmed_forks.items) |duplicate_confirmed_fork| {
591592
const slot, const frozen_hash = duplicate_confirmed_fork.tuple();
592593
try self.handleDuplicateConfirmedFork(
@@ -1606,7 +1607,7 @@ fn checkAndHandleNewRoot(
16061607
// Audit: The rest of the code maps to Self::handle_new_root in Agave.
16071608
// Update the slot tracker.
16081609
// Set new root.
1609-
slot_tracker.root = new_root;
1610+
slot_tracker.root.store(new_root, .monotonic);
16101611
// Prune non rooted slots
16111612
slot_tracker.pruneNonRooted(allocator);
16121613

@@ -4771,12 +4772,10 @@ test "edge cases - duplicate slot" {
47714772

47724773
const slot_tracker = &replay_state.slot_tracker;
47734774
const progress_map = &replay_state.progress_map;
4774-
std.debug.assert(slot_tracker.root == 0);
4775+
const root_slot0 = slot_tracker.root.load(.monotonic);
4776+
std.debug.assert(root_slot0 == 0);
47754777

4776-
const root_slot0 = slot_tracker.root;
47774778
const root_slot0_hash = slot_tracker.getRoot().state.hash.readCopy().?;
4778-
4779-
std.debug.assert(root_slot0 == 0); // assert initial root value
47804779
std.debug.assert(root_slot0_hash.eql(.ZEROES)); // assert initial root hash
47814780

47824781
// -- slot1 -- //
@@ -4930,12 +4929,11 @@ test "edge cases - duplicate confirmed slot" {
49304929

49314930
const slot_tracker = &replay_state.slot_tracker;
49324931
const progress_map = &replay_state.progress_map;
4933-
std.debug.assert(slot_tracker.root == 0);
4932+
const root_slot0 = slot_tracker.root.load(.monotonic);
4933+
std.debug.assert(root_slot0 == 0);
49344934

4935-
const root_slot0 = slot_tracker.root;
49364935
const root_slot0_hash = slot_tracker.getRoot().state.hash.readCopy().?;
4937-
4938-
std.debug.assert(root_slot0 == 0); // assert initial root value
4936+
std.debug.assert(root_slot0_hash.eql(.ZEROES)); // assert initial root hash
49394937
std.debug.assert(root_slot0_hash.eql(.ZEROES)); // assert initial root hash
49404938

49414939
// -- slot1 -- //
@@ -5090,16 +5088,14 @@ test "edge cases - gossip verified vote hashes" {
50905088

50915089
const slot_tracker = &replay_state.slot_tracker;
50925090
const progress_map = &replay_state.progress_map;
5093-
std.debug.assert(slot_tracker.root == 0);
5091+
const root_slot0 = slot_tracker.root.load(.monotonic);
5092+
std.debug.assert(root_slot0 == 0); // assert initial root value
50945093

50955094
var vote_collector: sig.consensus.vote_listener.VoteCollector =
5096-
try .init(.EPOCH_ZERO, slot_tracker.root, &registry);
5095+
try .init(.EPOCH_ZERO, root_slot0, &registry);
50975096
defer vote_collector.deinit(gpa);
50985097

5099-
const root_slot0 = slot_tracker.root;
51005098
const root_slot0_hash = slot_tracker.getRoot().state.hash.readCopy().?;
5101-
5102-
std.debug.assert(root_slot0 == 0); // assert initial root value
51035099
std.debug.assert(root_slot0_hash.eql(.ZEROES)); // assert initial root hash
51045100

51055101
// -- slot1 -- //
@@ -6019,7 +6015,7 @@ test "root advances after vote satisfies lockouts" {
60196015
consensus.replay_tower.tower.votes.len,
60206016
);
60216017

6022-
try std.testing.expectEqual(1, slot_tracker.root);
6018+
try std.testing.expectEqual(1, slot_tracker.root.load(.monotonic));
60236019
// No longer tracking slot 0
60246020
try std.testing.expect(!slot_tracker.contains(0));
60256021
// Still tracking slot 1
@@ -6106,7 +6102,7 @@ test "root advances after vote satisfies lockouts" {
61066102
const last_voted = consensus.replay_tower.tower.lastVotedSlot();
61076103
try std.testing.expectEqual(33, last_voted);
61086104

6109-
try std.testing.expectEqual(2, slot_tracker.root);
6105+
try std.testing.expectEqual(2, slot_tracker.root.load(.monotonic));
61106106
// No longer tracking slot 0
61116107
try std.testing.expect(!slot_tracker.contains(0));
61126108
// No longer tracking slot 1

src/replay/consensus/process_result.zig

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ fn markDeadSlot(
117117
params.allocator,
118118
.from(params.logger),
119119
dead_slot,
120-
params.slot_tracker.root,
120+
params.slot_tracker.root.load(.monotonic),
121121
params.duplicate_slots_to_repair,
122122
ancestor_hashes_replay_update_sender,
123123
dead_state,
@@ -148,7 +148,7 @@ fn markDeadSlot(
148148
params.allocator,
149149
.from(params.logger),
150150
dead_slot,
151-
params.slot_tracker.root,
151+
params.slot_tracker.root.load(.monotonic),
152152
params.duplicate_slots_tracker,
153153
params.fork_choice,
154154
duplicate_state,
@@ -194,7 +194,7 @@ fn updateConsensusForFrozenSlot(params: ProcessResultParams, slot: Slot) !void {
194194
params.allocator,
195195
.from(params.logger),
196196
slot,
197-
params.slot_tracker.root,
197+
params.slot_tracker.root.load(.monotonic),
198198
params.ledger.resultWriter(),
199199
params.fork_choice,
200200
params.duplicate_slots_to_repair,

0 commit comments

Comments
 (0)