Skip to content

Commit 91429b8

Browse files
committed
Refactor FSM backoff handling: replace fetch attempts with scheduled timer for retries
1 parent b0578be commit 91429b8

File tree

13 files changed

+784
-76
lines changed

13 files changed

+784
-76
lines changed

crates/pavis/src/admin.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,33 @@ use std::sync::Arc;
77
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
88
use tokio::net::TcpListener;
99
use tokio::sync::watch;
10-
use tokio::time::Instant;
10+
use tokio::time::{Duration, Instant, timeout};
1111

1212
use crate::state::RuntimeStateHandle;
1313

14+
const ADMIN_REQUEST_LINE_LIMIT_BYTES: usize = 4096;
15+
const ADMIN_READ_TIMEOUT: Duration = Duration::from_secs(5);
16+
17+
async fn read_request_line(
18+
reader: &mut BufReader<tokio::net::TcpStream>,
19+
) -> std::io::Result<Option<String>> {
20+
let mut buf = Vec::new();
21+
let read = timeout(ADMIN_READ_TIMEOUT, reader.read_until(b'\n', &mut buf))
22+
.await
23+
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "admin read timeout"))??;
24+
if read == 0 {
25+
return Ok(None);
26+
}
27+
if buf.len() > ADMIN_REQUEST_LINE_LIMIT_BYTES {
28+
return Err(std::io::Error::new(
29+
std::io::ErrorKind::InvalidData,
30+
"admin request line too long",
31+
));
32+
}
33+
let line = String::from_utf8_lossy(&buf);
34+
Ok(Some(line.trim_end_matches(['\r', '\n']).to_string()))
35+
}
36+
1437
/// Admin API worker service.
1538
///
1639
/// Provides read-only operational endpoints:
@@ -115,11 +138,9 @@ impl AdminApiWorker {
115138
match accept_result {
116139
Ok((stream, _peer_addr)) => {
117140
let mut reader = BufReader::new(stream);
118-
let mut request_line = String::new();
119-
120-
match reader.read_line(&mut request_line).await {
121-
Ok(0) => continue, // Connection closed
122-
Ok(_) => {
141+
match read_request_line(&mut reader).await {
142+
Ok(None) => continue, // Connection closed
143+
Ok(Some(request_line)) => {
123144
// Parse request line: "GET /path HTTP/1.1"
124145
let parts: Vec<&str> = request_line.split_whitespace().collect();
125146
if parts.len() >= 2 {

crates/pavis/src/agent/fsm.rs

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -391,16 +391,10 @@ impl Fsm {
391391
}
392392
(State::Verifying(_), Event::VerifyFail { etag, now }) => {
393393
self.ctx.set_rejected(etag, now);
394-
self.ctx.backoff_attempt = 0;
395-
self.state = State::Fetching;
396-
if let Some(etag) = self.ctx.conditional_etag(now) {
397-
effects.push(Effect::FetchConditional {
398-
wait_ms: WAIT_MS,
399-
etag,
400-
});
401-
} else {
402-
effects.push(Effect::FetchUnconditional { wait_ms: WAIT_MS });
403-
}
394+
let delay = backoff_delay(self.ctx.backoff_attempt);
395+
self.ctx.backoff_attempt = self.ctx.backoff_attempt.saturating_add(1);
396+
self.state = State::BackoffSleeping;
397+
effects.push(Effect::ScheduleTimer { duration: delay });
404398
}
405399
(State::Verifying(_), Event::Shutdown) => {
406400
self.state = State::Stopped;
@@ -428,16 +422,10 @@ impl Fsm {
428422
}
429423
(State::Applying(_), Event::ApplyFail { etag, now }) => {
430424
self.ctx.set_rejected(etag, now);
431-
self.ctx.backoff_attempt = 0;
432-
self.state = State::Fetching;
433-
if let Some(etag) = self.ctx.conditional_etag(now) {
434-
effects.push(Effect::FetchConditional {
435-
wait_ms: WAIT_MS,
436-
etag,
437-
});
438-
} else {
439-
effects.push(Effect::FetchUnconditional { wait_ms: WAIT_MS });
440-
}
425+
let delay = backoff_delay(self.ctx.backoff_attempt);
426+
self.ctx.backoff_attempt = self.ctx.backoff_attempt.saturating_add(1);
427+
self.state = State::BackoffSleeping;
428+
effects.push(Effect::ScheduleTimer { duration: delay });
441429
}
442430
(State::Applying(_), Event::Shutdown) => {
443431
self.state = State::Stopped;
@@ -632,10 +620,8 @@ mod tests {
632620
fsm.context().last_rejected_etag.as_deref(),
633621
Some("sha256:bad")
634622
);
635-
assert!(effects.iter().any(|effect| matches!(
636-
effect,
637-
Effect::FetchConditional { .. } | Effect::FetchUnconditional { .. }
638-
)));
623+
assert!(matches!(fsm.state, State::BackoffSleeping));
624+
assert!(matches!(effects.as_slice(), [Effect::ScheduleTimer { .. }]));
639625
}
640626

641627
#[test]
@@ -656,10 +642,8 @@ mod tests {
656642
fsm.context().last_rejected_etag.as_deref(),
657643
Some("sha256:bad")
658644
);
659-
assert!(effects.iter().any(|effect| matches!(
660-
effect,
661-
Effect::FetchConditional { .. } | Effect::FetchUnconditional { .. }
662-
)));
645+
assert!(matches!(fsm.state, State::BackoffSleeping));
646+
assert!(matches!(effects.as_slice(), [Effect::ScheduleTimer { .. }]));
663647
}
664648

665649
#[test]

crates/pavis/src/proxy/service/io.rs

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ use pingora::http::ResponseHeader;
1212
use pingora::prelude::*;
1313
use pingora::protocols::Digest;
1414
use pingora::proxy::{ProxyHttp, Session};
15+
use std::collections::HashSet;
1516
use std::sync::Arc;
17+
use std::sync::Mutex;
18+
use std::sync::OnceLock;
1619
use std::sync::atomic::Ordering;
1720
use std::time::Duration;
1821
use tracing_opentelemetry::OpenTelemetrySpanExt;
@@ -27,6 +30,20 @@ use super::state::Proxy;
2730
static POOL_KEY_LOG_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
2831
static SNI_FRAGMENT_WARN_COUNTER: std::sync::atomic::AtomicU64 =
2932
std::sync::atomic::AtomicU64::new(0);
33+
/// Counts every call to `should_log_upstream_config`; used to sample calls.
static LOGGED_UPSTREAM_CHECK_COUNTER: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(0);
/// Hashes of the upstream configurations that have already been logged.
/// Created lazily on the first sampled call.
static LOGGED_UPSTREAM_KEYS: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();

/// Decides whether the upstream configuration identified by `hash` should be
/// logged by this call.
///
/// Only one call in every 256 (starting with the very first) consults the set
/// of already-logged hashes; all other calls return `false` immediately.
/// A sampled call returns `true` only when `hash` was not yet in the set, and
/// records it. If the set's lock cannot be taken without blocking, the call
/// returns `false` instead of waiting.
fn should_log_upstream_config(hash: u64) -> bool {
    let ticket = LOGGED_UPSTREAM_CHECK_COUNTER.fetch_add(1, Ordering::Relaxed);
    if ticket % 256 != 0 {
        return false;
    }

    // `try_lock` keeps the caller from ever blocking on the logging set:
    // failing to acquire the lock simply means "do not log this time".
    LOGGED_UPSTREAM_KEYS
        .get_or_init(|| Mutex::new(HashSet::new()))
        .try_lock()
        .map(|mut seen| seen.insert(hash))
        .unwrap_or(false)
}
3047

3148
#[async_trait]
3249
impl ProxyHttp for Proxy {
@@ -363,15 +380,11 @@ impl ProxyHttp for Proxy {
363380
.map(|d| d.as_millis() as u64)
364381
.unwrap_or(60000);
365382

366-
route_phase.set_retry_context(RetryContext::new(
367-
retry_policy.clone(),
368-
request_timeout_ms,
369-
self.telemetry.metrics.clone(),
370-
upstream_name.clone(),
371-
));
372-
373383
let mut body_bytes = Vec::new();
374384
let limit = *max_request_body_buffer_bytes;
385+
let mut budget_guard = crate::retry::BudgetGuard::new();
386+
let mut budget_exhausted = false;
387+
let mut truncated = false;
375388

376389
while let Some(chunk) = session.read_request_body().await? {
377390
if (body_bytes.len() as u64) + (chunk.len() as u64) > limit {
@@ -380,19 +393,39 @@ impl ProxyHttp for Proxy {
380393
upstream = %upstream_name,
381394
"Request body exceeds buffer limit, marking as non-replayable"
382395
);
383-
body_bytes.extend_from_slice(&chunk);
396+
truncated = true;
397+
break;
398+
}
399+
if !budget_guard.try_add(chunk.len() as u64) {
400+
budget_exhausted = true;
384401
break;
385402
}
386403
body_bytes.extend_from_slice(&chunk);
387404
}
388405

389-
let buffered = crate::retry::BufferedBody::new(
390-
body_bytes,
391-
limit,
392-
self.telemetry.metrics.clone(),
393-
&upstream_name,
394-
);
395-
route_phase.set_buffered_body(buffered);
406+
if budget_exhausted {
407+
tracing::warn!(
408+
upstream = %upstream_name,
409+
"Retry body buffer budget exhausted; disabling retries for request"
410+
);
411+
} else {
412+
route_phase.set_retry_context(RetryContext::new(
413+
retry_policy.clone(),
414+
request_timeout_ms,
415+
self.telemetry.metrics.clone(),
416+
upstream_name.clone(),
417+
));
418+
419+
let buffered = crate::retry::BufferedBody::new(
420+
body_bytes,
421+
limit,
422+
self.telemetry.metrics.clone(),
423+
&upstream_name,
424+
truncated,
425+
Some(budget_guard),
426+
);
427+
route_phase.set_buffered_body(buffered);
428+
}
396429
}
397430

398431
if let RouteAction::Forward(destinations) = &route.action {
@@ -996,8 +1029,7 @@ impl<'a> UpstreamPeerBuilder<'a> {
9961029
peer.options.tcp_recv_buf = Some(buffer_size as usize);
9971030
}
9981031

999-
// Log effective upstream configuration (once per upstream, using atomic counter to avoid Mutex)
1000-
static LOGGED_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
1032+
// Log effective upstream configuration (rate-limited with a sampled, per-upstream set)
10011033
let log_key = format!("{}:{}", upstream_name.0, cluster as *const _ as usize);
10021034
let hash = {
10031035
use std::collections::hash_map::DefaultHasher;
@@ -1007,8 +1039,7 @@ impl<'a> UpstreamPeerBuilder<'a> {
10071039
hasher.finish()
10081040
};
10091041

1010-
// Use a simple probabilistic approach: only log if this is likely the first time
1011-
if LOGGED_COUNTER.fetch_add(1, Ordering::Relaxed) % 1000 < 10 || hash % 1000 == 0 {
1042+
if should_log_upstream_config(hash) {
10121043
tracing::info!(
10131044
upstream = %upstream_name.0,
10141045
idle_timeout_ms = ?peer.options.idle_timeout.map(|d| d.as_millis()),

crates/pavis/src/retry.rs

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,53 @@
1010
use crate::telemetry::metrics::MetricsRegistry;
1111
use pavis_core::{BackoffStrategy, BodyReplayability, MethodIdempotency, RetryPolicy, RetryReason};
1212
use std::sync::Arc;
13+
use std::sync::atomic::{AtomicU64, Ordering};
1314
use std::time::{Duration, Instant};
1415
use tokio::time::sleep;
1516
use tracing::{debug, warn};
1617

18+
/// Upper bound, in bytes, on the total amount of request-body data that may be
/// reserved through [`BudgetGuard`]s at any one time.
const RETRY_BODY_GLOBAL_BUDGET_BYTES: u64 = 32 * 1024 * 1024;
/// Bytes currently reserved against [`RETRY_BODY_GLOBAL_BUDGET_BYTES`].
static RETRY_BODY_BUDGET_USED_BYTES: AtomicU64 = AtomicU64::new(0);

/// A reservation against the global retry-body buffer budget.
///
/// Bytes are reserved with [`BudgetGuard::try_add`] and are returned to the
/// global budget when the guard is dropped.
#[derive(Debug, Default)]
pub struct BudgetGuard {
    /// Total bytes this guard has successfully reserved so far.
    bytes: u64,
}

impl BudgetGuard {
    /// Creates a guard that holds no reservation.
    pub fn new() -> Self {
        Self::default()
    }

    /// Tries to reserve `extra` more bytes from the global budget.
    ///
    /// Returns `true` when the reservation succeeded (reserving zero bytes
    /// always succeeds) and `false` when it would push the global usage above
    /// [`RETRY_BODY_GLOBAL_BUDGET_BYTES`]. A failed call reserves nothing.
    pub fn try_add(&mut self, extra: u64) -> bool {
        if extra == 0 {
            return true;
        }

        // `fetch_update` retries the closure until the compare-and-swap wins
        // or the closure declines by returning `None`. `checked_add` treats an
        // arithmetic overflow as "over budget".
        let reserved = RETRY_BODY_BUDGET_USED_BYTES
            .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |used| {
                used.checked_add(extra)
                    .filter(|&total| total <= RETRY_BODY_GLOBAL_BUDGET_BYTES)
            })
            .is_ok();

        if reserved {
            self.bytes = self.bytes.saturating_add(extra);
        }
        reserved
    }
}

impl Drop for BudgetGuard {
    /// Returns every byte reserved by this guard to the global budget.
    fn drop(&mut self) {
        if self.bytes > 0 {
            RETRY_BODY_BUDGET_USED_BYTES.fetch_sub(self.bytes, Ordering::SeqCst);
        }
    }
}
59+
1760
/// Retry context tracking state across retry attempts
1861
pub struct RetryContext {
1962
/// Global deadline for entire request (including all retries)
@@ -206,6 +249,8 @@ pub struct BufferedBody {
206249
pub bytes: Vec<u8>,
207250
/// Replayability status
208251
pub replayability: BodyReplayability,
252+
/// Global buffer budget guard (releases on drop)
253+
_budget_guard: Option<BudgetGuard>,
209254
}
210255

211256
impl BufferedBody {
@@ -215,8 +260,12 @@ impl BufferedBody {
215260
max_buffer_size: u64,
216261
metrics: Option<Arc<MetricsRegistry>>,
217262
upstream_name: &str,
263+
force_streaming: bool,
264+
budget_guard: Option<BudgetGuard>,
218265
) -> Self {
219-
let replayability = if bytes.is_empty() {
266+
let replayability = if force_streaming {
267+
BodyReplayability::Streaming
268+
} else if bytes.is_empty() {
220269
BodyReplayability::Empty
221270
} else if bytes.len() as u64 <= max_buffer_size {
222271
if let Some(metrics) = &metrics {
@@ -230,6 +279,7 @@ impl BufferedBody {
230279
Self {
231280
bytes,
232281
replayability,
282+
_budget_guard: budget_guard,
233283
}
234284
}
235285

@@ -408,36 +458,36 @@ mod tests {
408458

409459
#[test]
410460
fn buffered_body_empty() {
411-
let body = BufferedBody::new(vec![], 1024, None, "test");
461+
let body = BufferedBody::new(vec![], 1024, None, "test", false, None);
412462
assert_eq!(body.replayability, BodyReplayability::Empty);
413463
assert!(body.can_replay());
414464
}
415465

416466
#[test]
417467
fn buffered_body_buffered() {
418-
let body = BufferedBody::new(vec![1, 2, 3], 1024, None, "test");
468+
let body = BufferedBody::new(vec![1, 2, 3], 1024, None, "test", false, None);
419469
assert_eq!(body.replayability, BodyReplayability::Buffered);
420470
assert!(body.can_replay());
421471
}
422472

423473
#[test]
424474
fn buffered_body_streaming() {
425475
let large_body = vec![0u8; 2048];
426-
let body = BufferedBody::new(large_body, 1024, None, "test");
476+
let body = BufferedBody::new(large_body, 1024, None, "test", false, None);
427477
assert_eq!(body.replayability, BodyReplayability::Streaming);
428478
assert!(!body.can_replay());
429479
}
430480

431481
#[test]
432482
fn buffered_body_handle_non_replayable_strict() {
433-
let body = BufferedBody::new(vec![0u8; 2048], 1024, None, "test");
483+
let body = BufferedBody::new(vec![0u8; 2048], 1024, None, "test", false, None);
434484
let result = body.handle_non_replayable(true);
435485
assert!(result.is_err());
436486
}
437487

438488
#[test]
439489
fn buffered_body_handle_non_replayable_lenient() {
440-
let body = BufferedBody::new(vec![0u8; 2048], 1024, None, "test");
490+
let body = BufferedBody::new(vec![0u8; 2048], 1024, None, "test", false, None);
441491
let result = body.handle_non_replayable(false);
442492
assert!(result.is_ok());
443493
}

0 commit comments

Comments
 (0)