Skip to content

Commit c09e161

Browse files
committed
fix(pb): use runner protocol for serverless (#3020)
1 parent 2dd73e4 commit c09e161

File tree

16 files changed

+500
-582
lines changed

16 files changed

+500
-582
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/common/runtime/src/traces.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Based off of https://github.com/tokio-rs/tracing-opentelemetry/blob/v0.1.x/examples/opentelemetry-otlp.rs
22

33
use console_subscriber;
4-
use opentelemetry::trace::TracerProvider as _;
4+
use opentelemetry::trace::TracerProvider;
55
use rivet_metrics::OtelProviderGuard;
66
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
77
use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, util::SubscriberInitExt};

packages/core/pegboard-gateway/src/shared_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::{
99
};
1010
use tokio::sync::{Mutex, mpsc};
1111
use universalpubsub::{NextOutput, PubSub, PublishOpts, Subscriber};
12-
use vbare::OwnedVersionedData as _;
12+
use vbare::OwnedVersionedData;
1313

1414
const GC_INTERVAL: Duration = Duration::from_secs(60);
1515
const MESSAGE_ACK_TIMEOUT: Duration = Duration::from_secs(5);

packages/core/pegboard-runner/src/client_to_pubsub_task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use rivet_guard_core::websocket_handle::WebSocketReceiver;
99
use rivet_runner_protocol::{self as protocol, PROTOCOL_VERSION, versioned};
1010
use std::sync::{Arc, atomic::Ordering};
1111
use universalpubsub::PublishOpts;
12-
use vbare::OwnedVersionedData as _;
12+
use vbare::OwnedVersionedData;
1313

1414
use crate::{
1515
conn::Conn,

packages/core/pegboard-runner/src/conn.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::{
1313
time::Duration,
1414
};
1515
use tokio::sync::Mutex;
16-
use vbare::OwnedVersionedData as _;
16+
use vbare::OwnedVersionedData;
1717

1818
use crate::{errors::WsError, utils::UrlData};
1919

packages/core/pegboard-runner/src/pubsub_to_client_task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use hyper_tungstenite::tungstenite::Message as WsMessage;
55
use rivet_runner_protocol::{self as protocol, versioned};
66
use std::sync::Arc;
77
use universalpubsub::{NextOutput, Subscriber};
8-
use vbare::OwnedVersionedData as _;
8+
use vbare::OwnedVersionedData;
99

1010
use crate::{
1111
conn::{Conn, TunnelActiveRequest},

packages/core/pegboard-serverless/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ edition.workspace = true
77

88
[dependencies]
99
anyhow.workspace = true
10+
base64.workspace = true
1011
epoxy.workspace = true
1112
gas.workspace = true
1213
reqwest-eventsource.workspace = true
@@ -16,6 +17,7 @@ rivet-runner-protocol.workspace = true
1617
rivet-types.workspace = true
1718
tracing.workspace = true
1819
universaldb.workspace = true
20+
vbare.workspace = true
1921

2022
namespace.workspace = true
2123
pegboard.workspace = true

packages/core/pegboard-serverless/src/lib.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use std::{
77
};
88

99
use anyhow::Result;
10+
use base64::Engine;
11+
use base64::engine::general_purpose::STANDARD as BASE64;
1012
use futures_util::{StreamExt, TryStreamExt};
1113
use gas::prelude::*;
1214
use pegboard::keys;
@@ -17,6 +19,7 @@ use rivet_types::namespaces::RunnerConfig;
1719
use tokio::{sync::oneshot, task::JoinHandle, time::Duration};
1820
use universaldb::options::StreamingMode;
1921
use universaldb::utils::IsolationLevel::*;
22+
use vbare::OwnedVersionedData;
2023

2124
const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token");
2225

@@ -247,7 +250,17 @@ async fn outbound_handler(
247250
tracing::debug!(%msg.data, "received outbound req message");
248251

249252
if runner_id.is_none() {
250-
runner_id = Some(Id::parse(&msg.data)?);
253+
let data = BASE64.decode(msg.data).context("invalid base64 message")?;
254+
let payload =
255+
protocol::versioned::ToServerlessServer::deserialize_with_embedded_version(&data)
256+
.context("invalid payload")?;
257+
258+
match payload {
259+
protocol::ToServerlessServer::ToServerlessServerInit(init) => {
260+
runner_id =
261+
Some(Id::parse(&init.runner_id).context("invalid runner id")?);
262+
}
263+
}
251264
}
252265
}
253266
Err(sse::Error::StreamEnded) => {

packages/services/pegboard/src/workflows/actor/runtime.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use base64::Engine as _;
1+
use base64::Engine;
22
use base64::prelude::BASE64_STANDARD;
33
use futures_util::StreamExt;
44
use futures_util::{FutureExt, TryStreamExt};

packages/services/pegboard/src/workflows/runner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use universaldb::{
77
utils::{FormalChunkedKey, IsolationLevel::*},
88
};
99
use universalpubsub::PublishOpts;
10-
use vbare::OwnedVersionedData as _;
10+
use vbare::OwnedVersionedData;
1111

1212
use crate::{keys, workflows::actor::Allocate};
1313

0 commit comments

Comments
 (0)