Skip to content

Commit 0678b46

Browse files
committed
chore(ups): remove no responders
1 parent 8b94417 commit 0678b46

File tree

10 files changed

+31
-145
lines changed

10 files changed

+31
-145
lines changed

dev-docs/PEGBOARD_TUNNEL_RETRIES.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ This document explains how retries are coordinated between Guard and Pegboard-ba
77
## HTTP
88

99
- Signal: A retryable transient tunnel failure is signaled by returning an HTTP 503 with the `X-RIVET-ERROR` header set.
10-
- Example (Pegboard Gateway): on tunnel closed (e.g., UPS `no_responders` or `request_timeout`), the gateway replies with `503` and `X-RIVET-ERROR: pegboard_gateway.tunnel_closed`.
10+
- Example (Pegboard Gateway): on tunnel closed (e.g., UPS `request_timeout`), the gateway replies with `503` and `X-RIVET-ERROR: pegboard_gateway.tunnel_closed`.
1111

1212
- Guard behavior
1313
- Guard considers a response retryable if `status == 503` and the `X-RIVET-ERROR` header is present.
@@ -37,7 +37,7 @@ This section explains how WebSocket retries are coordinated between Guard and Pe
3737
- Handler contract:
3838
- Do not await the client websocket yet.
3939
- Return the untouched `HyperWebsocket` in the error tuple so Guard still owns it: `Err((client_ws, err))`.
40-
- The outer wrapper maps tunnel-closed UPS errors (e.g., `ups.no_responders`, `ups.request_timeout`) to `WebSocketServiceUnavailable`.
40+
- The outer wrapper maps tunnel-closed UPS errors (e.g., `ups.request_timeout`) to `WebSocketServiceUnavailable`.
4141
- Guard reaction:
4242
- Treats `WebSocketServiceUnavailable` as retryable.
4343
- Re-resolves the route with ignore-cache=true, using middleware-config retry/backoff.
@@ -65,7 +65,7 @@ This section explains how WebSocket retries are coordinated between Guard and Pe
6565
- Return the socket in the error tuple: `Err((client_ws, err))`.
6666

6767
- Map tunnel-closed errors at the wrapper:
68-
- In the outer `handle_websocket` wrapper, detect tunnel-closed (e.g., `ups.no_responders`, `ups.request_timeout`) and map to `WebSocketServiceUnavailable`.
68+
- In the outer `handle_websocket` wrapper, detect tunnel-closed (e.g., `ups.request_timeout`) and map to `WebSocketServiceUnavailable`.
6969
- `handle_websocket_inner` should return raw errors; do not construct `WebSocketServiceUnavailable` inside the inner function.
7070

7171
- Use `ups.request` for all tunnel operations (open, messages, close):

out/errors/ups.no_responders.json

Lines changed: 0 additions & 5 deletions
This file was deleted.

out/errors/ups.no_respondersraCompletionMarker.json

Lines changed: 0 additions & 5 deletions
This file was deleted.

packages/common/universalpubsub/src/driver/memory/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::time::Duration;
44

55
use anyhow::*;
66
use async_trait::async_trait;
7-
use tokio::sync::{RwLock, mpsc};
7+
use tokio::sync::{mpsc, RwLock};
88
use uuid::Uuid;
99

1010
use crate::driver::{PubSubDriver, SubscriberDriver, SubscriberDriverHandle};
@@ -93,7 +93,10 @@ impl PubSubDriver for MemoryDriver {
9393
.get(&subject_with_channel)
9494
.map_or(true, |subs| subs.is_empty())
9595
{
96-
return Err(crate::errors::Ups::NoResponders.build().into());
96+
// HACK: There is no native NoResponders error, so we return
97+
// RequestTimeout. This is equivalent since the request would time out
98+
// if there are no responders.
99+
return Err(crate::errors::Ups::RequestTimeout.build().into());
97100
}
98101
}
99102

