diff --git a/Cargo.lock b/Cargo.lock index a752becc9c..8530a66e52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -449,7 +449,7 @@ dependencies = [ [[package]] name = "bare_gen" -version = "25.6.1" +version = "25.7.1" dependencies = [ "heck 0.5.0", "pest", @@ -755,7 +755,7 @@ dependencies = [ [[package]] name = "clickhouse-inserter" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "async-channel", @@ -779,7 +779,7 @@ dependencies = [ [[package]] name = "clickhouse-user-query" -version = "25.6.1" +version = "25.7.1" dependencies = [ "clickhouse", "serde", @@ -1382,7 +1382,7 @@ checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" [[package]] name = "epoxy" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "axum 0.8.4", @@ -1421,7 +1421,7 @@ dependencies = [ [[package]] name = "epoxy-protocol" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "bare_gen", @@ -1683,7 +1683,7 @@ dependencies = [ [[package]] name = "gasoline" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "async-stream", @@ -1730,7 +1730,7 @@ dependencies = [ [[package]] name = "gasoline-macros" -version = "25.6.1" +version = "25.7.1" dependencies = [ "proc-macro2", "quote", @@ -2378,7 +2378,7 @@ dependencies = [ [[package]] name = "internal" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "gasoline", @@ -2776,7 +2776,7 @@ dependencies = [ [[package]] name = "namespace" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "gasoline", @@ -3256,7 +3256,7 @@ checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3" [[package]] name = "pegboard" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "epoxy", @@ -3283,7 +3283,7 @@ dependencies = [ [[package]] name = "pegboard-actor-kv" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "futures-util", @@ -3301,7 +3301,7 @@ dependencies = [ [[package]] name = "pegboard-gateway" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "async-trait", @@ -3326,16 +3326,24 @@ dependencies = [ [[package]] name = "pegboard-runner-ws" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", + "async-trait", + "bytes", + "futures-util", "gasoline", + "http-body 1.0.1", + "http-body-util", "hyper 1.6.0", + "hyper-tungstenite", + "hyper-util", "namespace", "pegboard", "pegboard-actor-kv", "rivet-config", "rivet-error", + "rivet-guard-core", "rivet-metrics", "rivet-runner-protocol", "rivet-runtime", @@ -3349,7 +3357,7 @@ dependencies = [ [[package]] name = "pegboard-serverless" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "epoxy", @@ -3366,7 +3374,7 @@ dependencies = [ [[package]] name = "pegboard-tunnel" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "async-trait", @@ -4078,7 +4086,7 @@ dependencies = [ [[package]] name = "rivet-api-builder" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "axum 0.8.4", @@ -4121,7 +4129,7 @@ dependencies = [ [[package]] name = "rivet-api-peer" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "axum 0.8.4", @@ -4149,7 +4157,7 @@ dependencies = [ [[package]] name = "rivet-api-public" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "axum 0.8.4", @@ -4179,7 +4187,7 @@ dependencies = [ [[package]] name = "rivet-api-types" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "gasoline", @@ -4194,7 +4202,7 @@ dependencies = [ [[package]] name = "rivet-api-util" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "axum 0.8.4", @@ -4214,7 +4222,7 @@ dependencies = [ [[package]] name = "rivet-bootstrap" -version = "25.6.1" +version = "25.7.1" dependencies = [ "epoxy", "gasoline", @@ -4230,7 +4238,7 @@ dependencies = [ [[package]] name = "rivet-cache" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "futures-util", @@ -4253,14 +4261,14 @@ dependencies = [ [[package]] name = "rivet-cache-result" -version = "25.6.1" +version = "25.7.1" dependencies = [ "rivet-util", ] [[package]] name = "rivet-config" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "config", @@ -4278,7 +4286,7 @@ dependencies = [ [[package]] name = "rivet-data" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "bare_gen", @@ -4296,7 +4304,7 @@ dependencies = [ [[package]] name = "rivet-dump-openapi" -version = "25.6.1" +version = "25.7.1" dependencies = [ "rivet-api-public", "serde_json", @@ -4305,7 +4313,7 @@ dependencies = [ [[package]] name = "rivet-engine" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "axum 0.8.4", @@ -4363,7 +4371,7 @@ dependencies = [ [[package]] name = "rivet-env" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "lazy_static", @@ -4373,7 +4381,7 @@ dependencies = [ [[package]] name = "rivet-error" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "indoc", @@ -4385,7 +4393,7 @@ dependencies = [ [[package]] name = "rivet-error-macros" -version = "25.6.1" +version = "25.7.1" dependencies = [ "indoc", "proc-macro2", @@ -4396,7 +4404,7 @@ dependencies = [ [[package]] name = "rivet-guard" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "axum 0.8.4", @@ -4411,6 +4419,7 @@ dependencies = [ "once_cell", "pegboard", "pegboard-gateway", + "pegboard-runner-ws", "pegboard-tunnel", "regex", "rivet-api-public", @@ -4438,7 +4447,7 @@ dependencies = [ [[package]] name = "rivet-guard-core" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "async-trait", @@ -4482,7 +4491,7 @@ dependencies = [ [[package]] name = "rivet-logs" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "chrono", @@ -4496,7 +4505,7 @@ dependencies = [ [[package]] name = "rivet-metrics" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "console-subscriber", @@ -4514,7 +4523,7 @@ dependencies = [ [[package]] name = "rivet-pools" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "async-nats", @@ -4547,7 +4556,7 @@ dependencies = [ [[package]] name = "rivet-runner-protocol" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "bare_gen", @@ -4566,7 +4575,7 @@ dependencies = [ [[package]] name = "rivet-runtime" -version = "25.6.1" +version = "25.7.1" dependencies = [ "console-subscriber", "lazy_static", @@ -4592,7 +4601,7 @@ dependencies = [ [[package]] name = "rivet-service-manager" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "chrono", @@ -4607,7 +4616,7 @@ dependencies = [ [[package]] name = "rivet-telemetry" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "rivet-config", @@ -4631,7 +4640,7 @@ dependencies = [ [[package]] name = "rivet-test-deps" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "futures-util", @@ -4649,7 +4658,7 @@ dependencies = [ [[package]] name = "rivet-test-deps-docker" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "portpicker", @@ -4662,7 +4671,7 @@ dependencies = [ [[package]] name = "rivet-tunnel-protocol" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "bare_gen", @@ -4679,7 +4688,7 @@ dependencies = [ [[package]] name = "rivet-types" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "gasoline", @@ -4695,7 +4704,7 @@ dependencies = [ [[package]] name = "rivet-ups-protocol" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "bare_gen", @@ -4712,7 +4721,7 @@ dependencies = [ [[package]] name = "rivet-util" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "async-trait", @@ -4741,7 +4750,7 @@ dependencies = [ [[package]] name = "rivet-util-id" -version = "25.6.1" +version = "25.7.1" dependencies = [ "serde", "thiserror 1.0.69", @@ -4752,7 +4761,7 @@ dependencies = [ [[package]] name = "rivet-workflow-worker" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "epoxy", @@ -6369,7 +6378,7 @@ checksum = "4a1a07cc7db3810833284e8d372ccdc6da29741639ecc70c9ec107df0fa6154c" [[package]] name = "universaldb" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "async-trait", @@ -6396,7 +6405,7 @@ dependencies = [ [[package]] name = "universalpubsub" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "async-nats", @@ -6560,7 +6569,7 @@ checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] name = "versioned-data-util" -version = "25.6.1" +version = "25.7.1" dependencies = [ "anyhow", "serde", diff --git a/out/openapi.json b/out/openapi.json index 536842850a..f22a1062bf 100644 --- a/out/openapi.json +++ b/out/openapi.json @@ -11,7 +11,7 @@ "name": "Apache-2.0", "identifier": "Apache-2.0" }, - "version": "25.6.1" + "version": "25.7.1" }, "paths": { "/actors": { diff --git a/packages/common/config/src/config/mod.rs b/packages/common/config/src/config/mod.rs index a04dcecb97..9e2f82ba93 100644 --- a/packages/common/config/src/config/mod.rs +++ b/packages/common/config/src/config/mod.rs @@ -10,7 +10,6 @@ pub mod clickhouse; pub mod db; pub mod guard; pub mod logs; -pub mod pegboard; pub mod pegboard_gateway; pub mod pegboard_tunnel; pub mod pubsub; @@ -25,7 +24,6 @@ pub use clickhouse::*; pub use db::Database; pub use guard::*; pub use logs::*; -pub use pegboard::*; pub use pegboard_gateway::*; pub use pegboard_tunnel::*; pub use pubsub::PubSub; @@ -73,9 +71,6 @@ pub struct Root { #[serde(default)] pub api_peer: Option, - #[serde(default)] - pub pegboard: Option, - #[serde(default)] pub pegboard_gateway: Option, @@ -113,7 +108,6 @@ impl Default for Root { guard: None, api_public: None, api_peer: None, - pegboard: None, pegboard_gateway: None, pegboard_tunnel: None, logs: None, @@ -144,11 +138,6 @@ impl Root { self.api_peer.as_ref().unwrap_or(&DEFAULT) } - pub fn pegboard(&self) -> &Pegboard { - static DEFAULT: LazyLock = LazyLock::new(Pegboard::default); - self.pegboard.as_ref().unwrap_or(&DEFAULT) - } - pub fn pegboard_gateway(&self) -> &PegboardGateway { static DEFAULT: LazyLock = LazyLock::new(PegboardGateway::default); self.pegboard_gateway.as_ref().unwrap_or(&DEFAULT) diff --git a/packages/common/config/src/config/pegboard.rs b/packages/common/config/src/config/pegboard.rs deleted file mode 100644 index bf18b3cda4..0000000000 --- a/packages/common/config/src/config/pegboard.rs +++ /dev/null @@ -1,34 +0,0 @@ -use std::net::IpAddr; - -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; - -/// The service that manages runner ws connections. -#[derive(Debug, Serialize, Deserialize, Clone, Default, JsonSchema)] -#[serde(deny_unknown_fields)] -pub struct Pegboard { - /// The host on which the ws service listens. - pub host: Option, - /// The host on which the ws service is accessible to Guard. - pub lan_host: Option, - /// The port on which the ws service listens. - pub port: Option, -} - -impl Pegboard { - pub fn lan_host(&self) -> &str { - self.lan_host - .as_deref() - .unwrap_or(crate::defaults::hosts::PEGBOARD_RUNNER_LAN) - } - - pub fn host(&self) -> IpAddr { - self.host - .unwrap_or(crate::defaults::hosts::PEGBOARD_RUNNER_WS) - } - - pub fn port(&self) -> u16 { - self.port - .unwrap_or(crate::defaults::ports::PEGBOARD_RUNNER_WS) - } -} diff --git a/packages/common/test-deps/src/datacenter.rs b/packages/common/test-deps/src/datacenter.rs index aa1596453c..861a385f93 100644 --- a/packages/common/test-deps/src/datacenter.rs +++ b/packages/common/test-deps/src/datacenter.rs @@ -60,13 +60,10 @@ pub async fn setup_single_datacenter( dc = dc.datacenter_label, "containers started, waiting for services to be ready" ); - // Pick ports for other services - let pegboard_port = portpicker::pick_unused_port().context("pegboard_port")?; tracing::info!( dc = dc.datacenter_label, api_peer_port, - pegboard_port, guard_port, "using ports for test services" ); @@ -80,10 +77,6 @@ pub async fn setup_single_datacenter( port: Some(api_peer_port), ..Default::default() }); - root.pegboard = Some(rivet_config::config::Pegboard { - port: Some(pegboard_port), - ..Default::default() - }); root.topology = Some(rivet_config::config::topology::Topology { datacenter_label: dc.datacenter_label, @@ -111,7 +104,6 @@ pub async fn setup_single_datacenter( container_names, api_peer_port, guard_port, - pegboard_port, stop_docker_containers_on_drop: true, }) } diff --git a/packages/common/test-deps/src/lib.rs b/packages/common/test-deps/src/lib.rs index adb671549b..1dc9d69efe 100644 --- a/packages/common/test-deps/src/lib.rs +++ b/packages/common/test-deps/src/lib.rs @@ -13,7 +13,6 @@ pub struct TestDeps { pub config: rivet_config::Config, container_names: Vec, api_peer_port: u16, - pegboard_port: u16, guard_port: u16, stop_docker_containers_on_drop: bool, } @@ -87,10 +86,6 @@ impl TestDeps { self.api_peer_port } - pub fn pegboard_port(&self) -> u16 { - self.pegboard_port - } - pub fn guard_port(&self) -> u16 { self.guard_port } diff --git a/packages/core/guard/server/Cargo.toml b/packages/core/guard/server/Cargo.toml index 730de20dc5..3a8b367052 100644 --- a/packages/core/guard/server/Cargo.toml +++ b/packages/core/guard/server/Cargo.toml @@ -26,6 +26,7 @@ once_cell.workspace = true pegboard-gateway.workspace = true pegboard-tunnel.workspace = true pegboard.workspace = true +pegboard-runner-ws.workspace = true regex.workspace = true rivet-api-public.workspace = true rivet-cache.workspace = true diff --git a/packages/core/guard/server/src/routing/runner_ws.rs b/packages/core/guard/server/src/routing/runner_ws.rs index 2460aee012..1a35ccaa08 100644 --- a/packages/core/guard/server/src/routing/runner_ws.rs +++ b/packages/core/guard/server/src/routing/runner_ws.rs @@ -1,6 +1,7 @@ use anyhow::*; use gas::prelude::*; use rivet_guard_core::proxy_service::{RouteConfig, RouteTarget, RoutingOutput, RoutingTimeout}; +use std::sync::Arc; /// Route requests to the API service #[tracing::instrument(skip_all)] @@ -10,22 +11,10 @@ pub async fn route_request( _host: &str, path: &str, ) -> Result> { - // Check target if target != "runner-ws" { return Ok(None); } - let targets = vec![RouteTarget { - actor_id: None, - host: ctx.config().pegboard().lan_host().to_string(), - port: ctx.config().pegboard().port(), - path: path.to_owned(), - }]; - - return Ok(Some(RoutingOutput::Route(RouteConfig { - targets, - timeout: RoutingTimeout { - routing_timeout: 10, // 10 seconds for API routing timeout - }, - }))); + let tunnel = pegboard_runner_ws::PegboardRunnerWsCustomServe::new(ctx.clone()); + Ok(Some(RoutingOutput::CustomServe(Arc::new(tunnel)))) } diff --git a/packages/core/pegboard-runner-ws/Cargo.toml b/packages/core/pegboard-runner-ws/Cargo.toml index 029190ba70..50ada2b4b9 100644 --- a/packages/core/pegboard-runner-ws/Cargo.toml +++ b/packages/core/pegboard-runner-ws/Cargo.toml @@ -7,11 +7,19 @@ edition.workspace = true [dependencies] anyhow.workspace = true +async-trait.workspace = true +bytes.workspace = true +futures-util.workspace = true gas.workspace = true +http-body.workspace = true +http-body-util.workspace = true # Idk how to get this working with the workspace version hyper = "1.6" +hyper-tungstenite.workspace = true +hyper-util = "0.1" rivet-config.workspace = true rivet-error.workspace = true +rivet-guard-core.workspace = true rivet-metrics.workspace = true rivet-runner-protocol.workspace = true rivet-runtime.workspace = true diff --git a/packages/core/pegboard-runner-ws/src/lib.rs b/packages/core/pegboard-runner-ws/src/lib.rs index 88eb5bbc78..af6bd0f25b 100644 --- a/packages/core/pegboard-runner-ws/src/lib.rs +++ b/packages/core/pegboard-runner-ws/src/lib.rs @@ -1,35 +1,38 @@ -use std::{ - collections::HashMap, - net::SocketAddr, - sync::{ - Arc, - atomic::{AtomicU32, Ordering}, - }, - time::Duration, -}; - +use async_trait::async_trait; +use bytes::Bytes; use futures_util::{ SinkExt, StreamExt, stream::{SplitSink, SplitStream}, }; use gas::prelude::Id; use gas::prelude::*; +use http_body_util::Full; +use hyper::upgrade::Upgraded; +use hyper::{Response, StatusCode}; +use hyper_tungstenite::{HyperWebsocket, tungstenite::Message}; +use hyper_util::rt::TokioIo; use pegboard::ops::runner::update_alloc_idx::{Action, RunnerEligibility}; use pegboard_actor_kv as kv; use rivet_error::*; +use rivet_guard_core::{ + custom_serve::CustomServeTrait, proxy_service::ResponseBody, request_context::RequestContext, +}; use rivet_runner_protocol::*; use serde_json::json; -use tokio::{ - net::{TcpListener, TcpStream}, - sync::{Mutex, RwLock}, +use std::{ + collections::HashMap, + sync::{ + Arc, + atomic::{AtomicU32, Ordering}, + }, + time::Duration, }; +use tokio::sync::{Mutex, RwLock}; use tokio_tungstenite::{ WebSocketStream, - tungstenite::protocol::{ - Message, - frame::{CloseFrame, coding::CloseCode}, - }, + tungstenite::protocol::frame::{CloseFrame, coding::CloseCode}, }; +type HyperWebSocketStream = WebSocketStream>; use versioned_data_util::OwnedVersionedData; const UPDATE_PING_INTERVAL: Duration = Duration::from_secs(3); @@ -73,200 +76,193 @@ enum WsError { struct Connection { workflow_id: Id, protocol_version: u16, - tx: Mutex, Message>>, + tx: Arc< + Mutex< + Box< + dyn futures_util::Sink + + Send + + Unpin, + >, + >, + >, last_rtt: AtomicU32, } type Connections = HashMap>; -#[tracing::instrument(skip_all)] -pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> { - let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?; - let ctx = StandaloneCtx::new( - db::DatabaseKv::from_pools(pools.clone()).await?, - config.clone(), - pools, - cache, - "pegboard-runner-ws", - Id::new_v1(config.dc_label()), - Id::new_v1(config.dc_label()), - )?; - - let conns: Arc> = Arc::new(RwLock::new(HashMap::new())); - - let host = ctx.config().pegboard().host(); - let port = ctx.config().pegboard().port(); - let addr = SocketAddr::from((host, port)); - - let listener = TcpListener::bind(addr).await?; - tracing::info!(?host, ?port, "runner ws server listening"); - - // None of these should ever exit - // - // If these do exit, then the `handle_connection` task will run indefinitely and never - // send/receive anything to runners. Runner workflows will then expire because of their ping, - // their workflow will complete, and runners will be unusable unless they reconnect. - tokio::join!( - socket_thread(&ctx, conns.clone(), listener), - msg_thread(&ctx, conns.clone()), - update_ping_thread(&ctx, conns.clone()), - ); - - Ok(()) +pub struct PegboardRunnerWsCustomServe { + ctx: StandaloneCtx, + conns: Arc>, } -#[tracing::instrument(skip_all)] -async fn socket_thread( - ctx: &StandaloneCtx, - conns: Arc>, - listener: TcpListener, -) { - loop { - match listener.accept().await { - Ok((stream, addr)) => handle_connection(ctx, conns.clone(), stream, addr).await, - Err(err) => tracing::error!(?err, "failed to connect websocket"), - } +impl PegboardRunnerWsCustomServe { + pub fn new(ctx: StandaloneCtx) -> Self { + let conns = Arc::new(RwLock::new(HashMap::new())); + let service = Self { + ctx: ctx.clone(), + conns: conns.clone(), + }; + + // Start background threads + let msg_ctx = ctx.clone(); + let msg_conns = conns.clone(); + tokio::spawn(async move { + msg_thread(&msg_ctx, msg_conns).await; + }); + + let ping_ctx = ctx.clone(); + let ping_conns = conns.clone(); + tokio::spawn(async move { + update_ping_thread(&ping_ctx, ping_conns).await; + }); + + service } } -#[tracing::instrument(skip_all)] -async fn handle_connection( - ctx: &StandaloneCtx, - conns: Arc>, - raw_stream: TcpStream, - addr: SocketAddr, -) { - tracing::debug!(?addr, "new connection"); +#[async_trait] +impl CustomServeTrait for PegboardRunnerWsCustomServe { + async fn handle_request( + &self, + _req: hyper::Request>, + _request_context: &mut RequestContext, + ) -> Result> { + // Pegboard runner ws doesn't handle regular HTTP requests + // Return a simple status response + let response = Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "text/plain") + .body(ResponseBody::Full(Full::new(Bytes::from( + "pegboard-runner-ws WebSocket endpoint", + ))))?; + + Ok(response) + } - let ctx = ctx.clone(); + async fn handle_websocket( + &self, + client_ws: HyperWebsocket, + _headers: &hyper::HeaderMap, + path: &str, + _request_context: &mut RequestContext, + ) -> std::result::Result<(), (HyperWebsocket, anyhow::Error)> { + // Parse URL to extract parameters + let url = match url::Url::parse(&format!("ws://placeholder{path}")) { + Result::Ok(u) => u, + Result::Err(e) => return Err((client_ws, e.into())), + }; - tokio::spawn(async move { - let (ws_stream, uri) = match setup_stream(raw_stream, addr).await { - Ok(x) => x, - Err(err) => { - tracing::warn!(?addr, ?err, "setup stream failed"); - return; + let url_data = match parse_url_from_url(url) { + Result::Ok(x) => x, + Result::Err(err) => { + tracing::warn!(?err, "could not parse runner connection url"); + return Err((client_ws, err)); } }; - let (mut tx, mut rx) = ws_stream.split(); - - let url_data = match parse_url(addr, uri) { - Ok(x) => x, - Err(err) => { - tracing::warn!(?addr, ?err, "could not parse runner connection url"); - - let close_frame = err_to_close_frame(WsError::InvalidUrl(err.to_string()).build()); - - if let Err(err) = tx.send(Message::Close(Some(close_frame))).await { - tracing::error!(?addr, ?err, "failed closing socket"); - } - return; + // Accept WS + let ws_stream = match client_ws.await { + Result::Ok(ws) => ws, + Err(e) => { + // Handshake already in progress; cannot retry safely here + tracing::error!(error=?e, "client websocket await failed"); + return std::result::Result::<(), (HyperWebsocket, anyhow::Error)>::Ok(()); } }; - let mut tx = Some(tx); + self.handle_connection(ws_stream, url_data).await; - let (runner_id, conn) = match build_connection(&ctx, &mut tx, &mut rx, url_data).await { - Ok(res) => res, - Err(err) => { - tracing::warn!(?addr, ?err, "failed to build connection"); + std::result::Result::<(), (HyperWebsocket, anyhow::Error)>::Ok(()) + } +} - if let Some(mut tx) = tx { - let close_frame = err_to_close_frame(err); +impl PegboardRunnerWsCustomServe { + #[tracing::instrument(skip_all)] + async fn handle_connection(&self, ws_stream: HyperWebSocketStream, url_data: UrlData) { + let ctx = self.ctx.clone(); + let conns = self.conns.clone(); - if let Err(err) = tx.send(Message::Close(Some(close_frame))).await { - tracing::error!(?addr, ?err, "failed closing socket"); - } - } + tokio::spawn(async move { + let (tx, mut rx) = ws_stream.split(); + let mut tx = Some(tx); - return; - } - }; + let (runner_id, conn) = match build_connection(&ctx, &mut tx, &mut rx, url_data).await { + Ok(res) => res, + Err(err) => { + tracing::warn!(?err, "failed to build connection"); - // Store connection - { - let mut conns = conns.write().await; - if let Some(old_conn) = conns.insert(runner_id, conn.clone()) { - tracing::warn!( - ?runner_id, - "runner already connected, closing old connection" - ); + if let Some(mut tx) = tx { + let close_frame = err_to_close_frame(err); - let close_frame = err_to_close_frame(WsError::NewRunnerConnected.build()); - let mut tx = old_conn.tx.lock().await; + if let Err(err) = tx.send(Message::Close(Some(close_frame))).await { + tracing::error!(?err, "failed closing socket"); + } + } - if let Err(err) = tx.send(Message::Close(Some(close_frame))).await { - tracing::error!(?runner_id, ?err, "failed closing old connection"); + return; } - } - } + }; - let err = if let Err(err) = handle_messages(&ctx, &mut rx, runner_id, &conn).await { - tracing::warn!(?runner_id, ?err, "failed processing runner messages"); + // Store connection + { + let mut conns = conns.write().await; + if let Some(old_conn) = conns.insert(runner_id, conn.clone()) { + tracing::warn!( + ?runner_id, + "runner already connected, closing old connection" + ); - err - } else { - tracing::info!(?runner_id, "runner connection closed"); + let close_frame = err_to_close_frame(WsError::NewRunnerConnected.build()); + let mut tx = old_conn.tx.lock().await; - WsError::ConnectionClosed.build() - }; + if let Err(err) = tx.send(Message::Close(Some(close_frame))).await { + tracing::error!(?runner_id, ?err, "failed closing old connection"); + } + } + } - // Clean up - { - conns.write().await.remove(&runner_id); - } + let err = if let Err(err) = handle_messages(&ctx, &mut rx, runner_id, &conn).await { + tracing::warn!(?runner_id, ?err, "failed processing runner messages"); - // Make runner immediately ineligible when it disconnects - if let Err(err) = ctx - .op(pegboard::ops::runner::update_alloc_idx::Input { - runners: vec![pegboard::ops::runner::update_alloc_idx::Runner { - runner_id, - action: Action::ClearIdx, - }], - }) - .await - { - tracing::error!(?runner_id, ?err, "failed evicting runner from alloc idx"); - } + err + } else { + tracing::info!(?runner_id, "runner connection closed"); - let close_frame = err_to_close_frame(err); - let mut tx = conn.tx.lock().await; - if let Err(err) = tx.send(Message::Close(Some(close_frame))).await { - tracing::error!(?runner_id, ?err, "failed closing socket"); - } - }); -} + WsError::ConnectionClosed.build() + }; -#[tracing::instrument(skip_all)] -async fn setup_stream( - raw_stream: TcpStream, - addr: SocketAddr, -) -> Result<(WebSocketStream, hyper::Uri)> { - let mut uri = None; - let ws_stream = tokio_tungstenite::accept_hdr_async( - raw_stream, - |req: &tokio_tungstenite::tungstenite::handshake::server::Request, res| { - // Bootleg way of reading the uri - uri = Some(req.uri().clone()); - - tracing::debug!(?addr, ?uri, "handshake"); - - Ok(res) - }, - ) - .await?; + // Clean up + { + conns.write().await.remove(&runner_id); + } - let uri = uri.context("socket has no associated request")?; + // Make runner immediately ineligible when it disconnects + if let Err(err) = ctx + .op(pegboard::ops::runner::update_alloc_idx::Input { + runners: vec![pegboard::ops::runner::update_alloc_idx::Runner { + runner_id, + action: Action::ClearIdx, + }], + }) + .await + { + tracing::error!(?runner_id, ?err, "failed evicting runner from alloc idx"); + } - Ok((ws_stream, uri)) + let close_frame = err_to_close_frame(err); + let mut tx = conn.tx.lock().await; + if let Err(err) = tx.send(Message::Close(Some(close_frame))).await { + tracing::error!(?runner_id, ?err, "failed closing socket"); + } + }); + } } #[tracing::instrument(skip_all)] async fn build_connection( ctx: &StandaloneCtx, - tx: &mut Option, Message>>, - rx: &mut SplitStream>, + tx: &mut Option>, + rx: &mut futures_util::stream::SplitStream, UrlData { protocol_version, namespace, @@ -387,7 +383,12 @@ async fn build_connection( Arc::new(Connection { workflow_id, protocol_version, - tx: Mutex::new(tx), + tx: Arc::new(Mutex::new(Box::new(tx) + as Box< + dyn futures_util::Sink + + Send + + Unpin, + >)), last_rtt: AtomicU32::new(0), }), )) @@ -395,7 +396,7 @@ async fn build_connection( async fn handle_messages( ctx: &StandaloneCtx, - rx: &mut SplitStream>, + rx: &mut futures_util::stream::SplitStream, runner_id: Id, conn: &Connection, ) -> Result<()> { @@ -797,9 +798,7 @@ struct UrlData { runner_key: String, } -fn parse_url(addr: SocketAddr, uri: hyper::Uri) -> Result { - let url = url::Url::parse(&format!("ws://{addr}{uri}"))?; - +fn parse_url_from_url(url: url::Url) -> Result { // Read protocol version from query parameters (required) let protocol_version = url .query_pairs() diff --git a/packages/infra/engine/src/run_config.rs b/packages/infra/engine/src/run_config.rs index e46cabd423..a0e90767f5 100644 --- a/packages/infra/engine/src/run_config.rs +++ b/packages/infra/engine/src/run_config.rs @@ -9,11 +9,6 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result { Service::new("guard", ServiceKind::Standalone, |config, pools| { Box::pin(rivet_guard::start(config, pools)) }), - Service::new( - "pegboard_runner_ws", - ServiceKind::ApiPublic, - |config, pools| Box::pin(pegboard_runner_ws::start(config, pools)), - ), Service::new( "workflow_worker", ServiceKind::Standalone, diff --git a/packages/infra/engine/tests/common/ctx.rs b/packages/infra/engine/tests/common/ctx.rs index b29378bddb..d52b8fb1cc 100644 --- a/packages/infra/engine/tests/common/ctx.rs +++ b/packages/infra/engine/tests/common/ctx.rs @@ -84,11 +84,6 @@ impl TestCtx { Service::new("guard", ServiceKind::Standalone, |config, pools| { Box::pin(rivet_guard::start(config, pools)) }), - Service::new( - "pegboard-runner-ws", - ServiceKind::ApiPublic, - |config, pools| Box::pin(pegboard_runner_ws::start(config, pools)), - ), Service::new( "workflow-worker", ServiceKind::Standalone, @@ -108,7 +103,6 @@ impl TestCtx { tokio::join!( wait_for_port("api-peer", test_deps.api_peer_port()), wait_for_port("guard", test_deps.guard_port()), - wait_for_port("pegboard", test_deps.pegboard_port()), ); // Create workflow context for assertions diff --git a/sdks/typescript/runner-protocol/src/index.ts b/sdks/typescript/runner-protocol/src/index.ts index 84a6c9463d..8c951c3a1f 100644 --- a/sdks/typescript/runner-protocol/src/index.ts +++ b/sdks/typescript/runner-protocol/src/index.ts @@ -26,6 +26,9 @@ export function writeJson(bc: bare.ByteCursor, x: Json): void { bare.writeString(bc, x) } +/** + * Basic types + */ export type KvKey = ArrayBuffer export function readKvKey(bc: bare.ByteCursor): KvKey { @@ -63,6 +66,9 @@ export function writeKvMetadata(bc: bare.ByteCursor, x: KvMetadata): void { bare.writeI64(bc, x.createTs) } +/** + * Query types + */ export type KvListAllQuery = null export type KvListRangeQuery = { @@ -140,20 +146,344 @@ export function writeKvListQuery(bc: bare.ByteCursor, x: KvListQuery): void { } } -export type ActorName = { - readonly metadata: Json +function read0(bc: bare.ByteCursor): readonly KvKey[] { + const len = bare.readUintSafe(bc) + if (len === 0) { + return [] + } + const result = [readKvKey(bc)] + for (let i = 1; i < len; i++) { + result[i] = readKvKey(bc) + } + return result } -export function readActorName(bc: bare.ByteCursor): ActorName { +function write0(bc: bare.ByteCursor, x: readonly KvKey[]): void { + bare.writeUintSafe(bc, x.length) + for (let i = 0; i < x.length; i++) { + writeKvKey(bc, x[i]) + } +} + +/** + * Request types + */ +export type KvGetRequest = { + readonly keys: readonly KvKey[] +} + +export function readKvGetRequest(bc: bare.ByteCursor): KvGetRequest { return { - metadata: readJson(bc), + keys: read0(bc), } } -export function writeActorName(bc: bare.ByteCursor, x: ActorName): void { - writeJson(bc, x.metadata) +export function writeKvGetRequest(bc: bare.ByteCursor, x: KvGetRequest): void { + write0(bc, x.keys) +} + +function read1(bc: bare.ByteCursor): boolean | null { + return bare.readBool(bc) ? bare.readBool(bc) : null +} + +function write1(bc: bare.ByteCursor, x: boolean | null): void { + bare.writeBool(bc, x != null) + if (x != null) { + bare.writeBool(bc, x) + } +} + +function read2(bc: bare.ByteCursor): u64 | null { + return bare.readBool(bc) ? bare.readU64(bc) : null +} + +function write2(bc: bare.ByteCursor, x: u64 | null): void { + bare.writeBool(bc, x != null) + if (x != null) { + bare.writeU64(bc, x) + } +} + +export type KvListRequest = { + readonly query: KvListQuery + readonly reverse: boolean | null + readonly limit: u64 | null +} + +export function readKvListRequest(bc: bare.ByteCursor): KvListRequest { + return { + query: readKvListQuery(bc), + reverse: read1(bc), + limit: read2(bc), + } +} + +export function writeKvListRequest(bc: bare.ByteCursor, x: KvListRequest): void { + writeKvListQuery(bc, x.query) + write1(bc, x.reverse) + write2(bc, x.limit) +} + +function read3(bc: bare.ByteCursor): readonly KvValue[] { + const len = bare.readUintSafe(bc) + if (len === 0) { + return [] + } + const result = [readKvValue(bc)] + for (let i = 1; i < len; i++) { + result[i] = readKvValue(bc) + } + return result +} + +function write3(bc: bare.ByteCursor, x: readonly KvValue[]): void { + bare.writeUintSafe(bc, x.length) + for (let i = 0; i < x.length; i++) { + writeKvValue(bc, x[i]) + } +} + +export type KvPutRequest = { + readonly keys: readonly KvKey[] + readonly values: readonly KvValue[] +} + +export function readKvPutRequest(bc: bare.ByteCursor): KvPutRequest { + return { + keys: read0(bc), + values: read3(bc), + } +} + +export function writeKvPutRequest(bc: bare.ByteCursor, x: KvPutRequest): void { + write0(bc, x.keys) + write3(bc, x.values) +} + +export type KvDeleteRequest = { + readonly keys: readonly KvKey[] +} + +export function readKvDeleteRequest(bc: bare.ByteCursor): KvDeleteRequest { + return { + keys: read0(bc), + } +} + +export function writeKvDeleteRequest(bc: bare.ByteCursor, x: KvDeleteRequest): void { + write0(bc, x.keys) +} + +export type KvDropRequest = null + +/** + * Response types + */ +export type KvErrorResponse = { + readonly message: string +} + +export function readKvErrorResponse(bc: bare.ByteCursor): KvErrorResponse { + return { + message: bare.readString(bc), + } +} + +export function writeKvErrorResponse(bc: bare.ByteCursor, x: KvErrorResponse): void { + bare.writeString(bc, x.message) +} + +function read4(bc: bare.ByteCursor): readonly KvMetadata[] { + const len = bare.readUintSafe(bc) + if (len === 0) { + return [] + } + const result = [readKvMetadata(bc)] + for (let i = 1; i < len; i++) { + result[i] = readKvMetadata(bc) + } + return result +} + +function write4(bc: bare.ByteCursor, x: readonly KvMetadata[]): void { + bare.writeUintSafe(bc, x.length) + for (let i = 0; i < x.length; i++) { + writeKvMetadata(bc, x[i]) + } +} + +export type KvGetResponse = { + readonly keys: readonly KvKey[] + readonly values: readonly KvValue[] + readonly metadata: readonly KvMetadata[] +} + +export function readKvGetResponse(bc: bare.ByteCursor): KvGetResponse { + return { + keys: read0(bc), + values: read3(bc), + metadata: read4(bc), + } +} + +export function writeKvGetResponse(bc: bare.ByteCursor, x: KvGetResponse): void { + write0(bc, x.keys) + write3(bc, x.values) + write4(bc, x.metadata) +} + +export type KvListResponse = { + readonly keys: readonly KvKey[] + readonly values: readonly KvValue[] + readonly metadata: readonly KvMetadata[] +} + +export function readKvListResponse(bc: bare.ByteCursor): KvListResponse { + return { + keys: read0(bc), + values: read3(bc), + metadata: read4(bc), + } +} + +export function writeKvListResponse(bc: bare.ByteCursor, x: KvListResponse): void { + write0(bc, x.keys) + write3(bc, x.values) + write4(bc, x.metadata) +} + +export type KvPutResponse = null + +export type KvDeleteResponse = null + +export type KvDropResponse = null + +/** + * Request/Response unions + */ +export type KvRequestData = + | { readonly tag: "KvGetRequest"; readonly val: KvGetRequest } + | { readonly tag: "KvListRequest"; readonly val: KvListRequest } + | { readonly tag: "KvPutRequest"; readonly val: KvPutRequest } + | { readonly tag: "KvDeleteRequest"; readonly val: KvDeleteRequest } + | { readonly tag: "KvDropRequest"; readonly val: KvDropRequest } + +export function readKvRequestData(bc: bare.ByteCursor): KvRequestData { + const offset = bc.offset + const tag = bare.readU8(bc) + switch (tag) { + case 0: + return { tag: "KvGetRequest", val: readKvGetRequest(bc) } + case 1: + return { tag: "KvListRequest", val: readKvListRequest(bc) } + case 2: + return { tag: "KvPutRequest", val: readKvPutRequest(bc) } + case 3: + return { tag: "KvDeleteRequest", val: readKvDeleteRequest(bc) } + case 4: + return { tag: "KvDropRequest", val: null } + default: { + bc.offset = offset + throw new bare.BareError(offset, "invalid tag") + } + } +} + +export function writeKvRequestData(bc: bare.ByteCursor, x: KvRequestData): void { + switch (x.tag) { + case "KvGetRequest": { + bare.writeU8(bc, 0) + writeKvGetRequest(bc, x.val) + break + } + case "KvListRequest": { + bare.writeU8(bc, 1) + writeKvListRequest(bc, x.val) + break + } + case "KvPutRequest": { + bare.writeU8(bc, 2) + writeKvPutRequest(bc, x.val) + break + } + case "KvDeleteRequest": { + bare.writeU8(bc, 3) + writeKvDeleteRequest(bc, x.val) + break + } + case "KvDropRequest": { + bare.writeU8(bc, 4) + break + } + } +} + +export type KvResponseData = + | { readonly tag: "KvErrorResponse"; readonly val: KvErrorResponse } + | { readonly tag: "KvGetResponse"; readonly val: KvGetResponse } + | { readonly tag: "KvListResponse"; readonly val: KvListResponse } + | { readonly tag: "KvPutResponse"; readonly val: KvPutResponse } + | { readonly tag: "KvDeleteResponse"; readonly val: KvDeleteResponse } + | { readonly tag: "KvDropResponse"; readonly val: KvDropResponse } + +export function readKvResponseData(bc: bare.ByteCursor): KvResponseData { + const offset = bc.offset + const tag = bare.readU8(bc) + switch (tag) { + case 0: + return { tag: "KvErrorResponse", val: readKvErrorResponse(bc) } + case 1: + return { tag: "KvGetResponse", val: readKvGetResponse(bc) } + case 2: + return { tag: "KvListResponse", val: readKvListResponse(bc) } + case 3: + return { tag: "KvPutResponse", val: null } + case 4: + return { tag: "KvDeleteResponse", val: null } + case 5: + return { tag: "KvDropResponse", val: null } + default: { + bc.offset = offset + throw new bare.BareError(offset, "invalid tag") + } + } } +export function writeKvResponseData(bc: bare.ByteCursor, x: KvResponseData): void { + switch (x.tag) { + case "KvErrorResponse": { + bare.writeU8(bc, 0) + writeKvErrorResponse(bc, x.val) + break + } + case "KvGetResponse": { + bare.writeU8(bc, 1) + writeKvGetResponse(bc, x.val) + break + } + case "KvListResponse": { + bare.writeU8(bc, 2) + writeKvListResponse(bc, x.val) + break + } + case "KvPutResponse": { + bare.writeU8(bc, 3) + break + } + case "KvDeleteResponse": { + bare.writeU8(bc, 4) + break + } + case "KvDropResponse": { + bare.writeU8(bc, 5) + break + } + } +} + +/** + * Core + */ export enum StopCode { Ok = "Ok", Error = "Error", @@ -187,6 +517,68 @@ export function writeStopCode(bc: bare.ByteCursor, x: StopCode): void { } } +export type ActorName = { + readonly metadata: Json +} + +export function readActorName(bc: bare.ByteCursor): ActorName { + return { + metadata: readJson(bc), + } +} + +export function writeActorName(bc: bare.ByteCursor, x: ActorName): void { + writeJson(bc, x.metadata) +} + +function read5(bc: bare.ByteCursor): string | null { + return bare.readBool(bc) ? bare.readString(bc) : null +} + +function write5(bc: bare.ByteCursor, x: string | null): void { + bare.writeBool(bc, x != null) + if (x != null) { + bare.writeString(bc, x) + } +} + +function read6(bc: bare.ByteCursor): ArrayBuffer | null { + return bare.readBool(bc) ? bare.readData(bc) : null +} + +function write6(bc: bare.ByteCursor, x: ArrayBuffer | null): void { + bare.writeBool(bc, x != null) + if (x != null) { + bare.writeData(bc, x) + } +} + +export type ActorConfig = { + readonly name: string + readonly key: string | null + readonly createTs: i64 + readonly input: ArrayBuffer | null +} + +export function readActorConfig(bc: bare.ByteCursor): ActorConfig { + return { + name: bare.readString(bc), + key: read5(bc), + createTs: bare.readI64(bc), + input: read6(bc), + } +} + +export function writeActorConfig(bc: bare.ByteCursor, x: ActorConfig): void { + bare.writeString(bc, x.name) + write5(bc, x.key) + bare.writeI64(bc, x.createTs) + write6(bc, x.input) +} + +/** + * Intent + */ export type ActorIntentSleep = null export type ActorIntentStop = null @@ -223,19 +615,11 @@ export function writeActorIntent(bc: bare.ByteCursor, x: ActorIntent): void { } } +/** + * State + */ export type ActorStateRunning = null -function read0(bc: bare.ByteCursor): string | null { - return bare.readBool(bc) ? bare.readString(bc) : null -} - -function write0(bc: bare.ByteCursor, x: string | null): void { - bare.writeBool(bc, x != null) - if (x != null) { - bare.writeString(bc, x) - } -} - export type ActorStateStopped = { readonly code: StopCode readonly message: string | null @@ -244,13 +628,13 @@ export type ActorStateStopped = { export function readActorStateStopped(bc: bare.ByteCursor): ActorStateStopped { return { code: readStopCode(bc), - message: read0(bc), + message: read5(bc), } } export function writeActorStateStopped(bc: bare.ByteCursor, x: ActorStateStopped): void { writeStopCode(bc, x.code) - write0(bc, x.message) + write5(bc, x.message) } export type ActorState = @@ -286,6 +670,9 @@ export function writeActorState(bc: bare.ByteCursor, x: ActorState): void { } } +/** + * MARK: Events + */ export type EventActorIntent = { readonly actorId: Id readonly generation: u32 @@ -326,11 +713,11 @@ export function writeEventActorStateUpdate(bc: bare.ByteCursor, x: EventActorSta writeActorState(bc, x.state) } -function read1(bc: bare.ByteCursor): i64 | null { +function read7(bc: bare.ByteCursor): i64 | null { return bare.readBool(bc) ? bare.readI64(bc) : null } -function write1(bc: bare.ByteCursor, x: i64 | null): void { +function write7(bc: bare.ByteCursor, x: i64 | null): void { bare.writeBool(bc, x != null) if (x != null) { bare.writeI64(bc, x) @@ -347,14 +734,14 @@ export function readEventActorSetAlarm(bc: bare.ByteCursor): EventActorSetAlarm return { actorId: readId(bc), generation: bare.readU32(bc), - alarmTs: read1(bc), + alarmTs: read7(bc), } } export function writeEventActorSetAlarm(bc: bare.ByteCursor, x: EventActorSetAlarm): void { writeId(bc, x.actorId) bare.writeU32(bc, x.generation) - write1(bc, x.alarmTs) + write7(bc, x.alarmTs) } export type Event = @@ -416,40 +803,9 @@ export function writeEventWrapper(bc: bare.ByteCursor, x: EventWrapper): void { writeEvent(bc, x.inner) } -function read2(bc: bare.ByteCursor): ArrayBuffer | null { - return bare.readBool(bc) ? bare.readData(bc) : null -} - -function write2(bc: bare.ByteCursor, x: ArrayBuffer | null): void { - bare.writeBool(bc, x != null) - if (x != null) { - bare.writeData(bc, x) - } -} - -export type ActorConfig = { - readonly name: string - readonly key: string | null - readonly createTs: i64 - readonly input: ArrayBuffer | null -} - -export function readActorConfig(bc: bare.ByteCursor): ActorConfig { - return { - name: bare.readString(bc), - key: read0(bc), - createTs: bare.readI64(bc), - input: read2(bc), - } -} - -export function writeActorConfig(bc: bare.ByteCursor, x: ActorConfig): void { - bare.writeString(bc, x.name) - write0(bc, x.key) - bare.writeI64(bc, x.createTs) - write2(bc, x.input) -} - +/** + * MARK: Commands + */ export type CommandStartActor = { readonly actorId: Id readonly generation: u32 @@ -538,7 +894,7 @@ export function writeCommandWrapper(bc: bare.ByteCursor, x: CommandWrapper): voi writeCommand(bc, x.inner) } -function read3(bc: bare.ByteCursor): ReadonlyMap { +function read8(bc: bare.ByteCursor): ReadonlyMap { const len = bare.readUintSafe(bc) const result = new Map() for (let i = 0; i < len; i++) { @@ -553,298 +909,114 @@ function read3(bc: bare.ByteCursor): ReadonlyMap { return result } -function write3(bc: bare.ByteCursor, x: ReadonlyMap): void { +function write8(bc: bare.ByteCursor, x: ReadonlyMap): void { bare.writeUintSafe(bc, x.size) for (const kv of x) { bare.writeString(bc, kv[0]) writeActorName(bc, kv[1]) - } -} - -function read4(bc: bare.ByteCursor): ReadonlyMap | null { - return bare.readBool(bc) ? read3(bc) : null -} - -function write4(bc: bare.ByteCursor, x: ReadonlyMap | null): void { - bare.writeBool(bc, x != null) - if (x != null) { - write3(bc, x) - } -} - -function read5(bc: bare.ByteCursor): Json | null { - return bare.readBool(bc) ? readJson(bc) : null -} - -function write5(bc: bare.ByteCursor, x: Json | null): void { - bare.writeBool(bc, x != null) - if (x != null) { - writeJson(bc, x) - } -} - -export type ToServerInit = { - readonly name: string - readonly version: u32 - readonly totalSlots: u32 - readonly lastCommandIdx: i64 | null - readonly prepopulateActorNames: ReadonlyMap | null - readonly metadata: Json | null -} - -export function readToServerInit(bc: bare.ByteCursor): ToServerInit { - return { - name: bare.readString(bc), - version: bare.readU32(bc), - totalSlots: bare.readU32(bc), - lastCommandIdx: read1(bc), - prepopulateActorNames: read4(bc), - metadata: read5(bc), - } -} - -export function writeToServerInit(bc: bare.ByteCursor, x: ToServerInit): void { - bare.writeString(bc, x.name) - bare.writeU32(bc, x.version) - bare.writeU32(bc, x.totalSlots) - write1(bc, x.lastCommandIdx) - write4(bc, x.prepopulateActorNames) - write5(bc, x.metadata) -} - -export type ToServerEvents = readonly EventWrapper[] - -export function readToServerEvents(bc: bare.ByteCursor): ToServerEvents { - const len = bare.readUintSafe(bc) - if (len === 0) { - return [] - } - const result = [readEventWrapper(bc)] - for (let i = 1; i < len; i++) { - result[i] = readEventWrapper(bc) - } - return result -} - -export function writeToServerEvents(bc: bare.ByteCursor, x: ToServerEvents): void { - bare.writeUintSafe(bc, x.length) - for (let i = 0; i < x.length; i++) { - writeEventWrapper(bc, x[i]) - } -} - -export type ToServerAckCommands = { - readonly lastCommandIdx: i64 -} - -export function readToServerAckCommands(bc: bare.ByteCursor): ToServerAckCommands { - return { - lastCommandIdx: bare.readI64(bc), - } -} - -export function writeToServerAckCommands(bc: bare.ByteCursor, x: ToServerAckCommands): void { - bare.writeI64(bc, x.lastCommandIdx) -} - -export type ToServerStopping = null - -export type ToServerPing = { - readonly ts: i64 -} - -export function readToServerPing(bc: bare.ByteCursor): ToServerPing { - return { - ts: bare.readI64(bc), - } -} - -export function writeToServerPing(bc: bare.ByteCursor, x: ToServerPing): void { - bare.writeI64(bc, x.ts) -} - -function read6(bc: bare.ByteCursor): readonly KvKey[] { - const len = bare.readUintSafe(bc) - if (len === 0) { - return [] - } - const result = [readKvKey(bc)] - for (let i = 1; i < len; i++) { - result[i] = readKvKey(bc) - } - return result -} - -function write6(bc: bare.ByteCursor, x: readonly KvKey[]): void { - bare.writeUintSafe(bc, x.length) - for (let i = 0; i < x.length; i++) { - writeKvKey(bc, x[i]) - } -} - -export type KvGetRequest = { - readonly keys: readonly KvKey[] -} - -export function readKvGetRequest(bc: bare.ByteCursor): KvGetRequest { - return { - keys: read6(bc), - } -} - -export function writeKvGetRequest(bc: bare.ByteCursor, x: KvGetRequest): void { - write6(bc, x.keys) + } } -function read7(bc: bare.ByteCursor): boolean | null { - return bare.readBool(bc) ? bare.readBool(bc) : null +function read9(bc: bare.ByteCursor): ReadonlyMap | null { + return bare.readBool(bc) ? read8(bc) : null } -function write7(bc: bare.ByteCursor, x: boolean | null): void { +function write9(bc: bare.ByteCursor, x: ReadonlyMap | null): void { bare.writeBool(bc, x != null) if (x != null) { - bare.writeBool(bc, x) + write8(bc, x) } } -function read8(bc: bare.ByteCursor): u64 | null { - return bare.readBool(bc) ? bare.readU64(bc) : null +function read10(bc: bare.ByteCursor): Json | null { + return bare.readBool(bc) ? readJson(bc) : null } -function write8(bc: bare.ByteCursor, x: u64 | null): void { +function write10(bc: bare.ByteCursor, x: Json | null): void { bare.writeBool(bc, x != null) if (x != null) { - bare.writeU64(bc, x) + writeJson(bc, x) } } -export type KvListRequest = { - readonly query: KvListQuery - readonly reverse: boolean | null - readonly limit: u64 | null +export type ToServerInit = { + readonly name: string + readonly version: u32 + readonly totalSlots: u32 + readonly lastCommandIdx: i64 | null + readonly prepopulateActorNames: ReadonlyMap | null + readonly metadata: Json | null } -export function readKvListRequest(bc: bare.ByteCursor): KvListRequest { +export function readToServerInit(bc: bare.ByteCursor): ToServerInit { return { - query: readKvListQuery(bc), - reverse: read7(bc), - limit: read8(bc), + name: bare.readString(bc), + version: bare.readU32(bc), + totalSlots: bare.readU32(bc), + lastCommandIdx: read7(bc), + prepopulateActorNames: read9(bc), + metadata: read10(bc), } } -export function writeKvListRequest(bc: bare.ByteCursor, x: KvListRequest): void { - writeKvListQuery(bc, x.query) - write7(bc, x.reverse) - write8(bc, x.limit) +export function writeToServerInit(bc: bare.ByteCursor, x: ToServerInit): void { + bare.writeString(bc, x.name) + bare.writeU32(bc, x.version) + bare.writeU32(bc, x.totalSlots) + write7(bc, x.lastCommandIdx) + write9(bc, x.prepopulateActorNames) + write10(bc, x.metadata) } -function read9(bc: bare.ByteCursor): readonly KvValue[] { +export type ToServerEvents = readonly EventWrapper[] + +export function readToServerEvents(bc: bare.ByteCursor): ToServerEvents { const len = bare.readUintSafe(bc) if (len === 0) { return [] } - const result = [readKvValue(bc)] + const result = [readEventWrapper(bc)] for (let i = 1; i < len; i++) { - result[i] = readKvValue(bc) + result[i] = readEventWrapper(bc) } return result } -function write9(bc: bare.ByteCursor, x: readonly KvValue[]): void { +export function writeToServerEvents(bc: bare.ByteCursor, x: ToServerEvents): void { bare.writeUintSafe(bc, x.length) for (let i = 0; i < x.length; i++) { - writeKvValue(bc, x[i]) + writeEventWrapper(bc, x[i]) } } -export type KvPutRequest = { - readonly keys: readonly KvKey[] - readonly values: readonly KvValue[] +export type ToServerAckCommands = { + readonly lastCommandIdx: i64 } -export function readKvPutRequest(bc: bare.ByteCursor): KvPutRequest { +export function readToServerAckCommands(bc: bare.ByteCursor): ToServerAckCommands { return { - keys: read6(bc), - values: read9(bc), + lastCommandIdx: bare.readI64(bc), } } -export function writeKvPutRequest(bc: bare.ByteCursor, x: KvPutRequest): void { - write6(bc, x.keys) - write9(bc, x.values) -} - -export type KvDeleteRequest = { - readonly keys: readonly KvKey[] +export function writeToServerAckCommands(bc: bare.ByteCursor, x: ToServerAckCommands): void { + bare.writeI64(bc, x.lastCommandIdx) } -export function readKvDeleteRequest(bc: bare.ByteCursor): KvDeleteRequest { - return { - keys: read6(bc), - } -} +export type ToServerStopping = null -export function writeKvDeleteRequest(bc: bare.ByteCursor, x: KvDeleteRequest): void { - write6(bc, x.keys) +export type ToServerPing = { + readonly ts: i64 } -export type KvDropRequest = null - -export type KvRequestData = - | { readonly tag: "KvGetRequest"; readonly val: KvGetRequest } - | { readonly tag: "KvListRequest"; readonly val: KvListRequest } - | { readonly tag: "KvPutRequest"; readonly val: KvPutRequest } - | { readonly tag: "KvDeleteRequest"; readonly val: KvDeleteRequest } - | { readonly tag: "KvDropRequest"; readonly val: KvDropRequest } - -export function readKvRequestData(bc: bare.ByteCursor): KvRequestData { - const offset = bc.offset - const tag = bare.readU8(bc) - switch (tag) { - case 0: - return { tag: "KvGetRequest", val: readKvGetRequest(bc) } - case 1: - return { tag: "KvListRequest", val: readKvListRequest(bc) } - case 2: - return { tag: "KvPutRequest", val: readKvPutRequest(bc) } - case 3: - return { tag: "KvDeleteRequest", val: readKvDeleteRequest(bc) } - case 4: - return { tag: "KvDropRequest", val: null } - default: { - bc.offset = offset - throw new bare.BareError(offset, "invalid tag") - } +export function readToServerPing(bc: bare.ByteCursor): ToServerPing { + return { + ts: bare.readI64(bc), } } -export function writeKvRequestData(bc: bare.ByteCursor, x: KvRequestData): void { - switch (x.tag) { - case "KvGetRequest": { - bare.writeU8(bc, 0) - writeKvGetRequest(bc, x.val) - break - } - case "KvListRequest": { - bare.writeU8(bc, 1) - writeKvListRequest(bc, x.val) - break - } - case "KvPutRequest": { - bare.writeU8(bc, 2) - writeKvPutRequest(bc, x.val) - break - } - case "KvDeleteRequest": { - bare.writeU8(bc, 3) - writeKvDeleteRequest(bc, x.val) - break - } - case "KvDropRequest": { - bare.writeU8(bc, 4) - break - } - } +export function writeToServerPing(bc: bare.ByteCursor, x: ToServerPing): void { + bare.writeI64(bc, x.ts) } export type ToServerKvRequest = { @@ -951,6 +1123,9 @@ export function decodeToServer(bytes: Uint8Array): ToServer { return result } +/** + * MARK: To Client + */ export type ProtocolMetadata = { readonly runnerLostThreshold: i64 } @@ -1020,148 +1195,6 @@ export function writeToClientAckEvents(bc: bare.ByteCursor, x: ToClientAckEvents bare.writeI64(bc, x.lastEventIdx) } -export type KvErrorResponse = { - readonly message: string -} - -export function readKvErrorResponse(bc: bare.ByteCursor): KvErrorResponse { - return { - message: bare.readString(bc), - } -} - -export function writeKvErrorResponse(bc: bare.ByteCursor, x: KvErrorResponse): void { - bare.writeString(bc, x.message) -} - -function read10(bc: bare.ByteCursor): readonly KvMetadata[] { - const len = bare.readUintSafe(bc) - if (len === 0) { - return [] - } - const result = [readKvMetadata(bc)] - for (let i = 1; i < len; i++) { - result[i] = readKvMetadata(bc) - } - return result -} - -function write10(bc: bare.ByteCursor, x: readonly KvMetadata[]): void { - bare.writeUintSafe(bc, x.length) - for (let i = 0; i < x.length; i++) { - writeKvMetadata(bc, x[i]) - } -} - -export type KvGetResponse = { - readonly keys: readonly KvKey[] - readonly values: readonly KvValue[] - readonly metadata: readonly KvMetadata[] -} - -export function readKvGetResponse(bc: bare.ByteCursor): KvGetResponse { - return { - keys: read6(bc), - values: read9(bc), - metadata: read10(bc), - } -} - -export function writeKvGetResponse(bc: bare.ByteCursor, x: KvGetResponse): void { - write6(bc, x.keys) - write9(bc, x.values) - write10(bc, x.metadata) -} - -export type KvListResponse = { - readonly keys: readonly KvKey[] - readonly values: readonly KvValue[] - readonly metadata: readonly KvMetadata[] -} - -export function readKvListResponse(bc: bare.ByteCursor): KvListResponse { - return { - keys: read6(bc), - values: read9(bc), - metadata: read10(bc), - } -} - -export function writeKvListResponse(bc: bare.ByteCursor, x: KvListResponse): void { - write6(bc, x.keys) - write9(bc, x.values) - write10(bc, x.metadata) -} - -export type KvPutResponse = null - -export type KvDeleteResponse = null - -export type KvDropResponse = null - -export type KvResponseData = - | { readonly tag: "KvErrorResponse"; readonly val: KvErrorResponse } - | { readonly tag: "KvGetResponse"; readonly val: KvGetResponse } - | { readonly tag: "KvListResponse"; readonly val: KvListResponse } - | { readonly tag: "KvPutResponse"; readonly val: KvPutResponse } - | { readonly tag: "KvDeleteResponse"; readonly val: KvDeleteResponse } - | { readonly tag: "KvDropResponse"; readonly val: KvDropResponse } - -export function readKvResponseData(bc: bare.ByteCursor): KvResponseData { - const offset = bc.offset - const tag = bare.readU8(bc) - switch (tag) { - case 0: - return { tag: "KvErrorResponse", val: readKvErrorResponse(bc) } - case 1: - return { tag: "KvGetResponse", val: readKvGetResponse(bc) } - case 2: - return { tag: "KvListResponse", val: readKvListResponse(bc) } - case 3: - return { tag: "KvPutResponse", val: null } - case 4: - return { tag: "KvDeleteResponse", val: null } - case 5: - return { tag: "KvDropResponse", val: null } - default: { - bc.offset = offset - throw new bare.BareError(offset, "invalid tag") - } - } -} - -export function writeKvResponseData(bc: bare.ByteCursor, x: KvResponseData): void { - switch (x.tag) { - case "KvErrorResponse": { - bare.writeU8(bc, 0) - writeKvErrorResponse(bc, x.val) - break - } - case "KvGetResponse": { - bare.writeU8(bc, 1) - writeKvGetResponse(bc, x.val) - break - } - case "KvListResponse": { - bare.writeU8(bc, 2) - writeKvListResponse(bc, x.val) - break - } - case "KvPutResponse": { - bare.writeU8(bc, 3) - break - } - case "KvDeleteResponse": { - bare.writeU8(bc, 4) - break - } - case "KvDropResponse": { - bare.writeU8(bc, 5) - break - } - } -} - export type ToClientKvResponse = { readonly requestId: u32 readonly data: KvResponseData