Skip to content

Commit 7477097

Browse files
committed
possible fix
1 parent a80e55a commit 7477097

File tree

1 file changed

+234
-15
lines changed

1 file changed

+234
-15
lines changed

jetstreamer-firehose/src/firehose.rs

Lines changed: 234 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use tokio::{
3737
use crate::{
3838
LOG_MODULE, SharedError,
3939
epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
40-
index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError},
40+
index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError, slot_to_offset},
4141
node_reader::NodeReader,
4242
utils,
4343
};
@@ -85,6 +85,49 @@ fn is_shutdown_error(err: &FirehoseError) -> bool {
8585
}
8686
}
8787

88+
async fn find_previous_indexed_slot(
89+
local_start: u64,
90+
epoch_start: u64,
91+
log_target: &str,
92+
) -> Result<Option<u64>, FirehoseError> {
93+
if local_start <= epoch_start {
94+
return Ok(None);
95+
}
96+
let mut candidate = local_start.saturating_sub(1);
97+
let mut skipped = 0u64;
98+
loop {
99+
match slot_to_offset(candidate).await {
100+
Ok(_) => {
101+
if skipped > 0 {
102+
log::info!(
103+
target: log_target,
104+
"slot {} missing in index; seeking back {} slots to {}",
105+
local_start.saturating_sub(1),
106+
skipped,
107+
candidate
108+
);
109+
}
110+
return Ok(Some(candidate));
111+
}
112+
Err(SlotOffsetIndexError::SlotNotFound(..)) => {
113+
if candidate <= epoch_start {
114+
break;
115+
}
116+
skipped += 1;
117+
candidate = candidate.saturating_sub(1);
118+
}
119+
Err(err) => return Err(FirehoseError::SlotOffsetIndexError(err)),
120+
}
121+
}
122+
log::warn!(
123+
target: log_target,
124+
"no indexed slot found before {} (epoch start {}); reading from epoch start",
125+
local_start,
126+
epoch_start
127+
);
128+
Ok(None)
129+
}
130+
88131
/// Errors that can occur while streaming
89132
/// the firehose.
90133
#[derive(Debug, Error)]
@@ -871,15 +914,36 @@ where
871914
}
872915

