Skip to content

Commit 4881e7e

Browse files
committed
chore(pegboard): consolidate to single subscriber per gateway
1 parent 3586f1c commit 4881e7e

File tree

31 files changed

+1709
-1654
lines changed

31 files changed

+1709
-1654
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core/guard/server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ hyper-tungstenite.workspace = true
2020
tower.workspace = true
2121
udb-util.workspace = true
2222
universaldb.workspace = true
23+
universalpubsub.workspace = true
2324
futures.workspace = true
2425
# TODO: Make this use workspace version
2526
hyper = "1.6.0"

packages/core/guard/server/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub mod cache;
55
pub mod errors;
66
pub mod middleware;
77
pub mod routing;
8+
pub mod shared_state;
89
pub mod tls;
910

1011
#[tracing::instrument(skip_all)]
@@ -26,8 +27,12 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R
2627
tracing::warn!("crypto provider already installed in this process");
2728
}
2829

30+
// Share shared context
31+
let shared_state = shared_state::SharedState::new(ctx.ups()?);
32+
shared_state.start().await?;
33+
2934
// Create handlers
30-
let routing_fn = routing::create_routing_function(ctx.clone());
35+
let routing_fn = routing::create_routing_function(ctx.clone(), shared_state.clone());
3136
let cache_key_fn = cache::create_cache_key_function(ctx.clone());
3237
let middleware_fn = middleware::create_middleware_function(ctx.clone());
3338
let cert_resolver = tls::create_cert_resolver(&ctx).await?;

packages/core/guard/server/src/routing/mod.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use gas::prelude::*;
55
use hyper::header::HeaderName;
66
use rivet_guard_core::RoutingFn;
77

8-
use crate::errors;
8+
use crate::{errors, shared_state::SharedState};
99

