Skip to content
This repository was archived by the owner on Feb 16, 2026. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/session/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{pin::Pin, time::Duration};
use async_stream::{stream, try_stream};
use futures::StreamExt;
use s2_api::v1::stream::{ReadEnd, ReadStart};
use tokio::time::timeout;
use tokio::time::{Instant, timeout};
use tracing::debug;

use crate::{
Expand Down Expand Up @@ -49,8 +49,11 @@ pub async fn read_session(
) -> Result<Streaming<ReadBatch>, ReadSessionError> {
let retry_builder = retry_builder(&client.config.retry);
let mut retry_backoffs = retry_builder.build();
let baseline_wait = end.wait;
let mut last_tail_at: Option<Instant> = None;

let batches = loop {
end.wait = remaining_wait(baseline_wait, last_tail_at);
match session_inner(
client.clone(),
name.clone(),
Expand Down Expand Up @@ -78,6 +81,7 @@ pub async fn read_session(

loop {
if batches.is_none() {
end.wait = remaining_wait(baseline_wait, last_tail_at);
match session_inner(
client.clone(),
name.clone(),
Expand Down Expand Up @@ -107,6 +111,10 @@ pub async fn read_session(
retry_backoffs.reset();
}

if batch.tail.is_some() {
last_tail_at = Some(Instant::now());
}

if let Some(record) = batch.records.last() {
start = ReadStart {
seq_num: Some(record.seq_num + 1),
Expand Down Expand Up @@ -164,6 +172,19 @@ async fn session_inner(
}))
}

/// Compute the remaining wait budget for a retry.
///
/// During catchup (tail not yet observed), the full wait is sent.
/// Once tailing, the wait budget is depleted based on time since
/// the last batch with tail info, which approximates how long the
/// server has been in its long polling state.
fn remaining_wait(baseline_wait: Option<u32>, last_tail_at: Option<Instant>) -> Option<u32> {
baseline_wait.map(|w| match last_tail_at {
Some(since) => w.saturating_sub(since.elapsed().as_secs() as u32),
None => w,
})
}

async fn can_retry(err: &ReadSessionError, backoffs: &mut RetryBackoff) -> bool {
if err.is_retryable()
&& let Some(backoff) = backoffs.next()
Expand Down
Loading