diff --git a/Cargo.lock b/Cargo.lock index 2c96ef3bab..a752becc9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3376,6 +3376,7 @@ dependencies = [ "http-body-util", "hyper 1.6.0", "hyper-tungstenite", + "namespace", "pegboard", "rand 0.8.5", "rivet-cache", diff --git a/packages/core/guard/server/src/routing/pegboard_gateway.rs b/packages/core/guard/server/src/routing/pegboard_gateway.rs index 4a142d7526..a7b944fa5e 100644 --- a/packages/core/guard/server/src/routing/pegboard_gateway.rs +++ b/packages/core/guard/server/src/routing/pegboard_gateway.rs @@ -174,26 +174,38 @@ async fn find_actor( tracing::debug!(?actor_id, ?runner_id, "actor ready"); // TODO: Remove round trip, return key from get_runner op above - // Get runner key from runner_id - let runner_key = ctx + // Get runner key, namespace_id, and runner_name from runner_id + let (runner_key, namespace_id, runner_name) = ctx .udb()? .run(|tx| async move { let tx = tx.with_subspace(pegboard::keys::subspace()); - tx.read_opt( - &pegboard::keys::runner::KeyKey::new(runner_id), - Serializable, - ) - .await + + let runner_key_key = pegboard::keys::runner::KeyKey::new(runner_id); + let namespace_id_key = pegboard::keys::runner::NamespaceIdKey::new(runner_id); + let runner_name_key = pegboard::keys::runner::NameKey::new(runner_id); + + let (runner_key, namespace_id, runner_name) = tokio::try_join!( + tx.read_opt(&runner_key_key, Serializable), + tx.read_opt(&namespace_id_key, Serializable), + tx.read_opt(&runner_name_key, Serializable), + )?; + + let runner_key = runner_key.context("runner key not found")?; + let namespace_id = namespace_id.context("runner namespace_id not found")?; + let runner_name = runner_name.context("runner name not found")?; + + Ok((runner_key, namespace_id, runner_name)) }) - .await? - .context("runner key not found")?; + .await?; // Return pegboard-gateway instance let gateway = pegboard_gateway::PegboardGateway::new( ctx.clone(), shared_state.pegboard_gateway.clone(), - actor_id, + namespace_id, + runner_name, runner_key, + actor_id, ); Ok(Some(RoutingOutput::CustomServe(std::sync::Arc::new( gateway, diff --git a/packages/core/pegboard-gateway/src/lib.rs b/packages/core/pegboard-gateway/src/lib.rs index 8e45374927..da35fba436 100644 --- a/packages/core/pegboard-gateway/src/lib.rs +++ b/packages/core/pegboard-gateway/src/lib.rs @@ -37,22 +37,28 @@ const UPS_REQ_TIMEOUT: Duration = Duration::from_secs(2); pub struct PegboardGateway { ctx: StandaloneCtx, shared_state: SharedState, - actor_id: Id, + namespace_id: Id, + runner_name: String, runner_key: String, + actor_id: Id, } impl PegboardGateway { pub fn new( ctx: StandaloneCtx, shared_state: SharedState, - actor_id: Id, + namespace_id: Id, + runner_name: String, runner_key: String, + actor_id: Id, ) -> Self { Self { ctx, shared_state, - actor_id, + namespace_id, + runner_name, runner_key, + actor_id, } } } @@ -145,9 +151,12 @@ impl PegboardGateway { .to_bytes(); // Build subject to publish to - let tunnel_subject = - pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(&self.runner_key) - .to_string(); + let tunnel_subject = pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new( + self.namespace_id, + &self.runner_name, + &self.runner_key, + ) + .to_string(); // Start listening for request responses let (request_id, mut msg_rx) = self @@ -237,9 +246,12 @@ impl PegboardGateway { } // Build subject to publish to - let tunnel_subject = - pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(&self.runner_key) - .to_string(); + let tunnel_subject = pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new( + self.namespace_id, + &self.runner_name, + &self.runner_key, + ) + .to_string(); // Start listening for WebSocket messages let (request_id, mut msg_rx) = self diff --git a/packages/core/pegboard-tunnel/Cargo.toml b/packages/core/pegboard-tunnel/Cargo.toml index ef927c3559..22aa22e864 100644 --- a/packages/core/pegboard-tunnel/Cargo.toml +++ b/packages/core/pegboard-tunnel/Cargo.toml @@ -14,6 +14,7 @@ gas.workspace = true http-body-util = "0.1" hyper = "1.6" hyper-tungstenite.workspace = true +namespace = { path = "../../services/namespace" } pegboard = { path = "../../services/pegboard" } rivet-cache.workspace = true rivet-config.workspace = true diff --git a/packages/core/pegboard-tunnel/src/lib.rs b/packages/core/pegboard-tunnel/src/lib.rs index ea07758b75..ecc1cbb943 100644 --- a/packages/core/pegboard-tunnel/src/lib.rs +++ b/packages/core/pegboard-tunnel/src/lib.rs @@ -64,6 +64,50 @@ impl CustomServeTrait for PegboardTunnelCustomServe { Err(e) => return Err((client_ws, e.into())), }; + // Extract namespace name from query parameters (required) and resolve to namespace_id + let namespace_name = match url + .query_pairs() + .find_map(|(n, v)| (n == "namespace").then_some(v)) + { + Some(name) => name.to_string(), + None => { + return Err((client_ws, anyhow!("namespace query parameter is required"))); + } + }; + + // Resolve namespace name to namespace_id + let namespace = match self + .ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: namespace_name.clone(), + }) + .await + { + Result::Ok(Some(ns)) => ns, + Result::Ok(None) => { + return Err(( + client_ws, + anyhow!("namespace '{}' not found", namespace_name), + )); + } + Err(e) => return Err((client_ws, e)), + }; + let namespace_id = namespace.namespace_id; + + // Extract runner_name from query parameters (required) + let runner_name = match url + .query_pairs() + .find_map(|(n, v)| (n == "runner_name").then_some(v)) + { + Some(name) => name.to_string(), + None => { + return Err(( + client_ws, + anyhow!("runner_name query parameter is required"), + )); + } + }; + // Extract runner_key from query parameters (required) let runner_key = match url .query_pairs() @@ -92,6 +136,8 @@ impl CustomServeTrait for PegboardTunnelCustomServe { }; tracing::info!( + ?namespace_id, + ?runner_name, ?runner_key, ?protocol_version, ?path, @@ -100,8 +146,12 @@ impl CustomServeTrait for PegboardTunnelCustomServe { // Subscribe to pubsub topic for this runner before accepting the client websocket so // that failures can be retried by the proxy. - let topic = - pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(&runner_key).to_string(); + let topic = pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new( + namespace_id, + &runner_name, + &runner_key, + ) + .to_string(); tracing::info!(%topic, ?runner_key, "subscribing to runner receiver topic"); let mut sub = match ups.subscribe(&topic).await { Result::Ok(s) => s, diff --git a/packages/services/pegboard/src/pubsub_subjects.rs b/packages/services/pegboard/src/pubsub_subjects.rs index d51f2c47c9..9eab33a888 100644 --- a/packages/services/pegboard/src/pubsub_subjects.rs +++ b/packages/services/pegboard/src/pubsub_subjects.rs @@ -1,18 +1,28 @@ use gas::prelude::*; pub struct TunnelRunnerReceiverSubject<'a> { + namespace_id: Id, + runner_name: &'a str, runner_key: &'a str, } impl<'a> TunnelRunnerReceiverSubject<'a> { - pub fn new(runner_key: &'a str) -> Self { - Self { runner_key } + pub fn new(namespace_id: Id, runner_name: &'a str, runner_key: &'a str) -> Self { + Self { + namespace_id, + runner_name, + runner_key, + } } } impl std::fmt::Display for TunnelRunnerReceiverSubject<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "pegboard.tunnel.runner_receiver.{}", self.runner_key) + write!( + f, + "pegboard.tunnel.runner_receiver.{}.{}.{}", + self.namespace_id, self.runner_name, self.runner_key + ) } } diff --git a/sdks/typescript/runner/src/mod.ts b/sdks/typescript/runner/src/mod.ts index 20cc583c41..0da1f833dd 100644 --- a/sdks/typescript/runner/src/mod.ts +++ b/sdks/typescript/runner/src/mod.ts @@ -405,7 +405,7 @@ export class Runner { const wsEndpoint = endpoint .replace("http://", "ws://") .replace("https://", "wss://"); - return `${wsEndpoint}?protocol_version=1&namespace=${encodeURIComponent(this.#config.namespace)}&runner_key=${this.#config.runnerKey}`; + return `${wsEndpoint}?protocol_version=1&namespace=${encodeURIComponent(this.#config.namespace)}&runner_name=${encodeURIComponent(this.#config.runnerName)}&runner_key=${encodeURIComponent(this.#config.runnerKey)}`; } async #openTunnelAndWait(): Promise {