Skip to content

Commit a80e55a

Browse files
committed
reproduce slot gap issue
1 parent fef1cb7 commit a80e55a

File tree

1 file changed

+117
-0
lines changed

1 file changed

+117
-0
lines changed

jetstreamer-firehose/src/firehose.rs

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2378,6 +2378,91 @@ use serial_test::serial;
23782378
#[cfg(test)]
23792379
use std::sync::{Mutex, OnceLock};
23802380

2381+
#[cfg(test)]
2382+
async fn assert_solscan_slot_non_vote_counts(
2383+
slot: u64,
2384+
transfer_count: u64,
2385+
defi_swaps_count: u64,
2386+
) {
2387+
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2388+
use std::sync::Arc;
2389+
2390+
let expected_non_vote = transfer_count + defi_swaps_count;
2391+
let found = Arc::new(AtomicBool::new(false));
2392+
let observed_total = Arc::new(AtomicU64::new(0));
2393+
let observed_non_vote = Arc::new(AtomicU64::new(0));
2394+
2395+
let found_block = found.clone();
2396+
let observed_total_block = observed_total.clone();
2397+
let target_slot_block = slot;
2398+
let target_slot_tx = slot;
2399+
let observed_non_vote_tx = observed_non_vote.clone();
2400+
2401+
firehose(
2402+
1,
2403+
target_slot_block..(target_slot_block + 1),
2404+
Some(move |_thread_id: usize, block: BlockData| {
2405+
let found_block = found_block.clone();
2406+
let observed_total_block = observed_total_block.clone();
2407+
async move {
2408+
if block.slot() == target_slot_block {
2409+
assert!(
2410+
!block.was_skipped(),
2411+
"slot {target_slot_block} was marked leader skipped",
2412+
);
2413+
if let BlockData::Block {
2414+
executed_transaction_count,
2415+
..
2416+
} = block
2417+
{
2418+
found_block.store(true, Ordering::Relaxed);
2419+
observed_total_block
2420+
.store(executed_transaction_count, Ordering::Relaxed);
2421+
}
2422+
}
2423+
Ok(())
2424+
}
2425+
.boxed()
2426+
}),
2427+
Some(move |_thread_id: usize, transaction: TransactionData| {
2428+
let observed_non_vote_tx = observed_non_vote_tx.clone();
2429+
async move {
2430+
if transaction.slot == target_slot_tx && !transaction.is_vote {
2431+
observed_non_vote_tx.fetch_add(1, Ordering::Relaxed);
2432+
}
2433+
Ok(())
2434+
}
2435+
.boxed()
2436+
}),
2437+
None::<OnEntryFn>,
2438+
None::<OnRewardFn>,
2439+
None::<OnErrorFn>,
2440+
None::<OnStatsTrackingFn>,
2441+
None,
2442+
)
2443+
.await
2444+
.unwrap();
2445+
2446+
assert!(
2447+
found.load(Ordering::Relaxed),
2448+
"target slot {slot} was not processed"
2449+
);
2450+
let observed_total = observed_total.load(Ordering::Relaxed);
2451+
let observed_non_vote = observed_non_vote.load(Ordering::Relaxed);
2452+
assert!(
2453+
observed_total > 0,
2454+
"slot {slot} executed transaction count was zero"
2455+
);
2456+
assert!(
2457+
observed_total >= expected_non_vote,
2458+
"slot {slot} executed transaction count {observed_total} is below expected non-vote {expected_non_vote}"
2459+
);
2460+
assert_eq!(
2461+
observed_non_vote, expected_non_vote,
2462+
"slot {slot} non-vote transaction count mismatch (transfer {transfer_count}, defi swaps {defi_swaps_count})"
2463+
);
2464+
}
2465+
23812466
#[tokio::test(flavor = "multi_thread")]
23822467
async fn test_firehose_epoch_800() {
23832468
use dashmap::DashSet;
@@ -2569,6 +2654,38 @@ async fn test_firehose_target_slot_transactions() {
25692654
);
25702655
}
25712656

2657+
#[cfg(test)]
2658+
#[serial]
2659+
#[tokio::test(flavor = "multi_thread")]
2660+
async fn test_firehose_epoch_720_slot_311173980_solscan_non_vote_counts() {
2661+
solana_logger::setup_with_default("info");
2662+
assert_solscan_slot_non_vote_counts(311_173_980, 1_197, 211).await;
2663+
}
2664+
2665+
#[cfg(test)]
2666+
#[serial]
2667+
#[tokio::test(flavor = "multi_thread")]
2668+
async fn test_firehose_epoch_720_slot_311225232_solscan_non_vote_counts() {
2669+
solana_logger::setup_with_default("info");
2670+
assert_solscan_slot_non_vote_counts(311_225_232, 888, 157).await;
2671+
}
2672+
2673+
#[cfg(test)]
2674+
#[serial]
2675+
#[tokio::test(flavor = "multi_thread")]
2676+
async fn test_firehose_epoch_720_slot_311175860_solscan_non_vote_counts() {
2677+
solana_logger::setup_with_default("info");
2678+
assert_solscan_slot_non_vote_counts(311_175_860, 527, 110).await;
2679+
}
2680+
2681+
#[cfg(test)]
2682+
#[serial]
2683+
#[tokio::test(flavor = "multi_thread")]
2684+
async fn test_firehose_epoch_720_slot_311134608_solscan_non_vote_counts() {
2685+
solana_logger::setup_with_default("info");
2686+
assert_solscan_slot_non_vote_counts(311_134_608, 1_086, 169).await;
2687+
}
2688+
25722689
#[tokio::test(flavor = "multi_thread")]
25732690
async fn test_firehose_epoch_850_has_logs() {
25742691
use std::sync::atomic::{AtomicU64, Ordering};

0 commit comments

Comments
 (0)