Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 4 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

[workspace]
resolver = "2"
members = ["packages/common/api-builder","packages/common/api-types","packages/common/api-util","packages/common/cache/build","packages/common/cache/result","packages/common/clickhouse-inserter","packages/common/clickhouse-user-query","packages/common/config","packages/common/env","packages/common/error/core","packages/common/error/macros","packages/common/gasoline/core","packages/common/gasoline/macros","packages/common/logs","packages/common/metrics","packages/common/pools","packages/common/runtime","packages/common/service-manager","packages/common/telemetry","packages/common/test-deps","packages/common/test-deps-docker","packages/common/types","packages/common/universaldb","packages/common/universalpubsub","packages/common/util/core","packages/common/util/id","packages/common/versioned-data-util","packages/core/actor-kv","packages/core/api-peer","packages/core/api-public","packages/core/bootstrap","packages/core/dump-openapi","packages/core/guard/core","packages/core/guard/server","packages/core/pegboard-gateway","packages/core/pegboard-runner","packages/core/pegboard-serverless","packages/core/pegboard-tunnel","packages/core/workflow-worker","packages/infra/engine","packages/services/epoxy","packages/services/internal","packages/services/namespace","packages/services/pegboard","sdks/rust/api-full","sdks/rust/bare_gen","sdks/rust/data","sdks/rust/epoxy-protocol","sdks/rust/runner-protocol","sdks/rust/tunnel-protocol","sdks/rust/ups-protocol"]
members = ["packages/common/api-builder","packages/common/api-types","packages/common/api-util","packages/common/cache/build","packages/common/cache/result","packages/common/clickhouse-inserter","packages/common/clickhouse-user-query","packages/common/config","packages/common/env","packages/common/error/core","packages/common/error/macros","packages/common/gasoline/core","packages/common/gasoline/macros","packages/common/logs","packages/common/metrics","packages/common/pools","packages/common/runtime","packages/common/service-manager","packages/common/telemetry","packages/common/test-deps","packages/common/test-deps-docker","packages/common/types","packages/common/universaldb","packages/common/universalpubsub","packages/common/util/core","packages/common/util/id","packages/common/versioned-data-util","packages/core/actor-kv","packages/core/api-peer","packages/core/api-public","packages/core/bootstrap","packages/core/dump-openapi","packages/core/guard/core","packages/core/guard/server","packages/core/pegboard-gateway","packages/core/pegboard-runner","packages/core/pegboard-serverless","packages/core/pegboard-tunnel","packages/core/workflow-worker","packages/infra/engine","packages/services/epoxy","packages/services/internal","packages/services/namespace","packages/services/pegboard","sdks/rust/api-full","sdks/rust/bare_gen","sdks/rust/data","sdks/rust/epoxy-protocol","sdks/rust/runner-protocol","sdks/rust/ups-protocol"]

[workspace.package]
version = "25.7.1"
Expand Down Expand Up @@ -401,9 +401,6 @@ path = "sdks/rust/epoxy-protocol"
[workspace.dependencies.rivet-runner-protocol]
path = "sdks/rust/runner-protocol"

[workspace.dependencies.rivet-tunnel-protocol]
path = "sdks/rust/tunnel-protocol"

[workspace.dependencies.rivet-ups-protocol]
path = "sdks/rust/ups-protocol"

Expand Down
29 changes: 1 addition & 28 deletions packages/core/guard/server/src/routing/pegboard_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,38 +173,11 @@ async fn find_actor(

tracing::debug!(?actor_id, ?runner_id, "actor ready");

// TODO: Remove round trip, return key from get_runner op above
// Get runner key, namespace_id, and runner_name from runner_id
let (runner_key, namespace_id, runner_name) = ctx
.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(pegboard::keys::subspace());

let runner_key_key = pegboard::keys::runner::KeyKey::new(runner_id);
let namespace_id_key = pegboard::keys::runner::NamespaceIdKey::new(runner_id);
let runner_name_key = pegboard::keys::runner::NameKey::new(runner_id);

let (runner_key, namespace_id, runner_name) = tokio::try_join!(
tx.read_opt(&runner_key_key, Serializable),
tx.read_opt(&namespace_id_key, Serializable),
tx.read_opt(&runner_name_key, Serializable),
)?;

let runner_key = runner_key.context("runner key not found")?;
let namespace_id = namespace_id.context("runner namespace_id not found")?;
let runner_name = runner_name.context("runner name not found")?;

Ok((runner_key, namespace_id, runner_name))
})
.await?;

// Return pegboard-gateway instance
let gateway = pegboard_gateway::PegboardGateway::new(
ctx.clone(),
shared_state.pegboard_gateway.clone(),
namespace_id,
runner_name,
runner_key,
runner_id,
actor_id,
);
Ok(Some(RoutingOutput::CustomServe(std::sync::Arc::new(
Expand Down
1 change: 0 additions & 1 deletion packages/core/pegboard-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pegboard.workspace = true
rand.workspace = true
rivet-error.workspace = true
rivet-guard-core.workspace = true
rivet-tunnel-protocol.workspace = true
rivet-util.workspace = true
thiserror.workspace = true
tokio-tungstenite.workspace = true
Expand Down
33 changes: 7 additions & 26 deletions packages/core/pegboard-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,16 @@ const UPS_REQ_TIMEOUT: Duration = Duration::from_secs(2);
pub struct PegboardGateway {
ctx: StandaloneCtx,
shared_state: SharedState,
namespace_id: Id,
runner_name: String,
runner_key: String,
runner_id: Id,
actor_id: Id,
}

impl PegboardGateway {
pub fn new(
ctx: StandaloneCtx,
shared_state: SharedState,
namespace_id: Id,
runner_name: String,
runner_key: String,
actor_id: Id,
) -> Self {
pub fn new(ctx: StandaloneCtx, shared_state: SharedState, runner_id: Id, actor_id: Id) -> Self {
Self {
ctx,
shared_state,
namespace_id,
runner_name,
runner_key,
runner_id: Id,
actor_id,
}
}
Expand Down Expand Up @@ -151,12 +140,8 @@ impl PegboardGateway {
.to_bytes();

// Build subject to publish to
let tunnel_subject = pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(
self.namespace_id,
&self.runner_name,
&self.runner_key,
)
.to_string();
let tunnel_subject =
pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(self.runner_id).to_string();

// Start listening for request responses
let (request_id, mut msg_rx) = self
Expand Down Expand Up @@ -246,12 +231,8 @@ impl PegboardGateway {
}

// Build subject to publish to
let tunnel_subject = pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(
self.namespace_id,
&self.runner_name,
&self.runner_key,
)
.to_string();
let tunnel_subject =
pegboard::pubsub_subjects::TunnelRunnerReceiverSubject::new(self.runner_id).to_string();

// Start listening for WebSocket messages
let (request_id, mut msg_rx) = self
Expand Down
2 changes: 2 additions & 0 deletions packages/core/pegboard-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ rivet-runner-protocol.workspace = true
rivet-runtime.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tokio-tungstenite.workspace = true
tracing.workspace = true
url.workspace = true
versioned-data-util.workspace = true
universalpubsub.workspace = true

pegboard.workspace = true
pegboard-actor-kv.workspace = true
Expand Down
Loading
Loading