Skip to content

Commit 68d6944

Browse files
committed
fix(pb): fix tunnel race condition handling (#3043)
Fixes RVT-5193
1 parent 76efee0 commit 68d6944

File tree

11 files changed

+78
-22
lines changed

11 files changed

+78
-22
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

frontend/src/app/data-providers/engine-data-provider.tsx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
type DefaultDataProvider,
1616
RECORDS_PER_PAGE,
1717
} from "./default-data-provider";
18+
import { getConfig } from "@/components/lib/config";
1819

1920
export type CreateNamespace = {
2021
displayName: string;
@@ -29,7 +30,7 @@ export type Namespace = {
2930
};
3031

3132
export function createClient(
32-
baseUrl = engineEnv().VITE_APP_API_URL,
33+
baseUrl = getConfig().apiUrl,
3334
opts: { token: (() => string) | string },
3435
) {
3536
return new RivetClient({
@@ -42,7 +43,7 @@ export function createClient(
4243
export const createGlobalContext = (opts: {
4344
engineToken: (() => string) | string;
4445
}) => {
45-
const client = createClient(engineEnv().VITE_APP_API_URL, {
46+
const client = createClient(getConfig().apiUrl, {
4647
token: opts.engineToken,
4748
});
4849
return {

packages/core/guard/core/src/proxy_service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1798,7 +1798,7 @@ impl ProxyService {
17981798
.await
17991799
{
18001800
Result::Ok(()) => {
1801-
tracing::debug!("websocket closed");
1801+
tracing::debug!("websocket handler complete, closing");
18021802

18031803
// Send graceful close
18041804
ws_handle

packages/core/pegboard-gateway/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ rivet-util.workspace = true
2323
thiserror.workspace = true
2424
tokio-tungstenite.workspace = true
2525
tokio.workspace = true
26+
tracing.workspace = true
2627
universalpubsub.workspace = true
2728
vbare.workspace = true

packages/core/pegboard-gateway/src/lib.rs

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::shared_state::{SharedState, TunnelMessageData};
2121

2222
pub mod shared_state;
2323

24-
const UPS_REQ_TIMEOUT: Duration = Duration::from_secs(2);
24+
const TUNNEL_ACK_TIMEOUT: Duration = Duration::from_secs(2);
2525
const SEC_WEBSOCKET_PROTOCOL: HeaderName = HeaderName::from_static("sec-websocket-protocol");
2626
const WS_PROTOCOL_ACTOR: &str = "rivet_actor.";
2727

@@ -33,6 +33,7 @@ pub struct PegboardGateway {
3333
}
3434

3535
impl PegboardGateway {
36+
#[tracing::instrument(skip_all, fields(?actor_id, ?runner_id))]
3637
pub fn new(ctx: StandaloneCtx, shared_state: SharedState, runner_id: Id, actor_id: Id) -> Self {
3738
Self {
3839
ctx,
@@ -45,6 +46,7 @@ impl PegboardGateway {
4546

4647
#[async_trait]
4748
impl CustomServeTrait for PegboardGateway {
49+
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, runner_id=?self.runner_id))]
4850
async fn handle_request(
4951
&self,
5052
req: Request<Full<Bytes>>,
@@ -67,6 +69,7 @@ impl CustomServeTrait for PegboardGateway {
6769
}
6870
}
6971

72+
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, runner_id=?self.runner_id))]
7073
async fn handle_websocket(
7174
&self,
7275
client_ws: WebSocketHandle,
@@ -91,6 +94,7 @@ impl CustomServeTrait for PegboardGateway {
9194
}
9295

9396
impl PegboardGateway {
97+
#[tracing::instrument(skip_all)]
9498
async fn handle_request_inner(
9599
&self,
96100
req: Request<Full<Bytes>>,
@@ -155,9 +159,16 @@ impl PegboardGateway {
155159
self.shared_state.send_message(request_id, message).await?;
156160

157161
// Wait for response
158-
tracing::debug!("starting response handler task");
162+
tracing::debug!("gateway waiting for response from tunnel");
159163
let response_start = loop {
160-
let Some(msg) = msg_rx.recv().await else {
164+
let Some(msg) = tokio::time::timeout(TUNNEL_ACK_TIMEOUT, msg_rx.recv())
165+
.await
166+
.map_err(|_| {
167+
tracing::warn!("timed out waiting for tunnel ack");
168+
169+
RequestError::ServiceUnavailable
170+
})?
171+
else {
161172
tracing::warn!("received no message response");
162173
return Err(RequestError::ServiceUnavailable.into());
163174
};
@@ -195,6 +206,7 @@ impl PegboardGateway {
195206
Ok(response)
196207
}
197208

209+
#[tracing::instrument(skip_all)]
198210
async fn handle_websocket_inner(
199211
&self,
200212
client_ws: WebSocketHandle,
@@ -247,34 +259,42 @@ impl PegboardGateway {
247259
.send_message(request_id, open_message)
248260
.await?;
249261

262+
tracing::debug!("gateway waiting for websocket open from tunnel");
263+
250264
// Wait for WebSocket open acknowledgment
251-
let open_ack_received = loop {
252-
let Some(msg) = msg_rx.recv().await else {
253-
bail!("received no websocket open response");
265+
loop {
266+
let Some(msg) = tokio::time::timeout(TUNNEL_ACK_TIMEOUT, msg_rx.recv())
267+
.await
268+
.map_err(|_| {
269+
tracing::warn!("timed out waiting for tunnel ack");
270+
271+
RequestError::ServiceUnavailable
272+
})?
273+
else {
274+
tracing::warn!("received no message response");
275+
return Err(RequestError::ServiceUnavailable.into());
254276
};
255277

256278
match msg {
257279
TunnelMessageData::Message(
258280
protocol::ToServerTunnelMessageKind::ToServerWebSocketOpen,
259281
) => {
260-
break true;
282+
break;
261283
}
262284
TunnelMessageData::Message(
263285
protocol::ToServerTunnelMessageKind::ToServerWebSocketClose(close),
264286
) => {
265-
bail!("websocket closed before opening: {close:?}");
287+
tracing::warn!(?close, "websocket closed before opening");
288+
return Err(RequestError::ServiceUnavailable.into());
266289
}
267290
TunnelMessageData::Timeout => {
268-
bail!("websocket open timeout");
291+
tracing::warn!("websocket open timeout");
292+
return Err(RequestError::ServiceUnavailable.into());
269293
}
270294
_ => {
271295
tracing::warn!("received unexpected message while waiting for websocket open");
272296
}
273297
}
274-
};
275-
276-
if !open_ack_received {
277-
bail!("failed to open websocket");
278298
}
279299

280300
// Accept the WebSocket

packages/core/pegboard-runner/src/client_to_pubsub_task.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::{
1616
utils::{self},
1717
};
1818

19+
#[tracing::instrument(skip_all, fields(runner_id=?conn.runner_id, workflow_id=?conn.workflow_id, protocol_version=%conn.protocol_version))]
1920
pub async fn task(ctx: StandaloneCtx, conn: Arc<Conn>, ws_rx: WebSocketReceiver) {
2021
match task_inner(ctx, conn, ws_rx).await {
2122
Ok(_) => {}
@@ -25,6 +26,7 @@ pub async fn task(ctx: StandaloneCtx, conn: Arc<Conn>, ws_rx: WebSocketReceiver)
2526
}
2627
}
2728

29+
#[tracing::instrument(skip_all)]
2830
async fn task_inner(
2931
ctx: StandaloneCtx,
3032
conn: Arc<Conn>,
@@ -77,6 +79,7 @@ async fn task_inner(
7779
Ok(())
7880
}
7981

82+
#[tracing::instrument(skip_all)]
8083
async fn handle_message(
8184
ctx: &StandaloneCtx,
8285
conn: &Arc<Conn>,
@@ -354,6 +357,7 @@ async fn handle_message(
354357
Ok(())
355358
}
356359

360+
#[tracing::instrument(skip_all)]
357361
async fn handle_tunnel_message(
358362
ctx: &StandaloneCtx,
359363
conn: &Arc<Conn>,

packages/core/pegboard-runner/src/pubsub_to_client_task.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::{
1212
utils,
1313
};
1414

15+
#[tracing::instrument(skip_all, fields(runner_id=?conn.runner_id, workflow_id=?conn.workflow_id, protocol_version=%conn.protocol_version))]
1516
pub async fn task(ctx: StandaloneCtx, conn: Arc<Conn>, sub: Subscriber) {
1617
match task_inner(ctx, conn, sub).await {
1718
Ok(_) => {}
@@ -21,6 +22,7 @@ pub async fn task(ctx: StandaloneCtx, conn: Arc<Conn>, sub: Subscriber) {
2122
}
2223
}
2324

25+
#[tracing::instrument(skip_all)]
2426
async fn task_inner(ctx: StandaloneCtx, conn: Arc<Conn>, mut sub: Subscriber) -> Result<()> {
2527
while let Result::Ok(NextOutput::Message(ups_msg)) = sub.next().await {
2628
tracing::debug!(
@@ -37,6 +39,7 @@ async fn task_inner(ctx: StandaloneCtx, conn: Arc<Conn>, mut sub: Subscriber) ->
3739
continue;
3840
}
3941
};
42+
let is_close = utils::is_to_client_close(&msg);
4043

4144
// Handle tunnel messages
4245
if let protocol::ToClient::ToClientTunnelMessage(tunnel_msg) = &mut msg {
@@ -57,11 +60,17 @@ async fn task_inner(ctx: StandaloneCtx, conn: Arc<Conn>, mut sub: Subscriber) ->
5760
tracing::error!(?e, "failed to send message to WebSocket");
5861
break;
5962
}
63+
64+
if is_close {
65+
tracing::debug!("manually closing websocket");
66+
break;
67+
}
6068
}
6169

6270
Ok(())
6371
}
6472

73+
#[tracing::instrument(skip_all)]
6574
async fn handle_tunnel_message(conn: &Arc<Conn>, msg: &mut protocol::ToClientTunnelMessage) {
6675
// Save active request
6776
//

packages/core/pegboard-runner/src/utils.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,6 @@ use hyper_tungstenite::tungstenite::Message as WsMessage;
44
use hyper_util::rt::TokioIo;
55
use rivet_error::*;
66
use rivet_runner_protocol as protocol;
7-
use tokio_tungstenite::{
8-
WebSocketStream,
9-
tungstenite::protocol::frame::{CloseFrame, coding::CloseCode},
10-
};
117

128
#[derive(Clone)]
139
pub struct UrlData {
@@ -73,3 +69,10 @@ pub fn is_to_client_tunnel_message_kind_request_close(
7369
_ => false,
7470
}
7571
}
72+
73+
pub fn is_to_client_close(kind: &protocol::ToClient) -> bool {
74+
match kind {
75+
protocol::ToClient::ToClientClose => true,
76+
_ => false,
77+
}
78+
}

sdks/typescript/runner/src/mod.ts

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

site/public/llms-full.txt

Lines changed: 15 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)