packages/common/universalpubsub/src/driver/nats/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ impl PubSubDriver for NatsDriver {
6767
std::result::Result::Ok(msg) => msg,
6868
std::result::Result::Err(err) => match err.kind() {
6969
RequestErrorKind::NoResponders => {
70-
return Err(errors::Ups::NoResponders.build().into());
70+
// HACK: There is no native NoResponders error, so we return
71+
// RequestTimeout. This is equivalent since the request would time out
72+
// if there are no responders.
73+
return Err(errors::Ups::RequestTimeout.build().into());
7174
}
7275
RequestErrorKind::TimedOut => {
7376
return Err(errors::Ups::RequestTimeout.build().into());
@@ -84,7 +87,8 @@ impl PubSubDriver for NatsDriver {
8487
std::result::Result::Ok(msg) => msg,
8588
std::result::Result::Err(err) => match err.kind() {
8689
RequestErrorKind::NoResponders => {
87-
return Err(errors::Ups::NoResponders.build().into());
90+
// HACK: See above
91+
return Err(errors::Ups::RequestTimeout.build().into());
8892
}
8993
RequestErrorKind::TimedOut => {
9094
return Err(errors::Ups::RequestTimeout.build().into());

packages/common/universalpubsub/src/driver/postgres/mod.rs

Lines changed: 7 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use deadpool_postgres::{Config, ManagerConfig, Pool, PoolConfig, RecyclingMethod
1111
use futures_util::future::poll_fn;
1212
use moka::future::Cache;
1313
use serde::{Deserialize, Serialize};
14-
use sha2::{Digest, Sha256};
1514
use tokio::sync::RwLock;
1615
use tokio_postgres::{AsyncMessage, NoTls};
1716
use tracing::Instrument;
@@ -167,10 +166,6 @@ impl PubSubDriver for PostgresDriver {
167166
None
168167
};
169168

170-
// Get the lock ID for this subject
171-
let lock_id = subject_to_lock_id(subject);
172-
tracing::debug!(%subject, ?lock_id, "calculated advisory lock id");
173-
174169
// Convert subject to base64 hash string because Postgres identifiers can only be 63 bytes
175170
let mut hasher = DefaultHasher::new();
176171
subject.hash(&mut hasher);
@@ -189,30 +184,14 @@ impl PubSubDriver for PostgresDriver {
189184
.tx
190185
.subscribe();
191186

192-
let lock_sql = format!("SELECT pg_try_advisory_lock_shared({})", lock_id);
193-
let lock_res = self.client.query_one(&lock_sql, &[]).await?;
194-
let lock_acquired = lock_res.get::<_, bool>(0);
195-
ensure!(lock_acquired, "Failed to acquire advisory lock for subject");
196-
197187
let sql = format!("LISTEN {}", quote_ident(&subject_hash));
198-
let listen_res = self.client.batch_execute(&sql).await;
199-
200-
if listen_res.is_err() {
201-
// Release lock on error
202-
let _ = self
203-
.client
204-
.execute("SELECT pg_advisory_unlock_shared($1)", &[&lock_id])
205-
.await;
206-
}
207-
208-
listen_res?;
188+
self.client.batch_execute(&sql).await?;
209189

210190
tracing::debug!(%subject, "subscription established successfully");
211191
Ok(Box::new(PostgresSubscriber {
212192
driver: self.clone(),
213193
rx,
214194
local_request_rx,
215-
lock_id,
216195
subject: subject.to_string(),
217196
}))
218197
}
@@ -323,48 +302,8 @@ impl PubSubDriver for PostgresDriver {
323302
}
324303
}
325304

326-
// Normal path: check for listeners via database
327-
tracing::debug!(%subject, "checking for remote listeners via database");
328-
// Get a connection from the pool for checking listeners
329-
let conn = self
330-
.pool
331-
.get()
332-
.await
333-
.context("failed to get connection from pool")?;
334-
335-
// First check if there are any listeners for this subject
336-
let lock_id = subject_to_lock_id(subject);
337-
338-
// Check if there are any shared advisory locks (listeners) for this subject
339-
// Query pg_locks directly to avoid lock acquisition overhead
340-
// Split the 64-bit lock_id into two 32-bit integers for pg_locks query
341-
let classid = (lock_id >> 32) as i32;
342-
let objid = (lock_id & 0xFFFFFFFF) as i32;
343-
344-
let check_sql = "
345-
SELECT EXISTS (
346-
SELECT 1 FROM pg_locks
347-
WHERE locktype = 'advisory'
348-
AND classid = $1::int
349-
AND objid = $2::int
350-
AND mode = 'ShareLock'
351-
) AS has_listeners
352-
";
353-
let row = conn.query_one(check_sql, &[&classid, &objid]).await?;
354-
let has_listeners: bool = row.get(0);
355-
tracing::debug!(
356-
%subject,
357-
?has_listeners,
358-
"checked for listeners in database"
359-
);
360-
361-
if !has_listeners {
362-
tracing::warn!(%subject, "no listeners found for subject");
363-
return Err(errors::Ups::NoResponders.build().into());
364-
}
365-
366-
// Drop the pool connection before creating new dedicated connections
367-
drop(conn);
305+
// Normal path: use database for request/response
306+
tracing::debug!(%subject, "using database path for request");
368307

369308
// Create a temporary reply subject and a dedicated listener connection
370309
let reply_subject = format!("_INBOX.{}", uuid::Uuid::new_v4());
@@ -489,13 +428,12 @@ pub struct PostgresSubscriber {
489428
driver: PostgresDriver,
490429
rx: tokio::sync::broadcast::Receiver<(Vec<u8>, Option<String>)>,
491430
local_request_rx: Option<tokio::sync::broadcast::Receiver<LocalRequest>>,
492-
lock_id: i64,
493431
subject: String,
494432
}
495433

496434
#[async_trait]
497435
impl SubscriberDriver for PostgresSubscriber {
498-
#[tracing::instrument(skip(self), fields(subject = %self.subject, lock_id = %self.lock_id))]
436+
#[tracing::instrument(skip(self), fields(subject = %self.subject))]
499437
async fn next(&mut self) -> Result<NextOutput> {
500438
tracing::debug!("waiting for message");
501439

@@ -543,7 +481,7 @@ impl SubscriberDriver for PostgresSubscriber {
543481
}))
544482
}
545483
std::result::Result::Err(_) => {
546-
tracing::debug!(?self.subject, ?self.lock_id, "subscription closed");
484+
tracing::debug!(?self.subject, "subscription closed");
547485

548486
Ok(NextOutput::Unsubscribed)
549487
}
@@ -574,14 +512,13 @@ impl SubscriberDriver for PostgresSubscriber {
574512

575513
impl Drop for PostgresSubscriber {
576514
fn drop(&mut self) {
577-
tracing::debug!(subject = %self.subject, ?self.lock_id, "dropping postgres subscriber");
515+
tracing::debug!(subject = %self.subject, "dropping postgres subscriber");
578516

579-
let lock_id = self.lock_id;
580517
let driver = self.driver.clone();
581518
let subject = self.subject.clone();
582519
let has_local_rx = self.local_request_rx.is_some();
583520

584-
// Spawn a task to release the lock
521+
// Spawn a task to clean up
585522
tokio::spawn(async move {
586523
// Clean up local subscription registration if memory optimization is enabled
587524
if has_local_rx {
@@ -610,11 +547,6 @@ impl Drop for PostgresSubscriber {
610547
}
611548
}
612549
}
613-
614-
let _ = driver
615-
.client
616-
.execute("SELECT pg_advisory_unlock_shared($1)", &[&lock_id])
617-
.await;
618550
});
619551
}
620552
}
@@ -624,17 +556,3 @@ fn quote_ident(subject: &str) -> String {
624556
let escaped = subject.replace('"', "\"\"");
625557
format!("\"{}\"", escaped)
626558
}
627-
628-
/// Convert a subject name to a PostgreSQL advisory lock ID
629-
/// Uses SHA256 hash truncated to 63 bits to avoid collisions
630-
fn subject_to_lock_id(subject: &str) -> i64 {
631-
let mut hasher = Sha256::new();
632-
hasher.update(subject.as_bytes());
633-
let hash = hasher.finalize();
634-
635-
// Take first 8 bytes and convert to i64, using only 63 bits to avoid sign issues
636-
let mut bytes = [0u8; 8];
637-
bytes.copy_from_slice(&hash[0..8]);
638-
let hash_u64 = u64::from_be_bytes(bytes);
639-
(hash_u64 & 0x7FFFFFFFFFFFFFFF) as i64
640-
}

packages/common/universalpubsub/src/errors.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@ use serde::{Deserialize, Serialize};
44
#[derive(RivetError, Debug, Deserialize, Serialize)]
55
#[error("ups")]
66
pub enum Ups {
7-
#[error("no_responders", "No responders.")]
8-
NoResponders,
9-
107
#[error("request_timeout", "Request timeout.")]
118
RequestTimeout,
129
}

packages/common/universalpubsub/tests/integration.rs

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,6 @@ async fn test_inner(pubsub: &PubSub) {
137137
let start = Instant::now();
138138
test_request_timeout(&pubsub).await.unwrap();
139139
tracing::info!(duration_ms = ?start.elapsed().as_millis(), "test_request_timeout completed");
140-
141-
let start = Instant::now();
142-
test_no_responders(&pubsub).await.unwrap();
143-
tracing::info!(duration_ms = ?start.elapsed().as_millis(), "test_no_responders completed");
144140
}
145141

146142
async fn test_basic_pub_sub(pubsub: &PubSub) -> Result<()> {
@@ -332,24 +328,3 @@ async fn test_request_timeout(pubsub: &PubSub) -> Result<()> {
332328

333329
Ok(())
334330
}
335-
336-
async fn test_no_responders(pubsub: &PubSub) -> Result<()> {
337-
tracing::info!("testing no responders error");
338-
339-
let result = pubsub
340-
.request("test.no_responders", b"no one listening")
341-
.await;
342-
assert!(
343-
result.is_err(),
344-
"Expected request to fail with no responders"
345-
);
346-
347-
let err = result.err().unwrap();
348-
let err = err
349-
.downcast_ref::<RivetError>()
350-
.expect("expected errors::Ups");
351-
assert_eq!(err.group(), "ups");
352-
assert_eq!(err.code(), "no_responders");
353-
354-
Ok(())
355-
}

packages/core/pegboard-gateway/src/lib.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bytes::Bytes;
44
use futures_util::{SinkExt, StreamExt};
55
use gas::prelude::*;
66
use http_body_util::{BodyExt, Full};
7-
use hyper::{Request, Response, StatusCode, body::Incoming as BodyIncoming};
7+
use hyper::{body::Incoming as BodyIncoming, Request, Response, StatusCode};
88
use hyper_tungstenite::HyperWebsocket;
99
use pegboard::pubsub_subjects::{
1010
TunnelHttpResponseSubject, TunnelHttpRunnerSubject, TunnelHttpWebSocketSubject,
@@ -16,22 +16,21 @@ use rivet_guard_core::{
1616
request_context::RequestContext,
1717
};
1818
use rivet_tunnel_protocol::{
19-
MessageBody, StreamFinishReason, ToServerRequestFinish, ToServerRequestStart,
19+
versioned, MessageBody, StreamFinishReason, ToServerRequestFinish, ToServerRequestStart,
2020
ToServerWebSocketClose, ToServerWebSocketMessage, ToServerWebSocketOpen, TunnelMessage,
21-
versioned,
2221
};
2322
use rivet_util::serde::HashableMap;
2423
use std::result::Result::Ok as ResultOk;
2524
use std::{
2625
collections::HashMap,
2726
sync::{
28-
Arc,
2927
atomic::{AtomicU64, Ordering},
28+
Arc,
3029
},
3130
time::Duration,
3231
};
3332
use tokio::{
34-
sync::{Mutex, oneshot},
33+
sync::{oneshot, Mutex},
3534
time::timeout,
3635
};
3736
use tokio_tungstenite::tungstenite::Message;
@@ -508,7 +507,7 @@ impl PegboardGateway {
508507
fn is_tunnel_closed_error(err: &anyhow::Error) -> bool {
509508
if let Some(err) = err.chain().find_map(|x| x.downcast_ref::<RivetError>())
510509
&& err.group() == "ups"
511-
&& (err.code() == "no_responders" || err.code() == "request_timeout")
510+
&& err.code() == "request_timeout"
512511
{
513512
true
514513
} else {

packages/core/pegboard-tunnel/src/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,18 @@ use gas::prelude::*;
99
use http_body_util::Full;
1010
use hyper::body::{Bytes, Incoming as BodyIncoming};
1111
use hyper::{Request, Response, StatusCode};
12-
use hyper_tungstenite::tungstenite::Utf8Bytes as WsUtf8Bytes;
13-
use hyper_tungstenite::tungstenite::protocol::frame::CloseFrame as WsCloseFrame;
1412
use hyper_tungstenite::tungstenite::protocol::frame::coding::CloseCode as WsCloseCode;
15-
use hyper_tungstenite::{HyperWebsocket, tungstenite::Message as WsMessage};
13+
use hyper_tungstenite::tungstenite::protocol::frame::CloseFrame as WsCloseFrame;
14+
use hyper_tungstenite::tungstenite::Utf8Bytes as WsUtf8Bytes;
15+
use hyper_tungstenite::{tungstenite::Message as WsMessage, HyperWebsocket};
1616
use pegboard::pubsub_subjects::{
1717
TunnelHttpResponseSubject, TunnelHttpRunnerSubject, TunnelHttpWebSocketSubject,
1818
};
1919
use rivet_guard_core::custom_serve::CustomServeTrait;
2020
use rivet_guard_core::proxy_service::ResponseBody;
2121
use rivet_guard_core::request_context::RequestContext;
2222
use rivet_pools::Pools;
23-
use rivet_tunnel_protocol::{MessageBody, TunnelMessage, versioned};
23+
use rivet_tunnel_protocol::{versioned, MessageBody, TunnelMessage};
2424
use rivet_util::Id;
2525
use std::net::SocketAddr;
2626
use tokio::net::TcpListener;
@@ -690,7 +690,7 @@ fn is_tunnel_closed_error(err: &anyhow::Error) -> bool {
690690
.chain()
691691
.find_map(|x| x.downcast_ref::<rivet_error::RivetError>())
692692
&& err.group() == "ups"
693-
&& (err.code() == "no_responders" || err.code() == "request_timeout")
693+
&& err.code() == "request_timeout"
694694
{
695695
true
696696
} else {

0 commit comments

Comments
 (0)