1010
mod api_peer;
1111
mod api_public;
@@ -17,13 +17,14 @@ pub(crate) const X_RIVET_TARGET: HeaderName = HeaderName::from_static("x-rivet-t
1717

1818
/// Creates the main routing function that handles all incoming requests
1919
#[tracing::instrument(skip_all)]
20-
pub fn create_routing_function(ctx: StandaloneCtx) -> RoutingFn {
20+
pub fn create_routing_function(ctx: StandaloneCtx, shared_state: SharedState) -> RoutingFn {
2121
Arc::new(
2222
move |hostname: &str,
2323
path: &str,
2424
port_type: rivet_guard_core::proxy_service::PortType,
2525
headers: &hyper::HeaderMap| {
2626
let ctx = ctx.clone();
27+
let shared_state = shared_state.clone();
2728

2829
Box::pin(
2930
async move {
@@ -41,9 +42,15 @@ pub fn create_routing_function(ctx: StandaloneCtx) -> RoutingFn {
4142
return Ok(routing_output);
4243
}
4344

44-
if let Some(routing_output) =
45-
pegboard_gateway::route_request(&ctx, target, host, path, headers)
46-
.await?
45+
if let Some(routing_output) = pegboard_gateway::route_request(
46+
&ctx,
47+
&shared_state,
48+
target,
49+
host,
50+
path,
51+
headers,
52+
)
53+
.await?
4754
{
4855
return Ok(routing_output);
4956
}

packages/core/guard/server/src/routing/pegboard_gateway.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use hyper::header::HeaderName;
66
use rivet_guard_core::proxy_service::{RouteConfig, RouteTarget, RoutingOutput, RoutingTimeout};
77
use udb_util::{SERIALIZABLE, TxnExt};
88

9-
use crate::errors;
9+
use crate::{errors, shared_state::SharedState};
1010

1111
const ACTOR_READY_TIMEOUT: Duration = Duration::from_secs(10);
1212
pub const X_RIVET_ACTOR: HeaderName = HeaderName::from_static("x-rivet-actor");
@@ -16,6 +16,7 @@ pub const X_RIVET_PORT: HeaderName = HeaderName::from_static("x-rivet-port");
1616
#[tracing::instrument(skip_all)]
1717
pub async fn route_request(
1818
ctx: &StandaloneCtx,
19+
shared_state: &SharedState,
1920
target: &str,
2021
_host: &str,
2122
path: &str,
@@ -73,7 +74,7 @@ pub async fn route_request(
7374
let port_name = port_name.to_str()?;
7475

7576
// Lookup actor
76-
find_actor(ctx, actor_id, port_name, path).await
77+
find_actor(ctx, shared_state, actor_id, port_name, path).await
7778
}
7879

7980
struct FoundActor {
@@ -86,6 +87,7 @@ struct FoundActor {
8687
#[tracing::instrument(skip_all, fields(%actor_id, %port_name, %path))]
8788
async fn find_actor(
8889
ctx: &StandaloneCtx,
90+
shared_state: &SharedState,
8991
actor_id: Id,
9092
port_name: &str,
9193
path: &str,
@@ -158,10 +160,10 @@ async fn find_actor(
158160
actor_ids: vec![actor_id],
159161
});
160162
let res = tokio::time::timeout(Duration::from_secs(5), get_runner_fut).await??;
161-
let runner_info = res.actors.into_iter().next().filter(|x| x.is_connectable);
163+
let actor = res.actors.into_iter().next().filter(|x| x.is_connectable);
162164

163-
let runner_id = if let Some(runner_info) = runner_info {
164-
runner_info.runner_id
165+
let runner_id = if let Some(actor) = actor {
166+
actor.runner_id
165167
} else {
166168
tracing::info!(?actor_id, "waiting for actor to become ready");
167169

@@ -185,11 +187,23 @@ async fn find_actor(
185187

186188
tracing::debug!(?actor_id, ?runner_id, "actor ready");
187189

190+
// Get runner key from runner_id
191+
let runner_key = ctx
192+
.udb()?
193+
.run(|tx, _mc| async move {
194+
let txs = tx.subspace(pegboard::keys::subspace());
195+
let key_key = pegboard::keys::runner::KeyKey::new(runner_id);
196+
txs.read_opt(&key_key, SERIALIZABLE).await
197+
})
198+
.await?
199+
.context("runner key not found")?;
200+
188201
// Return pegboard-gateway instance
189202
let gateway = pegboard_gateway::PegboardGateway::new(
190203
ctx.clone(),
204+
shared_state.pegboard_gateway.clone(),
191205
actor_id,
192-
runner_id,
206+
runner_key,
193207
port_name.to_string(),
194208
);
195209
Ok(Some(RoutingOutput::CustomServe(std::sync::Arc::new(

packages/core/guard/server/src/routing/pegboard_tunnel.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,10 @@ pub async fn route_request(
1212
_host: &str,
1313
_path: &str,
1414
) -> Result<Option<RoutingOutput>> {
15-
// Check target
1615
if target != "tunnel" {
1716
return Ok(None);
1817
}
1918

20-
// Create pegboard-tunnel service instance
21-
let tunnel = pegboard_tunnel::PegboardTunnelCustomServe::new(ctx.clone()).await?;
22-
19+
let tunnel = pegboard_tunnel::PegboardTunnelCustomServe::new(ctx.clone());
2320
Ok(Some(RoutingOutput::CustomServe(Arc::new(tunnel))))
2421
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use anyhow::*;
2+
use gas::prelude::*;
3+
use std::{ops::Deref, sync::Arc};
4+
use universalpubsub::PubSub;
5+
6+
#[derive(Clone)]
7+
pub struct SharedState(Arc<SharedStateInner>);
8+
9+
impl SharedState {
10+
pub fn new(pubsub: PubSub) -> SharedState {
11+
SharedState(Arc::new(SharedStateInner {
12+
pegboard_gateway: pegboard_gateway::shared_state::SharedState::new(pubsub),
13+
}))
14+
}
15+
16+
pub async fn start(&self) -> Result<()> {
17+
self.pegboard_gateway.start().await?;
18+
Ok(())
19+
}
20+
}
21+
22+
impl Deref for SharedState {
23+
type Target = SharedStateInner;
24+
25+
fn deref(&self) -> &Self::Target {
26+
&self.0
27+
}
28+
}
29+
30+
pub struct SharedStateInner {
31+
pub pegboard_gateway: pegboard_gateway::shared_state::SharedState,
32+
}

packages/core/pegboard-gateway/Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ gas.workspace = true
1414
http-body-util.workspace = true
1515
hyper = "1.6"
1616
hyper-tungstenite.workspace = true
17-
pegboard = { path = "../../services/pegboard" }
17+
pegboard.workspace = true
1818
rand.workspace = true
1919
rivet-error.workspace = true
20-
rivet-guard-core = { path = "../guard/core" }
20+
rivet-guard-core.workspace = true
2121
rivet-tunnel-protocol.workspace = true
2222
rivet-util.workspace = true
2323
tokio-tungstenite.workspace = true
2424
tokio.workspace = true
2525
universalpubsub.workspace = true
26+
versioned-data-util.workspace = true
27+
thiserror.workspace = true

0 commit comments

Comments
 (0)