Skip to content

Commit b72b232

Browse files
committed
fix(pegboard): include namespace & runner name in tunnel pubsub subject
1 parent 7e2c6e3 commit b72b232

File tree

7 files changed

+111
-25
lines changed

7 files changed

+111
-25
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core/guard/server/src/routing/pegboard_gateway.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -174,26 +174,38 @@ async fn find_actor(
174174
tracing::debug!(?actor_id, ?runner_id, "actor ready");
175175

176176
// TODO: Remove round trip, return key from get_runner op above
177-
// Get runner key from runner_id
178-
let runner_key = ctx
177+
// Get runner key, namespace_id, and runner_name from runner_id
178+
let (runner_key, namespace_id, runner_name) = ctx
179179
.udb()?
180180
.run(|tx| async move {
181181
let tx = tx.with_subspace(pegboard::keys::subspace());
182-
tx.read_opt(
183-
&pegboard::keys::runner::KeyKey::new(runner_id),
184-
Serializable,
185-
)
186-
.await
182+
183+
let runner_key_key = pegboard::keys::runner::KeyKey::new(runner_id);
184+
let namespace_id_key = pegboard::keys::runner::NamespaceIdKey::new(runner_id);
185+
let runner_name_key = pegboard::keys::runner::NameKey::new(runner_id);
186+
187+
let (runner_key, namespace_id, runner_name) = tokio::try_join!(
188+
tx.read_opt(&runner_key_key, Serializable),
189+
tx.read_opt(&namespace_id_key, Serializable),
190+
tx.read_opt(&runner_name_key, Serializable),
191+
)?;
192+
193+
let runner_key = runner_key.context("runner key not found")?;
194+
let namespace_id = namespace_id.context("runner namespace_id not found")?;
195+
let runner_name = runner_name.context("runner name not found")?;
196+
197+
Ok((runner_key, namespace_id, runner_name))
187198
})
188-
.await?
189-
.context("runner key not found")?;
199+
.await?;
190200

191201
// Return pegboard-gateway instance
192202
let gateway = pegboard_gateway::PegboardGateway::new(
193203
ctx.clone(),
194204
shared_state.pegboard_gateway.clone(),
195-
actor_id,
205+
namespace_id,
206+
runner_name,
196207
runner_key,
208+
actor_id,
197209
);
198210
Ok(Some(RoutingOutput::CustomServe(std::sync::Arc::new(
199211
gateway,

packages/core/pegboard-gateway/src/lib.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,28 @@ const UPS_REQ_TIMEOUT: Duration = Duration::from_secs(2);
3737
pub struct PegboardGateway {
3838
ctx: StandaloneCtx,
3939
shared_state: SharedState,
40-
actor_id: Id,
40+
namespace_id: Id,
41+
runner_name: String,
4142
runner_key: String,
43+
actor_id: Id,
4244
}
4345

4446
impl PegboardGateway {
4547
pub fn new(
4648
ctx: StandaloneCtx,
4749
shared_state: SharedState,
48-
actor_id: Id,
50+
namespace_id: Id,
51+
runner_name: String,
4952
runner_key: String,
53+
actor_id: Id,
5054
) -> Self {
5155
Self {
5256
ctx,
5357
shared_state,
54-
actor_id,
58+
namespace_id,
59+
runner_name,
5560
runner_key,
61+
actor_id,
5662
}
5763
}
5864
}
@@ -145,9 +151,12 @@ impl PegboardGateway {
145151
.to_bytes();
146152

147153
// Build subject to publish to
148-
let tunnel_subject =
149-
pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(&self.runner_key)
150-
.to_string();
154+
let tunnel_subject = pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(
155+
self.namespace_id,
156+
&self.runner_name,
157+
&self.runner_key,
158+
)
159+
.to_string();
151160

152161
// Start listening for request responses
153162
let (request_id, mut msg_rx) = self
@@ -237,9 +246,12 @@ impl PegboardGateway {
237246
}
238247

239248
// Build subject to publish to
240-
let tunnel_subject =
241-
pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(&self.runner_key)
242-
.to_string();
249+
let tunnel_subject = pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(
250+
self.namespace_id,
251+
&self.runner_name,
252+
&self.runner_key,
253+
)
254+
.to_string();
243255

244256
// Start listening for WebSocket messages
245257
let (request_id, mut msg_rx) = self

packages/core/pegboard-tunnel/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ gas.workspace = true
1414
http-body-util = "0.1"
1515
hyper = "1.6"
1616
hyper-tungstenite.workspace = true
17+
namespace = { path = "../../services/namespace" }
1718
pegboard = { path = "../../services/pegboard" }
1819
rivet-cache.workspace = true
1920
rivet-config.workspace = true

packages/core/pegboard-tunnel/src/lib.rs

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,50 @@ impl CustomServeTrait for PegboardTunnelCustomServe {
6464
Err(e) => return Err((client_ws, e.into())),
6565
};
6666

67+
// Extract namespace name from query parameters (required) and resolve to namespace_id
68+
let namespace_name = match url
69+
.query_pairs()
70+
.find_map(|(n, v)| (n == "namespace").then_some(v))
71+
{
72+
Some(name) => name.to_string(),
73+
None => {
74+
return Err((client_ws, anyhow!("namespace query parameter is required")));
75+
}
76+
};
77+
78+
// Resolve namespace name to namespace_id
79+
let namespace = match self
80+
.ctx
81+
.op(namespace::ops::resolve_for_name_global::Input {
82+
name: namespace_name.clone(),
83+
})
84+
.await
85+
{
86+
Result::Ok(Some(ns)) => ns,
87+
Result::Ok(None) => {
88+
return Err((
89+
client_ws,
90+
anyhow!("namespace '{}' not found", namespace_name),
91+
));
92+
}
93+
Err(e) => return Err((client_ws, e)),
94+
};
95+
let namespace_id = namespace.namespace_id;
96+
97+
// Extract runner_name from query parameters (required)
98+
let runner_name = match url
99+
.query_pairs()
100+
.find_map(|(n, v)| (n == "runner_name").then_some(v))
101+
{
102+
Some(name) => name.to_string(),
103+
None => {
104+
return Err((
105+
client_ws,
106+
anyhow!("runner_name query parameter is required"),
107+
));
108+
}
109+
};
110+
67111
// Extract runner_key from query parameters (required)
68112
let runner_key = match url
69113
.query_pairs()
@@ -92,6 +136,8 @@ impl CustomServeTrait for PegboardTunnelCustomServe {
92136
};
93137

94138
tracing::info!(
139+
?namespace_id,
140+
?runner_name,
95141
?runner_key,
96142
?protocol_version,
97143
?path,
@@ -100,8 +146,12 @@ impl CustomServeTrait for PegboardTunnelCustomServe {
100146

101147
// Subscribe to pubsub topic for this runner before accepting the client websocket so
102148
// that failures can be retried by the proxy.
103-
let topic =
104-
pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(&runner_key).to_string();
149+
let topic = pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(
150+
namespace_id,
151+
&runner_name,
152+
&runner_key,
153+
)
154+
.to_string();
105155
tracing::info!(%topic, ?runner_key, "subscribing to runner receiver topic");
106156
let mut sub = match ups.subscribe(&topic).await {
107157
Result::Ok(s) => s,

packages/services/pegboard/src/pubsub_subjects.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,28 @@
11
use gas::prelude::*;
22

33
pub struct TunnelRunnerReceiverSubject<'a> {
4+
namespace_id: Id,
5+
runner_name: &'a str,
46
runner_key: &'a str,
57
}
68

79
impl<'a> TunnelRunnerReceiverSubject<'a> {
8-
pub fn new(runner_key: &'a str) -> Self {
9-
Self { runner_key }
10+
pub fn new(namespace_id: Id, runner_name: &'a str, runner_key: &'a str) -> Self {
11+
Self {
12+
namespace_id,
13+
runner_name,
14+
runner_key,
15+
}
1016
}
1117
}
1218

1319
impl std::fmt::Display for TunnelRunnerReceiverSubject<'_> {
1420
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15-
write!(f, "pegboard.tunnel.runner_receiver.{}", self.runner_key)
21+
write!(
22+
f,
23+
"pegboard.tunnel.runner_receiver.{}.{}.{}",
24+
self.namespace_id, self.runner_name, self.runner_key
25+
)
1626
}
1727
}
1828

sdks/typescript/runner/src/mod.ts

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)