873916
if local_start > epoch_start {
874-
// Seek to the previous slot so the stream includes all nodes
875-
// (transactions, entries, rewards) that precede the block payload for
876-
// `local_start`.
877-
let seek_slot = local_start.saturating_sub(1);
878-
let seek_fut = reader.seek_to_slot(seek_slot);
879-
match timeout(OP_TIMEOUT, seek_fut).await {
917+
// Seek to the nearest previous indexed slot so the stream includes all
918+
// nodes (transactions, entries, rewards) that precede `local_start`.
919+
let seek_slot = match timeout(
920+
OP_TIMEOUT,
921+
find_previous_indexed_slot(local_start, epoch_start, &log_target),
922+
)
923+
.await
924+
{
880925
Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
881926
Err(_) => {
882-
return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
927+
return Err((
928+
FirehoseError::OperationTimeout(
929+
"seek_to_previous_indexed_slot",
930+
),
931+
current_slot.unwrap_or(slot_range.start),
932+
));
933+
}
934+
};
935+
if let Some(seek_slot) = seek_slot {
936+
let seek_fut = reader.seek_to_slot(seek_slot);
937+
match timeout(OP_TIMEOUT, seek_fut).await {
938+
Ok(res) => {
939+
res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
940+
}
941+
Err(_) => {
942+
return Err((
943+
FirehoseError::OperationTimeout("seek_to_slot"),
944+
current_slot.unwrap_or(slot_range.start),
945+
));
946+
}
883947
}
884948
}
885949
}
@@ -1835,15 +1899,36 @@ async fn firehose_geyser_thread(
18351899
current_slot = None;
18361900

18371901
if local_start > epoch_start {
1838-
// Seek to the slot immediately preceding the requested range so the reader
1839-
// captures the full node set (transactions, entries, rewards) for the
1840-
// target block on the next iteration.
1841-
let seek_slot = local_start.saturating_sub(1);
1842-
let seek_fut = reader.seek_to_slot(seek_slot);
1843-
match timeout(OP_TIMEOUT, seek_fut).await {
1902+
// Seek to the nearest previous indexed slot so the reader captures the full
1903+
// node set (transactions, entries, rewards) for the target block.
1904+
let seek_slot = match timeout(
1905+
OP_TIMEOUT,
1906+
find_previous_indexed_slot(local_start, epoch_start, &log_target),
1907+
)
1908+
.await
1909+
{
18441910
Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
18451911
Err(_) => {
1846-
return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
1912+
return Err((
1913+
FirehoseError::OperationTimeout(
1914+
"seek_to_previous_indexed_slot",
1915+
),
1916+
current_slot.unwrap_or(slot_range.start),
1917+
));
1918+
}
1919+
};
1920+
if let Some(seek_slot) = seek_slot {
1921+
let seek_fut = reader.seek_to_slot(seek_slot);
1922+
match timeout(OP_TIMEOUT, seek_fut).await {
1923+
Ok(res) => {
1924+
res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
1925+
}
1926+
Err(_) => {
1927+
return Err((
1928+
FirehoseError::OperationTimeout("seek_to_slot"),
1929+
current_slot.unwrap_or(slot_range.start),
1930+
));
1931+
}
18471932
}
18481933
}
18491934
}
@@ -2463,6 +2548,122 @@ async fn assert_solscan_slot_non_vote_counts(
24632548
);
24642549
}
24652550

2551+
/// Test-only diagnostic that logs a per-kind breakdown of the nodes read for `slot`.
///
/// Opens the stream for the epoch containing `slot`, seeks the reader to `slot`, reads
/// nodes with `read_until_block`, and logs how many nodes of each kind were seen together
/// with the slot and entry count of the block node, if one was read. For `slot > 0` it then
/// probes the slot offset index for up to five preceding slots and logs the first one that
/// resolves (or that none did).
///
/// # Errors
///
/// Seek and read failures are returned to the caller. Failed index probes are only logged.
#[cfg(test)]
async fn log_slot_node_summary(slot: u64) -> Result<(), SharedError> {
    use crate::index::slot_to_offset;
    use crate::node::Node;

    let epoch = slot_to_epoch(slot);
    let http = crate::network::create_http_client();
    let mut reader = NodeReader::new(fetch_epoch_stream(epoch, &http).await);
    if let Err(err) = reader.seek_to_slot(slot).await {
        return Err(Box::new(err) as SharedError);
    }

    let nodes = reader.read_until_block().await?;

    // Kind of the very first node returned, or "none" when nothing was read.
    let first_kind = nodes
        .0
        .first()
        .map(|node| match node.get_node() {
            Node::Transaction(_) => "transaction",
            Node::Entry(_) => "entry",
            Node::Block(_) => "block",
            Node::Subset(_) => "subset",
            Node::Epoch(_) => "epoch",
            Node::Rewards(_) => "rewards",
            Node::DataFrame(_) => "dataframe",
        })
        .unwrap_or("none");

    let mut tx_nodes = 0u64;
    let mut entry_nodes = 0u64;
    let mut entry_tx_total = 0u64;
    let mut dataframe_nodes = 0u64;
    let mut reward_nodes = 0u64;
    let mut subset_nodes = 0u64;
    let mut epoch_nodes = 0u64;
    // (slot, entry count) of the block node; the last block seen wins.
    let mut block_info = None;

    for node in &nodes.0 {
        match node.get_node() {
            Node::Block(block) => {
                block_info = Some((block.slot, block.entries.len()));
            }
            Node::Entry(entry) => {
                entry_nodes += 1;
                entry_tx_total += entry.transactions.len() as u64;
            }
            Node::Transaction(_) => tx_nodes += 1,
            Node::DataFrame(_) => dataframe_nodes += 1,
            Node::Rewards(_) => reward_nodes += 1,
            Node::Subset(_) => subset_nodes += 1,
            Node::Epoch(_) => epoch_nodes += 1,
        }
    }

    let block_slot = block_info.map(|(block_slot, _)| block_slot);
    let block_entries = block_info.map(|(_, entry_count)| entry_count);

    log::info!(
        target: LOG_MODULE,
        "slot {slot} node summary: total_nodes={}, first_kind={}, tx_nodes={}, entry_nodes={}, entry_tx_total={}, block_slot={:?}, block_entries={:?}, dataframes={}, rewards={}, subsets={}, epochs={}",
        nodes.len(),
        first_kind,
        tx_nodes,
        entry_nodes,
        entry_tx_total,
        block_slot,
        block_entries,
        dataframe_nodes,
        reward_nodes,
        subset_nodes,
        epoch_nodes
    );

    // Slot 0 has no predecessor to probe.
    if slot == 0 {
        return Ok(());
    }

    // Probe the five preceding slots, nearest first, stopping at the first indexed one.
    let mut found_previous = None;
    for delta in 1..=5 {
        let candidate = slot.saturating_sub(delta);
        match slot_to_offset(candidate).await {
            Ok(offset) => {
                found_previous = Some((candidate, offset));
                break;
            }
            Err(err) => {
                log::info!(
                    target: LOG_MODULE,
                    "slot {slot} previous lookup {candidate} failed: {err}"
                );
            }
        }
    }

    match found_previous {
        Some((candidate, offset)) => {
            log::info!(
                target: LOG_MODULE,
                "slot {slot} nearest previous offset within 5 slots: slot {candidate} @ {offset}"
            );
        }
        None => {
            log::info!(
                target: LOG_MODULE,
                "slot {slot} no previous offsets found within 5 slots"
            );
        }
    }

    Ok(())
}
2666+
24662667
#[tokio::test(flavor = "multi_thread")]
24672668
async fn test_firehose_epoch_800() {
24682669
use dashmap::DashSet;
@@ -2686,6 +2887,24 @@ async fn test_firehose_epoch_720_slot_311134608_solscan_non_vote_counts() {
26862887
assert_solscan_slot_non_vote_counts(311_134_608, 1_086, 169).await;
26872888
}
26882889

2890+
/// Manual debugging aid, ignored by default: logs a node summary for a fixed list of slots
/// via `log_slot_node_summary`, panicking with "slot summary" on the first failure.
#[cfg(test)]
#[ignore]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn debug_epoch_720_slot_311173980_node_summary() {
    const DEBUG_SLOTS: &[u64] = &[
        311_173_980,
        311_225_232,
        311_175_860,
        311_134_608,
        376_273_722,
    ];

    solana_logger::setup_with_default("info");
    for &slot in DEBUG_SLOTS {
        log_slot_node_summary(slot).await.expect("slot summary");
    }
}
2907+
26892908
#[tokio::test(flavor = "multi_thread")]
26902909
async fn test_firehose_epoch_850_has_logs() {
26912910
use std::sync::atomic::{AtomicU64, Ordering};

0 commit comments

Comments
 (0)