Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 22 additions & 10 deletions packages/core/guard/server/src/routing/pegboard_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,26 +174,38 @@ async fn find_actor(
tracing::debug!(?actor_id, ?runner_id, "actor ready");

// TODO: Remove round trip, return key from get_runner op above
// Get runner key from runner_id
let runner_key = ctx
// Get runner key, namespace_id, and runner_name from runner_id
let (runner_key, namespace_id, runner_name) = ctx
.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(pegboard::keys::subspace());
tx.read_opt(
&pegboard::keys::runner::KeyKey::new(runner_id),
Serializable,
)
.await

let runner_key_key = pegboard::keys::runner::KeyKey::new(runner_id);
let namespace_id_key = pegboard::keys::runner::NamespaceIdKey::new(runner_id);
let runner_name_key = pegboard::keys::runner::NameKey::new(runner_id);

let (runner_key, namespace_id, runner_name) = tokio::try_join!(
tx.read_opt(&runner_key_key, Serializable),
tx.read_opt(&namespace_id_key, Serializable),
tx.read_opt(&runner_name_key, Serializable),
)?;

let runner_key = runner_key.context("runner key not found")?;
let namespace_id = namespace_id.context("runner namespace_id not found")?;
let runner_name = runner_name.context("runner name not found")?;

Ok((runner_key, namespace_id, runner_name))
})
.await?
.context("runner key not found")?;
.await?;

// Return pegboard-gateway instance
let gateway = pegboard_gateway::PegboardGateway::new(
ctx.clone(),
shared_state.pegboard_gateway.clone(),
actor_id,
namespace_id,
runner_name,
runner_key,
actor_id,
);
Ok(Some(RoutingOutput::CustomServe(std::sync::Arc::new(
gateway,
Expand Down
30 changes: 21 additions & 9 deletions packages/core/pegboard-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,28 @@ const UPS_REQ_TIMEOUT: Duration = Duration::from_secs(2);
pub struct PegboardGateway {
ctx: StandaloneCtx,
shared_state: SharedState,
actor_id: Id,
namespace_id: Id,
runner_name: String,
runner_key: String,
actor_id: Id,
}

impl PegboardGateway {
pub fn new(
ctx: StandaloneCtx,
shared_state: SharedState,
actor_id: Id,
namespace_id: Id,
runner_name: String,
runner_key: String,
actor_id: Id,
) -> Self {
Self {
ctx,
shared_state,
actor_id,
namespace_id,
runner_name,
runner_key,
actor_id,
}
}
}
Expand Down Expand Up @@ -145,9 +151,12 @@ impl PegboardGateway {
.to_bytes();

// Build subject to publish to
let tunnel_subject =
pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(&self.runner_key)
.to_string();
let tunnel_subject = pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(
self.namespace_id,
&self.runner_name,
&self.runner_key,
)
.to_string();

// Start listening for request responses
let (request_id, mut msg_rx) = self
Expand Down Expand Up @@ -237,9 +246,12 @@ impl PegboardGateway {
}

// Build subject to publish to
let tunnel_subject =
pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(&self.runner_key)
.to_string();
let tunnel_subject = pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(
self.namespace_id,
&self.runner_name,
&self.runner_key,
)
.to_string();

// Start listening for WebSocket messages
let (request_id, mut msg_rx) = self
Expand Down
1 change: 1 addition & 0 deletions packages/core/pegboard-tunnel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ gas.workspace = true
http-body-util = "0.1"
hyper = "1.6"
hyper-tungstenite.workspace = true
namespace = { path = "../../services/namespace" }
pegboard = { path = "../../services/pegboard" }
rivet-cache.workspace = true
rivet-config.workspace = true
Expand Down
54 changes: 52 additions & 2 deletions packages/core/pegboard-tunnel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,50 @@ impl CustomServeTrait for PegboardTunnelCustomServe {
Err(e) => return Err((client_ws, e.into())),
};

// Extract namespace name from query parameters (required) and resolve to namespace_id
let namespace_name = match url
.query_pairs()
.find_map(|(n, v)| (n == "namespace").then_some(v))
{
Some(name) => name.to_string(),
None => {
return Err((client_ws, anyhow!("namespace query parameter is required")));
}
};

// Resolve namespace name to namespace_id
let namespace = match self
.ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: namespace_name.clone(),
})
.await
{
Result::Ok(Some(ns)) => ns,
Result::Ok(None) => {
return Err((
client_ws,
anyhow!("namespace '{}' not found", namespace_name),
));
}
Err(e) => return Err((client_ws, e)),
};
let namespace_id = namespace.namespace_id;

// Extract runner_name from query parameters (required)
let runner_name = match url
.query_pairs()
.find_map(|(n, v)| (n == "runner_name").then_some(v))
{
Some(name) => name.to_string(),
None => {
return Err((
client_ws,
anyhow!("runner_name query parameter is required"),
));
}
};

// Extract runner_key from query parameters (required)
let runner_key = match url
.query_pairs()
Expand Down Expand Up @@ -92,6 +136,8 @@ impl CustomServeTrait for PegboardTunnelCustomServe {
};

tracing::info!(
?namespace_id,
?runner_name,
?runner_key,
?protocol_version,
?path,
Expand All @@ -100,8 +146,12 @@ impl CustomServeTrait for PegboardTunnelCustomServe {

// Subscribe to pubsub topic for this runner before accepting the client websocket so
// that failures can be retried by the proxy.
let topic =
pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(&runner_key).to_string();
let topic = pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(
namespace_id,
&runner_name,
&runner_key,
)
.to_string();
tracing::info!(%topic, ?runner_key, "subscribing to runner receiver topic");
let mut sub = match ups.subscribe(&topic).await {
Result::Ok(s) => s,
Expand Down
16 changes: 13 additions & 3 deletions packages/services/pegboard/src/pubsub_subjects.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
use gas::prelude::*;

pub struct TunnelRunnerReceiverSubject<'a> {
namespace_id: Id,
runner_name: &'a str,
runner_key: &'a str,
}

impl<'a> TunnelRunnerReceiverSubject<'a> {
pub fn new(runner_key: &'a str) -> Self {
Self { runner_key }
pub fn new(namespace_id: Id, runner_name: &'a str, runner_key: &'a str) -> Self {
Self {
namespace_id,
runner_name,
runner_key,
}
}
}

impl std::fmt::Display for TunnelRunnerReceiverSubject<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "pegboard.tunnel.runner_receiver.{}", self.runner_key)
write!(
f,
"pegboard.tunnel.runner_receiver.{}.{}.{}",
self.namespace_id, self.runner_name, self.runner_key
)
}
}

Expand Down
2 changes: 1 addition & 1 deletion sdks/typescript/runner/src/mod.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading