Skip to content

Commit adc8dd3

Browse files
authored
Merge pull request #35 from jamescorbett/bug/retry-missing-ranges
fix incorrect seek in retry causing gaps in data.
2 parents 89e4991 + dfccb1c commit adc8dd3

File tree

1 file changed

+27
-7
lines changed

1 file changed

+27
-7
lines changed

jetstreamer-firehose/src/firehose.rs

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -900,7 +900,14 @@ where
900900
let nodes = match timeout(OP_TIMEOUT, read_fut).await {
901901
Ok(result) => result
902902
.map_err(FirehoseError::ReadUntilBlockError)
903-
.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
903+
.map_err(|e| {
904+
(
905+
e,
906+
current_slot
907+
.map(|slot| slot.saturating_add(1))
908+
.unwrap_or(slot_range.start),
909+
)
910+
})?,
904911
Err(_) => {
905912
log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
906913
return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
@@ -1518,8 +1525,11 @@ where
15181525
epoch
15191526
);
15201527
log::error!(target: &log_target, "{}", error_message);
1521-
if matches!(err, FirehoseError::SlotOffsetIndexError(_)) {
1522-
// Clear cached index data for this epoch to avoid retrying with a bad/partial index.
1528+
if matches!(err, FirehoseError::SlotOffsetIndexError(_))
1529+
|| error_message.contains("Unknown CID version")
1530+
{
1531+
// Clear cached index data for this epoch to avoid retrying with a bad/partial index
1532+
// (or a bad seek offset that landed mid-stream).
15231533
SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
15241534
}
15251535
if let Some(on_error_cb) = on_error.clone() {
@@ -1564,7 +1574,10 @@ where
15641574
if block_enabled {
15651575
pending_skipped_slots.remove(&thread_index);
15661576
}
1567-
skip_until_index = Some(item_index);
1577+
// `skip_until_index` is unsafe across retries because `item_index`
1578+
// is reset to 0 each epoch restart. Keeping it can skip large portions
1579+
// of the stream and silently drop slots.
1580+
skip_until_index = None;
15681581
last_emitted_slot_global = last_emitted_slot;
15691582
}
15701583
});
@@ -2144,8 +2157,12 @@ async fn firehose_geyser_thread(
21442157
slot_to_epoch(slot)
21452158
);
21462159
log::error!(target: &log_target, "{}", err);
2147-
if matches!(err, FirehoseError::SlotOffsetIndexError(_)) {
2148-
// Clear cached index data for this epoch to avoid retrying with a bad/partial index.
2160+
let error_message = err.to_string();
2161+
if matches!(err, FirehoseError::SlotOffsetIndexError(_))
2162+
|| error_message.contains("Unknown CID version")
2163+
{
2164+
// Clear cached index data for this epoch to avoid retrying with a bad/partial index
2165+
// (or a bad seek offset that landed mid-stream).
21492166
SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
21502167
}
21512168
let item_index = match err {
@@ -2168,7 +2185,10 @@ async fn firehose_geyser_thread(
21682185
} else {
21692186
slot_range.start = slot;
21702187
}
2171-
skip_until_index = Some(item_index);
2188+
// `skip_until_index` is unsafe across retries because `item_index`
2189+
// is reset to 0 each epoch restart. Keeping it can skip large portions
2190+
// of the stream and silently drop slots.
2191+
skip_until_index = None;
21722192
}
21732193
Ok(())
21742194
}

0 commit comments

Comments
 (0)