Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dev-docs/PEGBOARD_TUNNEL_RETRIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This document explains how retries are coordinated between Guard and Pegboard-ba
## HTTP

- Signal: A retryable transient tunnel failure is signaled by returning an HTTP 503 with the `X-RIVET-ERROR` header set.
- Example (Pegboard Gateway): on tunnel closed (e.g., UPS `no_responders` or `request_timeout`), the gateway replies with `503` and `X-RIVET-ERROR: pegboard_gateway.tunnel_closed`.
- Example (Pegboard Gateway): on tunnel closed (e.g., UPS `request_timeout`), the gateway replies with `503` and `X-RIVET-ERROR: pegboard_gateway.tunnel_closed`.

- Guard behavior
- Guard considers a response retryable if `status == 503` and the `X-RIVET-ERROR` header is present.
Expand Down Expand Up @@ -37,7 +37,7 @@ This section explains how WebSocket retries are coordinated between Guard and Pe
- Handler contract:
- Do not await the client websocket yet.
- Return the untouched `HyperWebsocket` in the error tuple so Guard still owns it: `Err((client_ws, err))`.
- The outer wrapper maps tunnel-closed UPS errors (e.g., `ups.no_responders`, `ups.request_timeout`) to `WebSocketServiceUnavailable`.
- The outer wrapper maps tunnel-closed UPS errors (e.g., `ups.request_timeout`) to `WebSocketServiceUnavailable`.
- Guard reaction:
- Treats `WebSocketServiceUnavailable` as retryable.
- Re-resolves the route with ignore-cache=true, using middleware-config retry/backoff.
Expand Down Expand Up @@ -65,7 +65,7 @@ This section explains how WebSocket retries are coordinated between Guard and Pe
- Return the socket in the error tuple: `Err((client_ws, err))`.

- Map tunnel-closed errors at the wrapper:
- In the outer `handle_websocket` wrapper, detect tunnel-closed (e.g., `ups.no_responders`, `ups.request_timeout`) and map to `WebSocketServiceUnavailable`.
- In the outer `handle_websocket` wrapper, detect tunnel-closed (e.g., `ups.request_timeout`) and map to `WebSocketServiceUnavailable`.
- `handle_websocket_inner` should return raw errors; do not construct `WebSocketServiceUnavailable` inside the inner function.

- Use `ups.request` for all tunnel operations (open, messages, close):
Expand Down
5 changes: 0 additions & 5 deletions out/errors/ups.no_responders.json

This file was deleted.

5 changes: 0 additions & 5 deletions out/errors/ups.no_respondersraCompletionMarker.json

This file was deleted.

7 changes: 5 additions & 2 deletions packages/common/universalpubsub/src/driver/memory/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

Check warning on line 4 in packages/common/universalpubsub/src/driver/memory/mod.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/common/universalpubsub/src/driver/memory/mod.rs
use anyhow::*;
use async_trait::async_trait;
use tokio::sync::{RwLock, mpsc};
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;

use crate::driver::{PubSubDriver, SubscriberDriver, SubscriberDriverHandle};
Expand Down Expand Up @@ -93,7 +93,10 @@
.get(&subject_with_channel)
.map_or(true, |subs| subs.is_empty())
{
return Err(crate::errors::Ups::NoResponders.build().into());
// HACK: There is no native NoResponders error, so we return
// RequestTimeout. This is equivalent since the request would time out
// if there are no responders.
return Err(crate::errors::Ups::RequestTimeout.build().into());
}
}

