diff --git a/dev-docs/PEGBOARD_TUNNEL_RETRIES.md b/dev-docs/PEGBOARD_TUNNEL_RETRIES.md index 132d721b70..0ee1babbde 100644 --- a/dev-docs/PEGBOARD_TUNNEL_RETRIES.md +++ b/dev-docs/PEGBOARD_TUNNEL_RETRIES.md @@ -7,7 +7,7 @@ This document explains how retries are coordinated between Guard and Pegboard-ba ## HTTP - Signal: A retryable transient tunnel failure is signaled by returning an HTTP 503 with the `X-RIVET-ERROR` header set. - - Example (Pegboard Gateway): on tunnel closed (e.g., UPS `no_responders` or `request_timeout`), the gateway replies with `503` and `X-RIVET-ERROR: pegboard_gateway.tunnel_closed`. + - Example (Pegboard Gateway): on tunnel closed (e.g., UPS `request_timeout`), the gateway replies with `503` and `X-RIVET-ERROR: pegboard_gateway.tunnel_closed`. - Guard behavior - Guard considers a response retryable if `status == 503` and the `X-RIVET-ERROR` header is present. @@ -37,7 +37,7 @@ This section explains how WebSocket retries are coordinated between Guard and Pe - Handler contract: - Do not await the client websocket yet. - Return the untouched `HyperWebsocket` in the error tuple so Guard still owns it: `Err((client_ws, err))`. - - The outer wrapper maps tunnel-closed UPS errors (e.g., `ups.no_responders`, `ups.request_timeout`) to `WebSocketServiceUnavailable`. + - The outer wrapper maps tunnel-closed UPS errors (e.g., `ups.request_timeout`) to `WebSocketServiceUnavailable`. - Guard reaction: - Treats `WebSocketServiceUnavailable` as retryable. - Re-resolves the route with ignore-cache=true, using middleware-config retry/backoff. @@ -65,7 +65,7 @@ This section explains how WebSocket retries are coordinated between Guard and Pe - Return the socket in the error tuple: `Err((client_ws, err))`. - Map tunnel-closed errors at the wrapper: - - In the outer `handle_websocket` wrapper, detect tunnel-closed (e.g., `ups.no_responders`, `ups.request_timeout`) and map to `WebSocketServiceUnavailable`. + - In the outer `handle_websocket` wrapper, detect tunnel-closed (e.g., `ups.request_timeout`) and map to `WebSocketServiceUnavailable`. - `handle_websocket_inner` should return raw errors; do not construct `WebSocketServiceUnavailable` inside the inner function. - Use `ups.request` for all tunnel operations (open, messages, close): diff --git a/out/errors/ups.no_responders.json b/out/errors/ups.no_responders.json deleted file mode 100644 index 534dfb6f26..0000000000 --- a/out/errors/ups.no_responders.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "code": "no_responders", - "group": "ups", - "message": "No responders." -} \ No newline at end of file diff --git a/out/errors/ups.no_respondersraCompletionMarker.json b/out/errors/ups.no_respondersraCompletionMarker.json deleted file mode 100644 index 941530f3b5..0000000000 --- a/out/errors/ups.no_respondersraCompletionMarker.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "code": "no_respondersraCompletionMarker", - "group": "ups", - "message": "No subscribers." -} \ No newline at end of file diff --git a/packages/common/universalpubsub/src/driver/memory/mod.rs b/packages/common/universalpubsub/src/driver/memory/mod.rs index 20686ebe16..3e7ee8ca20 100644 --- a/packages/common/universalpubsub/src/driver/memory/mod.rs +++ b/packages/common/universalpubsub/src/driver/memory/mod.rs @@ -4,7 +4,7 @@ use std::time::Duration; use anyhow::*; use async_trait::async_trait; -use tokio::sync::{RwLock, mpsc}; +use tokio::sync::{mpsc, RwLock}; use uuid::Uuid; use crate::driver::{PubSubDriver, SubscriberDriver, SubscriberDriverHandle}; @@ -93,7 +93,10 @@ impl PubSubDriver for MemoryDriver { .get(&subject_with_channel) .map_or(true, |subs| subs.is_empty()) { - return Err(crate::errors::Ups::NoResponders.build().into()); + // HACK: There is no native NoResponders error, so we return + // RequestTimeout. This is equivalent since the request would time out + // if there are no responders. + return Err(crate::errors::Ups::RequestTimeout.build().into()); } } diff --git a/packages/common/universalpubsub/src/driver/nats/mod.rs b/packages/common/universalpubsub/src/driver/nats/mod.rs index 4c96e20b8b..223e50d7ae 100644 --- a/packages/common/universalpubsub/src/driver/nats/mod.rs +++ b/packages/common/universalpubsub/src/driver/nats/mod.rs @@ -67,7 +67,10 @@ impl PubSubDriver for NatsDriver { std::result::Result::Ok(msg) => msg, std::result::Result::Err(err) => match err.kind() { RequestErrorKind::NoResponders => { - return Err(errors::Ups::NoResponders.build().into()); + // HACK: There is no native NoResponders error, so we return + // RequestTimeout. This is equivalent since the request would time out + // if there are no responders. + return Err(errors::Ups::RequestTimeout.build().into()); } RequestErrorKind::TimedOut => { return Err(errors::Ups::RequestTimeout.build().into()); @@ -84,7 +87,8 @@ impl PubSubDriver for NatsDriver { std::result::Result::Ok(msg) => msg, std::result::Result::Err(err) => match err.kind() { RequestErrorKind::NoResponders => { - return Err(errors::Ups::NoResponders.build().into()); + // HACK: See above + return Err(errors::Ups::RequestTimeout.build().into()); } RequestErrorKind::TimedOut => { return Err(errors::Ups::RequestTimeout.build().into()); diff --git a/packages/common/universalpubsub/src/driver/postgres/mod.rs b/packages/common/universalpubsub/src/driver/postgres/mod.rs index f43143405c..10af3cacaa 100644 --- a/packages/common/universalpubsub/src/driver/postgres/mod.rs +++ b/packages/common/universalpubsub/src/driver/postgres/mod.rs @@ -11,7 +11,6 @@ use deadpool_postgres::{Config, ManagerConfig, Pool, PoolConfig, RecyclingMethod use futures_util::future::poll_fn; use moka::future::Cache; use serde::{Deserialize, Serialize}; -use sha2::{Digest, Sha256}; use tokio::sync::RwLock; use tokio_postgres::{AsyncMessage, NoTls}; use tracing::Instrument; @@ -167,10 +166,6 @@ impl PubSubDriver for PostgresDriver { None }; - // Get the lock ID for this subject - let lock_id = subject_to_lock_id(subject); - tracing::debug!(%subject, ?lock_id, "calculated advisory lock id"); - // Convert subject to base64 hash string because Postgres identifiers can only be 63 bytes let mut hasher = DefaultHasher::new(); subject.hash(&mut hasher); @@ -189,30 +184,14 @@ impl PubSubDriver for PostgresDriver { .tx .subscribe(); - let lock_sql = format!("SELECT pg_try_advisory_lock_shared({})", lock_id); - let lock_res = self.client.query_one(&lock_sql, &[]).await?; - let lock_acquired = lock_res.get::<_, bool>(0); - ensure!(lock_acquired, "Failed to acquire advisory lock for subject"); - let sql = format!("LISTEN {}", quote_ident(&subject_hash)); - let listen_res = self.client.batch_execute(&sql).await; - - if listen_res.is_err() { - // Release lock on error - let _ = self - .client - .execute("SELECT pg_advisory_unlock_shared($1)", &[&lock_id]) - .await; - } - - listen_res?; + self.client.batch_execute(&sql).await?; tracing::debug!(%subject, "subscription established successfully"); Ok(Box::new(PostgresSubscriber { driver: self.clone(), rx, local_request_rx, - lock_id, subject: subject.to_string(), })) } @@ -323,48 +302,8 @@ impl PubSubDriver for PostgresDriver { } } - // Normal path: check for listeners via database - tracing::debug!(%subject, "checking for remote listeners via database"); - // Get a connection from the pool for checking listeners - let conn = self - .pool - .get() - .await - .context("failed to get connection from pool")?; - - // First check if there are any listeners for this subject - let lock_id = subject_to_lock_id(subject); - - // Check if there are any shared advisory locks (listeners) for this subject - // Query pg_locks directly to avoid lock acquisition overhead - // Split the 64-bit lock_id into two 32-bit integers for pg_locks query - let classid = (lock_id >> 32) as i32; - let objid = (lock_id & 0xFFFFFFFF) as i32; - - let check_sql = " - SELECT EXISTS ( - SELECT 1 FROM pg_locks - WHERE locktype = 'advisory' - AND classid = $1::int - AND objid = $2::int - AND mode = 'ShareLock' - ) AS has_listeners - "; - let row = conn.query_one(check_sql, &[&classid, &objid]).await?; - let has_listeners: bool = row.get(0); - tracing::debug!( - %subject, - ?has_listeners, - "checked for listeners in database" - ); - - if !has_listeners { - tracing::warn!(%subject, "no listeners found for subject"); - return Err(errors::Ups::NoResponders.build().into()); - } - - // Drop the pool connection before creating new dedicated connections - drop(conn); + // Normal path: use database for request/response + tracing::debug!(%subject, "using database path for request"); // Create a temporary reply subject and a dedicated listener connection let reply_subject = format!("_INBOX.{}", uuid::Uuid::new_v4()); @@ -489,13 +428,12 @@ pub struct PostgresSubscriber { driver: PostgresDriver, rx: tokio::sync::broadcast::Receiver<(Vec, Option)>, local_request_rx: Option>, - lock_id: i64, subject: String, } #[async_trait] impl SubscriberDriver for PostgresSubscriber { - #[tracing::instrument(skip(self), fields(subject = %self.subject, lock_id = %self.lock_id))] + #[tracing::instrument(skip(self), fields(subject = %self.subject))] async fn next(&mut self) -> Result { tracing::debug!("waiting for message"); @@ -543,7 +481,7 @@ impl SubscriberDriver for PostgresSubscriber { })) } std::result::Result::Err(_) => { - tracing::debug!(?self.subject, ?self.lock_id, "subscription closed"); + tracing::debug!(?self.subject, "subscription closed"); Ok(NextOutput::Unsubscribed) } @@ -574,14 +512,13 @@ impl SubscriberDriver for PostgresSubscriber { impl Drop for PostgresSubscriber { fn drop(&mut self) { - tracing::debug!(subject = %self.subject, ?self.lock_id, "dropping postgres subscriber"); + tracing::debug!(subject = %self.subject, "dropping postgres subscriber"); - let lock_id = self.lock_id; let driver = self.driver.clone(); let subject = self.subject.clone(); let has_local_rx = self.local_request_rx.is_some(); - // Spawn a task to release the lock + // Spawn a task to clean up tokio::spawn(async move { // Clean up local subscription registration if memory optimization is enabled if has_local_rx { @@ -610,11 +547,6 @@ impl Drop for PostgresSubscriber { } } } - - let _ = driver - .client - .execute("SELECT pg_advisory_unlock_shared($1)", &[&lock_id]) - .await; }); } } @@ -624,17 +556,3 @@ fn quote_ident(subject: &str) -> String { let escaped = subject.replace('"', "\"\""); format!("\"{}\"", escaped) } - -/// Convert a subject name to a PostgreSQL advisory lock ID -/// Uses SHA256 hash truncated to 63 bits to avoid collisions -fn subject_to_lock_id(subject: &str) -> i64 { - let mut hasher = Sha256::new(); - hasher.update(subject.as_bytes()); - let hash = hasher.finalize(); - - // Take first 8 bytes and convert to i64, using only 63 bits to avoid sign issues - let mut bytes = [0u8; 8]; - bytes.copy_from_slice(&hash[0..8]); - let hash_u64 = u64::from_be_bytes(bytes); - (hash_u64 & 0x7FFFFFFFFFFFFFFF) as i64 -} diff --git a/packages/common/universalpubsub/src/errors.rs b/packages/common/universalpubsub/src/errors.rs index 6766a87f75..afab64db4a 100644 --- a/packages/common/universalpubsub/src/errors.rs +++ b/packages/common/universalpubsub/src/errors.rs @@ -4,9 +4,6 @@ use serde::{Deserialize, Serialize}; #[derive(RivetError, Debug, Deserialize, Serialize)] #[error("ups")] pub enum Ups { - #[error("no_responders", "No responders.")] - NoResponders, - #[error("request_timeout", "Request timeout.")] RequestTimeout, } diff --git a/packages/common/universalpubsub/tests/integration.rs b/packages/common/universalpubsub/tests/integration.rs index 0883295c73..8541cfbbfe 100644 --- a/packages/common/universalpubsub/tests/integration.rs +++ b/packages/common/universalpubsub/tests/integration.rs @@ -137,10 +137,6 @@ async fn test_inner(pubsub: &PubSub) { let start = Instant::now(); test_request_timeout(&pubsub).await.unwrap(); tracing::info!(duration_ms = ?start.elapsed().as_millis(), "test_request_timeout completed"); - - let start = Instant::now(); - test_no_responders(&pubsub).await.unwrap(); - tracing::info!(duration_ms = ?start.elapsed().as_millis(), "test_no_responders completed"); } async fn test_basic_pub_sub(pubsub: &PubSub) -> Result<()> { @@ -332,24 +328,3 @@ async fn test_request_timeout(pubsub: &PubSub) -> Result<()> { Ok(()) } - -async fn test_no_responders(pubsub: &PubSub) -> Result<()> { - tracing::info!("testing no responders error"); - - let result = pubsub - .request("test.no_responders", b"no one listening") - .await; - assert!( - result.is_err(), - "Expected request to fail with no responders" - ); - - let err = result.err().unwrap(); - let err = err - .downcast_ref::() - .expect("expected errors::Ups"); - assert_eq!(err.group(), "ups"); - assert_eq!(err.code(), "no_responders"); - - Ok(()) -} diff --git a/packages/core/pegboard-gateway/src/lib.rs b/packages/core/pegboard-gateway/src/lib.rs index 32a507ea39..f7297a0087 100644 --- a/packages/core/pegboard-gateway/src/lib.rs +++ b/packages/core/pegboard-gateway/src/lib.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use futures_util::{SinkExt, StreamExt}; use gas::prelude::*; use http_body_util::{BodyExt, Full}; -use hyper::{Request, Response, StatusCode, body::Incoming as BodyIncoming}; +use hyper::{body::Incoming as BodyIncoming, Request, Response, StatusCode}; use hyper_tungstenite::HyperWebsocket; use pegboard::pubsub_subjects::{ TunnelHttpResponseSubject, TunnelHttpRunnerSubject, TunnelHttpWebSocketSubject, @@ -16,22 +16,21 @@ use rivet_guard_core::{ request_context::RequestContext, }; use rivet_tunnel_protocol::{ - MessageBody, StreamFinishReason, ToServerRequestFinish, ToServerRequestStart, + versioned, MessageBody, StreamFinishReason, ToServerRequestFinish, ToServerRequestStart, ToServerWebSocketClose, ToServerWebSocketMessage, ToServerWebSocketOpen, TunnelMessage, - versioned, }; use rivet_util::serde::HashableMap; use std::result::Result::Ok as ResultOk; use std::{ collections::HashMap, sync::{ - Arc, atomic::{AtomicU64, Ordering}, + Arc, }, time::Duration, }; use tokio::{ - sync::{Mutex, oneshot}, + sync::{oneshot, Mutex}, time::timeout, }; use tokio_tungstenite::tungstenite::Message; @@ -508,7 +507,7 @@ impl PegboardGateway { fn is_tunnel_closed_error(err: &anyhow::Error) -> bool { if let Some(err) = err.chain().find_map(|x| x.downcast_ref::()) && err.group() == "ups" - && (err.code() == "no_responders" || err.code() == "request_timeout") + && err.code() == "request_timeout" { true } else { diff --git a/packages/core/pegboard-tunnel/src/lib.rs b/packages/core/pegboard-tunnel/src/lib.rs index 4a91f93d1b..1a4a5df1bb 100644 --- a/packages/core/pegboard-tunnel/src/lib.rs +++ b/packages/core/pegboard-tunnel/src/lib.rs @@ -9,10 +9,10 @@ use gas::prelude::*; use http_body_util::Full; use hyper::body::{Bytes, Incoming as BodyIncoming}; use hyper::{Request, Response, StatusCode}; -use hyper_tungstenite::tungstenite::Utf8Bytes as WsUtf8Bytes; -use hyper_tungstenite::tungstenite::protocol::frame::CloseFrame as WsCloseFrame; use hyper_tungstenite::tungstenite::protocol::frame::coding::CloseCode as WsCloseCode; -use hyper_tungstenite::{HyperWebsocket, tungstenite::Message as WsMessage}; +use hyper_tungstenite::tungstenite::protocol::frame::CloseFrame as WsCloseFrame; +use hyper_tungstenite::tungstenite::Utf8Bytes as WsUtf8Bytes; +use hyper_tungstenite::{tungstenite::Message as WsMessage, HyperWebsocket}; use pegboard::pubsub_subjects::{ TunnelHttpResponseSubject, TunnelHttpRunnerSubject, TunnelHttpWebSocketSubject, }; @@ -20,7 +20,7 @@ use rivet_guard_core::custom_serve::CustomServeTrait; use rivet_guard_core::proxy_service::ResponseBody; use rivet_guard_core::request_context::RequestContext; use rivet_pools::Pools; -use rivet_tunnel_protocol::{MessageBody, TunnelMessage, versioned}; +use rivet_tunnel_protocol::{versioned, MessageBody, TunnelMessage}; use rivet_util::Id; use std::net::SocketAddr; use tokio::net::TcpListener; @@ -690,7 +690,7 @@ fn is_tunnel_closed_error(err: &anyhow::Error) -> bool { .chain() .find_map(|x| x.downcast_ref::()) && err.group() == "ups" - && (err.code() == "no_responders" || err.code() == "request_timeout") + && err.code() == "request_timeout" { true } else {