Expand Down
8 changes: 6 additions & 2 deletions packages/common/universalpubsub/src/driver/nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ impl PubSubDriver for NatsDriver {
std::result::Result::Ok(msg) => msg,
std::result::Result::Err(err) => match err.kind() {
RequestErrorKind::NoResponders => {
return Err(errors::Ups::NoResponders.build().into());
// HACK: There is no native NoResponders error, so we return
// RequestTimeout. This is equivalent since the request would time out
// if there are no responders.
return Err(errors::Ups::RequestTimeout.build().into());
}
RequestErrorKind::TimedOut => {
return Err(errors::Ups::RequestTimeout.build().into());
Expand All @@ -84,7 +87,8 @@ impl PubSubDriver for NatsDriver {
std::result::Result::Ok(msg) => msg,
std::result::Result::Err(err) => match err.kind() {
RequestErrorKind::NoResponders => {
return Err(errors::Ups::NoResponders.build().into());
// HACK: See above
return Err(errors::Ups::RequestTimeout.build().into());
}
RequestErrorKind::TimedOut => {
return Err(errors::Ups::RequestTimeout.build().into());
Expand Down
96 changes: 7 additions & 89 deletions packages/common/universalpubsub/src/driver/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use deadpool_postgres::{Config, ManagerConfig, Pool, PoolConfig, RecyclingMethod
use futures_util::future::poll_fn;
use moka::future::Cache;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use tokio::sync::RwLock;
use tokio_postgres::{AsyncMessage, NoTls};
use tracing::Instrument;
Expand Down Expand Up @@ -167,10 +166,6 @@ impl PubSubDriver for PostgresDriver {
None
};

// Get the lock ID for this subject
let lock_id = subject_to_lock_id(subject);
tracing::debug!(%subject, ?lock_id, "calculated advisory lock id");

// Convert subject to base64 hash string because Postgres identifiers can only be 63 bytes
let mut hasher = DefaultHasher::new();
subject.hash(&mut hasher);
Expand All @@ -189,30 +184,14 @@ impl PubSubDriver for PostgresDriver {
.tx
.subscribe();

let lock_sql = format!("SELECT pg_try_advisory_lock_shared({})", lock_id);
let lock_res = self.client.query_one(&lock_sql, &[]).await?;
let lock_acquired = lock_res.get::<_, bool>(0);
ensure!(lock_acquired, "Failed to acquire advisory lock for subject");

let sql = format!("LISTEN {}", quote_ident(&subject_hash));
let listen_res = self.client.batch_execute(&sql).await;

if listen_res.is_err() {
// Release lock on error
let _ = self
.client
.execute("SELECT pg_advisory_unlock_shared($1)", &[&lock_id])
.await;
}

listen_res?;
self.client.batch_execute(&sql).await?;

tracing::debug!(%subject, "subscription established successfully");
Ok(Box::new(PostgresSubscriber {
driver: self.clone(),
rx,
local_request_rx,
lock_id,
subject: subject.to_string(),
}))
}
Expand Down Expand Up @@ -323,48 +302,8 @@ impl PubSubDriver for PostgresDriver {
}
}

// Normal path: check for listeners via database
tracing::debug!(%subject, "checking for remote listeners via database");
// Get a connection from the pool for checking listeners
let conn = self
.pool
.get()
.await
.context("failed to get connection from pool")?;

// First check if there are any listeners for this subject
let lock_id = subject_to_lock_id(subject);

// Check if there are any shared advisory locks (listeners) for this subject
// Query pg_locks directly to avoid lock acquisition overhead
// Split the 64-bit lock_id into two 32-bit integers for pg_locks query
let classid = (lock_id >> 32) as i32;
let objid = (lock_id & 0xFFFFFFFF) as i32;

let check_sql = "
SELECT EXISTS (
SELECT 1 FROM pg_locks
WHERE locktype = 'advisory'
AND classid = $1::int
AND objid = $2::int
AND mode = 'ShareLock'
) AS has_listeners
";
let row = conn.query_one(check_sql, &[&classid, &objid]).await?;
let has_listeners: bool = row.get(0);
tracing::debug!(
%subject,
?has_listeners,
"checked for listeners in database"
);

if !has_listeners {
tracing::warn!(%subject, "no listeners found for subject");
return Err(errors::Ups::NoResponders.build().into());
}

// Drop the pool connection before creating new dedicated connections
drop(conn);
// Normal path: use database for request/response
tracing::debug!(%subject, "using database path for request");
Comment on lines +305 to +306
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical Issue: Removing Listener Check Causes Resource Waste

The removal of the listener check (lines 326-364) introduces a significant performance and resource problem. Without this check, the system will:

  1. Create unnecessary database connections
  2. Wait for the full timeout period for every request to subjects with no listeners
  3. Waste resources instead of failing fast

While consolidating error types is reasonable, the optimization logic should be preserved. Consider modifying the existing check to return RequestTimeout instead of NoResponders when no listeners are found:

if !has_listeners {
    tracing::warn!(%subject, "no listeners found for subject");
    return Err(errors::Ups::RequestTimeout.build().into());
}

This maintains the performance optimization while achieving the goal of error type consolidation.

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.


// Create a temporary reply subject and a dedicated listener connection
let reply_subject = format!("_INBOX.{}", uuid::Uuid::new_v4());
Expand Down Expand Up @@ -489,13 +428,12 @@ pub struct PostgresSubscriber {
driver: PostgresDriver,
rx: tokio::sync::broadcast::Receiver<(Vec<u8>, Option<String>)>,
local_request_rx: Option<tokio::sync::broadcast::Receiver<LocalRequest>>,
lock_id: i64,
subject: String,
}

#[async_trait]
impl SubscriberDriver for PostgresSubscriber {
#[tracing::instrument(skip(self), fields(subject = %self.subject, lock_id = %self.lock_id))]
#[tracing::instrument(skip(self), fields(subject = %self.subject))]
async fn next(&mut self) -> Result<NextOutput> {
tracing::debug!("waiting for message");

Expand Down Expand Up @@ -543,7 +481,7 @@ impl SubscriberDriver for PostgresSubscriber {
}))
}
std::result::Result::Err(_) => {
tracing::debug!(?self.subject, ?self.lock_id, "subscription closed");
tracing::debug!(?self.subject, "subscription closed");

Ok(NextOutput::Unsubscribed)
}
Expand Down Expand Up @@ -574,14 +512,13 @@ impl SubscriberDriver for PostgresSubscriber {

impl Drop for PostgresSubscriber {
fn drop(&mut self) {
tracing::debug!(subject = %self.subject, ?self.lock_id, "dropping postgres subscriber");
tracing::debug!(subject = %self.subject, "dropping postgres subscriber");

let lock_id = self.lock_id;
let driver = self.driver.clone();
let subject = self.subject.clone();
let has_local_rx = self.local_request_rx.is_some();

// Spawn a task to release the lock
// Spawn a task to clean up
tokio::spawn(async move {
// Clean up local subscription registration if memory optimization is enabled
if has_local_rx {
Expand Down Expand Up @@ -610,11 +547,6 @@ impl Drop for PostgresSubscriber {
}
}
}

let _ = driver
.client
.execute("SELECT pg_advisory_unlock_shared($1)", &[&lock_id])
.await;
});
}
}
Expand All @@ -624,17 +556,3 @@ fn quote_ident(subject: &str) -> String {
let escaped = subject.replace('"', "\"\"");
format!("\"{}\"", escaped)
}

/// Convert a subject name to a PostgreSQL advisory lock ID
/// Uses SHA256 hash truncated to 63 bits to avoid collisions
fn subject_to_lock_id(subject: &str) -> i64 {
let mut hasher = Sha256::new();
hasher.update(subject.as_bytes());
let hash = hasher.finalize();

// Take first 8 bytes and convert to i64, using only 63 bits to avoid sign issues
let mut bytes = [0u8; 8];
bytes.copy_from_slice(&hash[0..8]);
let hash_u64 = u64::from_be_bytes(bytes);
(hash_u64 & 0x7FFFFFFFFFFFFFFF) as i64
}
3 changes: 0 additions & 3 deletions packages/common/universalpubsub/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ use serde::{Deserialize, Serialize};
#[derive(RivetError, Debug, Deserialize, Serialize)]
#[error("ups")]
pub enum Ups {
#[error("no_responders", "No responders.")]
NoResponders,

#[error("request_timeout", "Request timeout.")]
RequestTimeout,
}
25 changes: 0 additions & 25 deletions packages/common/universalpubsub/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,6 @@ async fn test_inner(pubsub: &PubSub) {
let start = Instant::now();
test_request_timeout(&pubsub).await.unwrap();
tracing::info!(duration_ms = ?start.elapsed().as_millis(), "test_request_timeout completed");

let start = Instant::now();
test_no_responders(&pubsub).await.unwrap();
tracing::info!(duration_ms = ?start.elapsed().as_millis(), "test_no_responders completed");
}

async fn test_basic_pub_sub(pubsub: &PubSub) -> Result<()> {
Expand Down Expand Up @@ -332,24 +328,3 @@ async fn test_request_timeout(pubsub: &PubSub) -> Result<()> {

Ok(())
}

async fn test_no_responders(pubsub: &PubSub) -> Result<()> {
tracing::info!("testing no responders error");

let result = pubsub
.request("test.no_responders", b"no one listening")
.await;
assert!(
result.is_err(),
"Expected request to fail with no responders"
);

let err = result.err().unwrap();
let err = err
.downcast_ref::<RivetError>()
.expect("expected errors::Ups");
assert_eq!(err.group(), "ups");
assert_eq!(err.code(), "no_responders");

Ok(())
}
11 changes: 5 additions & 6 deletions packages/core/pegboard-gateway/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use anyhow::*;
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::{SinkExt, StreamExt};

Check warning on line 4 in packages/core/pegboard-gateway/src/lib.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/pegboard-gateway/src/lib.rs
use gas::prelude::*;
use http_body_util::{BodyExt, Full};
use hyper::{Request, Response, StatusCode, body::Incoming as BodyIncoming};
use hyper::{body::Incoming as BodyIncoming, Request, Response, StatusCode};
use hyper_tungstenite::HyperWebsocket;
use pegboard::pubsub_subjects::{
TunnelHttpResponseSubject, TunnelHttpRunnerSubject, TunnelHttpWebSocketSubject,
Expand All @@ -13,25 +13,24 @@
use rivet_guard_core::{
custom_serve::CustomServeTrait,
proxy_service::{ResponseBody, X_RIVET_ERROR},
request_context::RequestContext,

Check warning on line 16 in packages/core/pegboard-gateway/src/lib.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/pegboard-gateway/src/lib.rs
};
use rivet_tunnel_protocol::{
MessageBody, StreamFinishReason, ToServerRequestFinish, ToServerRequestStart,
versioned, MessageBody, StreamFinishReason, ToServerRequestFinish, ToServerRequestStart,
ToServerWebSocketClose, ToServerWebSocketMessage, ToServerWebSocketOpen, TunnelMessage,
versioned,
};
use rivet_util::serde::HashableMap;
use std::result::Result::Ok as ResultOk;
use std::{

Check warning on line 24 in packages/core/pegboard-gateway/src/lib.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/pegboard-gateway/src/lib.rs
collections::HashMap,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use tokio::{

Check warning on line 32 in packages/core/pegboard-gateway/src/lib.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/pegboard-gateway/src/lib.rs
sync::{Mutex, oneshot},
sync::{oneshot, Mutex},
time::timeout,
};
use tokio_tungstenite::tungstenite::Message;
Expand Down Expand Up @@ -508,7 +507,7 @@
fn is_tunnel_closed_error(err: &anyhow::Error) -> bool {
if let Some(err) = err.chain().find_map(|x| x.downcast_ref::<RivetError>())
&& err.group() == "ups"
&& (err.code() == "no_responders" || err.code() == "request_timeout")
&& err.code() == "request_timeout"
{
true
} else {
Expand Down
10 changes: 5 additions & 5 deletions packages/core/pegboard-tunnel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@
use async_trait::async_trait;
use futures::{SinkExt, StreamExt};
use gas::prelude::*;
use http_body_util::Full;

Check warning on line 9 in packages/core/pegboard-tunnel/src/lib.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/pegboard-tunnel/src/lib.rs
use hyper::body::{Bytes, Incoming as BodyIncoming};
use hyper::{Request, Response, StatusCode};
use hyper_tungstenite::tungstenite::Utf8Bytes as WsUtf8Bytes;
use hyper_tungstenite::tungstenite::protocol::frame::CloseFrame as WsCloseFrame;
use hyper_tungstenite::tungstenite::protocol::frame::coding::CloseCode as WsCloseCode;
use hyper_tungstenite::{HyperWebsocket, tungstenite::Message as WsMessage};
use hyper_tungstenite::tungstenite::protocol::frame::CloseFrame as WsCloseFrame;
use hyper_tungstenite::tungstenite::Utf8Bytes as WsUtf8Bytes;
use hyper_tungstenite::{tungstenite::Message as WsMessage, HyperWebsocket};
use pegboard::pubsub_subjects::{
TunnelHttpResponseSubject, TunnelHttpRunnerSubject, TunnelHttpWebSocketSubject,
};
use rivet_guard_core::custom_serve::CustomServeTrait;
use rivet_guard_core::proxy_service::ResponseBody;

Check warning on line 20 in packages/core/pegboard-tunnel/src/lib.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/pegboard-tunnel/src/lib.rs
use rivet_guard_core::request_context::RequestContext;
use rivet_pools::Pools;
use rivet_tunnel_protocol::{MessageBody, TunnelMessage, versioned};
use rivet_tunnel_protocol::{versioned, MessageBody, TunnelMessage};
use rivet_util::Id;
use std::net::SocketAddr;
use tokio::net::TcpListener;
Expand Down Expand Up @@ -690,7 +690,7 @@
.chain()
.find_map(|x| x.downcast_ref::<rivet_error::RivetError>())
&& err.group() == "ups"
&& (err.code() == "no_responders" || err.code() == "request_timeout")
&& err.code() == "request_timeout"
{
true
} else {
Expand Down
Loading