diff --git a/Cargo.lock b/Cargo.lock index 44bd7cd6e0..50db0f0fd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -377,6 +377,31 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-extra" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45bf463831f5131b7d3c756525b305d40f1185b688565648a92e1392ca35713d" +dependencies = [ + "axum 0.8.4", + "axum-core 0.5.2", + "bytes", + "form_urlencoded", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "serde", + "serde_html_form", + "serde_path_to_error", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-test" version = "17.3.0" @@ -1454,6 +1479,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -2737,6 +2773,7 @@ dependencies = [ "gasoline", "rivet-api-builder", "rivet-api-util", + "rivet-data", "rivet-error", "rivet-util", "serde", @@ -2744,6 +2781,7 @@ dependencies = [ "udb-util", "universaldb", "utoipa", + "versioned-data-util", ] [[package]] @@ -3217,8 +3255,8 @@ dependencies = [ "rivet-api-client", "rivet-api-types", "rivet-api-util", + "rivet-data", "rivet-error", - "rivet-key-data", "rivet-metrics", "rivet-runner-protocol", "rivet-types", @@ -3960,16 +3998,34 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls 0.26.2", + "tokio-util", "tower 0.5.2", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 1.0.2", ] +[[package]] +name = "reqwest-eventsource" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "632c55746dbb44275691640e7b40c907c16a2dc1a5842aa98aaec90da6ec6bde" +dependencies = [ + "eventsource-stream", + "futures-core", + "futures-timer", + "mime", + "nom", + "pin-project-lite", + "reqwest", + "thiserror 1.0.69", +] + [[package]] name = "reserve-port" version = "2.3.0" @@ -3999,6 +4055,7 @@ version = "0.0.1" dependencies = [ "anyhow", "axum 0.8.4", + "axum-extra", "axum-test", "chrono", "gasoline", @@ -4204,6 +4261,24 @@ dependencies = [ "uuid", ] +[[package]] +name = "rivet-data" +version = "0.0.1" +dependencies = [ + "anyhow", + "bare_gen", + "gasoline", + "indoc", + "prettyplease", + "rivet-runner-protocol", + "rivet-util", + "serde", + "serde_bare", + "serde_json", + "syn 2.0.104", + "versioned-data-util", +] + [[package]] name = "rivet-dump-openapi" version = "0.0.1" @@ -4231,6 +4306,7 @@ dependencies = [ "lz4_flex", "namespace", "pegboard", + "pegboard-outbound", "pegboard-runner-ws", "portpicker", "rand 0.8.5", @@ -4327,9 +4403,9 @@ dependencies = [ "rivet-api-public", "rivet-cache", "rivet-config", + "rivet-data", "rivet-error", "rivet-guard-core", - "rivet-key-data", "rivet-logs", "rivet-metrics", "rivet-pools", @@ -4392,24 +4468,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "rivet-key-data" -version = "0.0.1" -dependencies = [ - "anyhow", - "bare_gen", - "gasoline", - "indoc", - "prettyplease", - "rivet-runner-protocol", - "rivet-util", - "serde", - "serde_bare", - "serde_json", - "syn 2.0.104", - "versioned-data-util", -] - [[package]] name = "rivet-logs" version = "0.0.1" @@ -4615,7 +4673,7 @@ dependencies = [ "anyhow", "gasoline", "rivet-api-builder", - "rivet-key-data", + "rivet-data", "rivet-runner-protocol", "rivet-util", "serde", @@ -5215,6 +5273,19 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "serde_html_form" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d2de91cf02bbc07cde38891769ccd5d4f073d22a40683aa4bc7a95781aaa2c4" +dependencies = [ + "form_urlencoded", + "indexmap 2.10.0", + "itoa 1.0.15", + "ryu", + "serde", +] + [[package]] name = "serde_json" version = "1.0.141" @@ -6618,6 +6689,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.77" diff --git a/Cargo.toml b/Cargo.toml index 321b87c12b..284892b56b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["packages/common/api-builder","packages/common/api-client","packages/common/api-types","packages/common/api-util","packages/common/cache/build","packages/common/cache/result","packages/common/clickhouse-inserter","packages/common/clickhouse-user-query","packages/common/config","packages/common/env","packages/common/error/core","packages/common/error/macros","packages/common/gasoline/core","packages/common/gasoline/macros","packages/common/logs","packages/common/metrics","packages/common/pools","packages/common/runtime","packages/common/service-manager","packages/common/telemetry","packages/common/test-deps","packages/common/test-deps-docker","packages/common/types","packages/common/udb-util","packages/common/universaldb","packages/common/universalpubsub","packages/common/util/core","packages/common/util/id","packages/common/versioned-data-util","packages/core/actor-kv","packages/core/api-peer","packages/core/api-public","packages/core/bootstrap","packages/core/dump-openapi","packages/core/guard/core","packages/core/guard/server","packages/core/pegboard-gateway","packages/core/pegboard-runner-ws","packages/core/pegboard-tunnel","packages/core/workflow-worker","packages/infra/engine","packages/services/epoxy","packages/services/namespace","packages/services/pegboard","sdks/rust/api-full","sdks/rust/bare_gen","sdks/rust/epoxy-protocol","sdks/rust/key-data","sdks/rust/runner-protocol","sdks/rust/tunnel-protocol","sdks/rust/ups-protocol"] +members = ["packages/common/api-builder","packages/common/api-client","packages/common/api-types","packages/common/api-util","packages/common/cache/build","packages/common/cache/result","packages/common/clickhouse-inserter","packages/common/clickhouse-user-query","packages/common/config","packages/common/env","packages/common/error/core","packages/common/error/macros","packages/common/gasoline/core","packages/common/gasoline/macros","packages/common/logs","packages/common/metrics","packages/common/pools","packages/common/runtime","packages/common/service-manager","packages/common/telemetry","packages/common/test-deps","packages/common/test-deps-docker","packages/common/types","packages/common/udb-util","packages/common/universaldb","packages/common/universalpubsub","packages/common/util/core","packages/common/util/id","packages/common/versioned-data-util","packages/core/actor-kv","packages/core/api-peer","packages/core/api-public","packages/core/bootstrap","packages/core/dump-openapi","packages/core/guard/core","packages/core/guard/server","packages/core/pegboard-gateway","packages/core/pegboard-outbound","packages/core/pegboard-runner-ws","packages/core/pegboard-tunnel","packages/core/workflow-worker","packages/infra/engine","packages/services/epoxy","packages/services/namespace","packages/services/pegboard","sdks/rust/api-full","sdks/rust/bare_gen","sdks/rust/data","sdks/rust/epoxy-protocol","sdks/rust/runner-protocol","sdks/rust/tunnel-protocol","sdks/rust/ups-protocol"] [workspace.package] version = "25.6.1" @@ -79,6 +79,7 @@ tracing-core = "0.1" tracing-opentelemetry = "0.29" tracing-slog = "0.2" vergen = "9.0.4" +reqwest-eventsource = "0.6.0" [workspace.dependencies.sentry] version = "0.37.0" @@ -118,6 +119,10 @@ features = ["uuid"] version = "0.8" features = ["http2"] +[workspace.dependencies.axum-extra] +version = "0.10.1" +features = ["query"] + [workspace.dependencies.tower-http] version = "0.6" features = ["cors","trace"] @@ -359,6 +364,9 @@ path = "packages/core/guard/server" [workspace.dependencies.pegboard-gateway] path = "packages/core/pegboard-gateway" +[workspace.dependencies.pegboard-outbound] +path = "packages/core/pegboard-outbound" + [workspace.dependencies.pegboard-runner-ws] path = "packages/core/pegboard-runner-ws" @@ -386,12 +394,12 @@ path = "sdks/rust/api-full" [workspace.dependencies.bare_gen] path = "sdks/rust/bare_gen" +[workspace.dependencies.rivet-data] +path = "sdks/rust/data" + [workspace.dependencies.epoxy-protocol] path = "sdks/rust/epoxy-protocol" -[workspace.dependencies.rivet-key-data] -path = "sdks/rust/key-data" - [workspace.dependencies.rivet-runner-protocol] path = "sdks/rust/runner-protocol" diff --git a/docker/dev/docker-compose.yml b/docker/dev/docker-compose.yml index 6803272ebe..64de5f178d 100644 --- a/docker/dev/docker-compose.yml +++ b/docker/dev/docker-compose.yml @@ -187,6 +187,7 @@ services: environment: - RIVET_ENDPOINT=http://rivet-engine:6420 - RUNNER_HOST=runner + # - NO_AUTOSTART=1 stop_grace_period: 4s ports: - '5050:5050' diff --git a/out/openapi.json b/out/openapi.json index 87571399cc..ff97997a91 100644 --- a/out/openapi.json +++ b/out/openapi.json @@ -465,6 +465,17 @@ "schema": { "type": "string" } + }, + { + "name": "namespace_id", + "in": "query", + "required": true, + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/RivetId" + } + } } ], "responses": { @@ -1042,7 +1053,8 @@ "namespace_id", "name", "display_name", - "create_ts" + "create_ts", + "runner_kind" ], "properties": { "create_ts": { @@ -1057,6 +1069,9 @@ }, "namespace_id": { "$ref": "#/components/schemas/RivetId" + }, + "runner_kind": { + "$ref": "#/components/schemas/RunnerKind" } } }, @@ -1238,6 +1253,59 @@ }, "additionalProperties": false }, + "RunnerKind": { + "oneOf": [ + { + "type": "object", + "required": [ + "outbound" + ], + "properties": { + "outbound": { + "type": "object", + "required": [ + "url", + "slots_per_runner", + "min_runners", + "max_runners", + "runners_margin" + ], + "properties": { + "max_runners": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "min_runners": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "runners_margin": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "slots_per_runner": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "url": { + "type": "string" + } + } + } + } + }, + { + "type": "string", + "enum": [ + "custom" + ] + } + ] + }, "RunnersGetResponse": { "type": "object", "required": [ diff --git a/packages/common/api-builder/Cargo.toml b/packages/common/api-builder/Cargo.toml index f9ce906458..5655703737 100644 --- a/packages/common/api-builder/Cargo.toml +++ b/packages/common/api-builder/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true [dependencies] anyhow.workspace = true axum.workspace = true +axum-extra.workspace = true gas.workspace = true chrono.workspace = true hyper = { workspace = true, features = ["full"] } diff --git a/packages/common/api-builder/src/wrappers.rs b/packages/common/api-builder/src/wrappers.rs index e6d8193c2d..926b6ece48 100644 --- a/packages/common/api-builder/src/wrappers.rs +++ b/packages/common/api-builder/src/wrappers.rs @@ -1,13 +1,14 @@ use anyhow::Result; use axum::{ body::Bytes, - extract::{Extension, Path, Query}, + extract::{Extension, Path}, response::{IntoResponse, Json}, routing::{ delete as axum_delete, get as axum_get, patch as axum_patch, post as axum_post, put as axum_put, }, }; +use axum_extra::extract::Query; use serde::{Serialize, de::DeserializeOwned}; use std::future::Future; diff --git a/packages/common/gasoline/core/src/utils/tags.rs b/packages/common/gasoline/core/src/utils/tags.rs index 967c59d10f..92326b65bd 100644 --- a/packages/common/gasoline/core/src/utils/tags.rs +++ b/packages/common/gasoline/core/src/utils/tags.rs @@ -61,6 +61,16 @@ impl AsTags for serde_json::Value { } } +impl AsTags for () { + fn as_tags(&self) -> WorkflowResult { + Ok(serde_json::Value::Object(serde_json::Map::new())) + } + + fn as_cjson_tags(&self) -> WorkflowResult { + Ok(String::new()) + } +} + impl AsTags for &T { fn as_tags(&self) -> WorkflowResult { (*self).as_tags() diff --git a/packages/common/pools/src/reqwest.rs b/packages/common/pools/src/reqwest.rs index b3044041af..78f1f2e7cb 100644 --- a/packages/common/pools/src/reqwest.rs +++ b/packages/common/pools/src/reqwest.rs @@ -13,3 +13,10 @@ pub async fn client() -> Result { .await .cloned() } + +pub async fn client_no_timeout() -> Result { + CLIENT + .get_or_try_init(|| async { Client::builder().build() }) + .await + .cloned() +} diff --git a/packages/common/types/Cargo.toml b/packages/common/types/Cargo.toml index 6e429cbacf..9ae35c64aa 100644 --- a/packages/common/types/Cargo.toml +++ b/packages/common/types/Cargo.toml @@ -10,7 +10,7 @@ anyhow.workspace = true gas.workspace = true rivet-api-builder.workspace = true rivet-runner-protocol.workspace = true -rivet-key-data.workspace = true +rivet-data.workspace = true rivet-util.workspace = true serde.workspace = true utoipa.workspace = true diff --git a/packages/common/types/src/runners.rs b/packages/common/types/src/runners.rs index f56db3f670..d70ed49eda 100644 --- a/packages/common/types/src/runners.rs +++ b/packages/common/types/src/runners.rs @@ -1,5 +1,5 @@ use gas::prelude::*; -use rivet_key_data::generated::pegboard_runner_address_v1; +use rivet_data::generated::pegboard_runner_address_v1; use rivet_runner_protocol::protocol; use rivet_util::Id; use serde::{Deserialize, Serialize}; diff --git a/packages/common/udb-util/src/keys.rs b/packages/common/udb-util/src/keys.rs index 397c727c28..c0406fe67f 100644 --- a/packages/common/udb-util/src/keys.rs +++ b/packages/common/udb-util/src/keys.rs @@ -59,7 +59,7 @@ define_keys! { (31, DBS, "dbs"), (32, ACTOR, "actor"), (33, BY_NAME, "by_name"), - (34, DATACENTER, "datacenter"), + // 34 (35, REMAINING_MEMORY, "remaining_memory"), (36, REMAINING_CPU, "remaining_cpu"), (37, TOTAL_MEMORY, "total_memory"), @@ -119,4 +119,6 @@ define_keys! { (91, METRIC, "metric"), (92, CURRENT_BALLOT, "current_ballot"), (93, INSTANCE_BALLOT, "instance_ballot"), + (94, OUTBOUND, "outbound"), + (95, DESIRED_SLOTS, "desired_slots"), } diff --git a/packages/core/api-peer/src/namespaces.rs b/packages/core/api-peer/src/namespaces.rs index 196764252f..36b0a3c9fc 100644 --- a/packages/core/api-peer/src/namespaces.rs +++ b/packages/core/api-peer/src/namespaces.rs @@ -73,6 +73,7 @@ pub struct ListQuery { pub limit: Option, pub cursor: Option, pub name: Option, + pub namespace_id: Vec, } #[derive(Serialize, Deserialize, ToSchema)] @@ -85,7 +86,7 @@ pub struct ListResponse { #[utoipa::path( get, - operation_id = "actors_list", + operation_id = "namespaces_list", path = "/namespaces", params(ListQuery), responses( @@ -105,6 +106,17 @@ pub async fn list(ctx: ApiCtx, _path: (), query: ListQuery) -> Result, + shutdown_tx: oneshot::Sender<()>, + draining: Arc, +} + +#[tracing::instrument(skip_all)] +pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> { + let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?; + let ctx = StandaloneCtx::new( + db::DatabaseKv::from_pools(pools.clone()).await?, + config.clone(), + pools, + cache, + "pegboard-outbound", + Id::new_v1(config.dc_label()), + Id::new_v1(config.dc_label()), + )?; + + let mut sub = ctx + .subscribe::(()) + .await?; + let mut outbound_connections = HashMap::new(); + + loop { + tick(&ctx, &mut outbound_connections).await?; + + sub.next().await?; + } +} + +async fn tick( + ctx: &StandaloneCtx, + outbound_connections: &mut HashMap<(Id, String), Vec>, +) -> Result<()> { + let outbound_data = ctx + .udb()? + .run(|tx, _mc| async move { + let txs = tx.subspace(keys::subspace()); + let outbound_desired_subspace = + txs.subspace(&keys::ns::OutboundDesiredSlotsKey::subspace()); + + txs.get_ranges_keyvalues( + udb::RangeOption { + mode: StreamingMode::WantAll, + ..(&outbound_desired_subspace).into() + }, + // NOTE: This is a snapshot to prevent conflict with updates to this subspace + SNAPSHOT, + ) + .map(|res| match res { + Ok(entry) => { + let (key, desired_slots) = + txs.read_entry::(&entry)?; + + Ok((key.namespace_id, key.runner_name_selector, desired_slots)) + } + Err(err) => Err(err.into()), + }) + .try_collect::>() + .await + + // outbound/{ns_id}/{runner_name_selector}/desired_slots + }) + .await?; + + let mut namespace_ids = outbound_data + .iter() + .map(|(ns_id, _, _)| *ns_id) + .collect::>(); + namespace_ids.dedup(); + + let namespaces = ctx + .op(namespace::ops::get_global::Input { namespace_ids }) + .await?; + + for (ns_id, runner_name_selector, desired_slots) in &outbound_data { + let namespace = namespaces + .iter() + .find(|ns| ns.namespace_id == *ns_id) + .context("ns not found")?; + + let RunnerKind::Outbound { + url, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + } = &namespace.runner_kind + else { + tracing::warn!( + ?ns_id, + "this namespace should not be in the outbound subspace (wrong runner kind)" + ); + continue; + }; + + let curr = outbound_connections + .entry((*ns_id, runner_name_selector.clone())) + .or_insert_with(Vec::new); + + // Remove finished and draining connections from list + curr.retain(|conn| !conn.handle.is_finished() && !conn.draining.load(Ordering::SeqCst)); + + let desired_count = (desired_slots + .div_ceil(*slots_per_runner) + .max(*min_runners) + .min(*max_runners) + + runners_margin) + .try_into()?; + + // Calculate diff + let drain_count = curr.len().saturating_sub(desired_count); + let start_count = desired_count.saturating_sub(curr.len()); + + if drain_count != 0 { + // TODO: Implement smart logic of draining runners with the lowest allocated actors + let draining_connections = curr.split_off(desired_count); + + for conn in draining_connections { + if conn.shutdown_tx.send(()).is_err() { + tracing::warn!( + "outbound connection shutdown channel dropped, likely already stopped" + ); + } + } + } + + let starting_connections = + std::iter::repeat_with(|| spawn_connection(ctx.clone(), url.clone())).take(start_count); + curr.extend(starting_connections); + } + + // Remove entries that aren't returned from udb + outbound_connections.retain(|(ns_id, runner_name_selector), _| { + outbound_data + .iter() + .any(|(ns_id2, runner_name_selector2, _)| { + ns_id == ns_id2 && runner_name_selector == runner_name_selector2 + }) + }); + + Ok(()) +} + +fn spawn_connection(ctx: StandaloneCtx, url: String) -> OutboundConnection { + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let draining = Arc::new(AtomicBool::new(false)); + + let draining2 = draining.clone(); + let handle = tokio::spawn(async move { + if let Err(err) = outbound_handler(&ctx, url, shutdown_rx, draining2).await { + tracing::error!(?err, "outbound req failed"); + + // TODO: Add backoff + tokio::time::sleep(Duration::from_secs(1)).await; + + // On error, bump the autoscaler loop again + let _ = ctx + .msg(pegboard::messages::BumpOutboundAutoscaler {}) + .send() + .await; + } + }); + + OutboundConnection { + handle, + shutdown_tx, + draining, + } +} + +async fn outbound_handler( + ctx: &StandaloneCtx, + url: String, + shutdown_rx: oneshot::Receiver<()>, + draining: Arc, +) -> Result<()> { + let client = rivet_pools::reqwest::client_no_timeout().await?; + let mut es = sse::EventSource::new(client.get(url))?; + let mut runner_id = None; + + let stream_handler = async { + while let Some(event) = es.next().await { + match event { + Ok(sse::Event::Open) => {} + Ok(sse::Event::Message(msg)) => { + tracing::debug!(%msg.data, "received outbound req message"); + + if runner_id.is_none() { + runner_id = Some(Id::parse(&msg.data)?); + } + } + Err(sse::Error::StreamEnded) => { + tracing::debug!("outbound req stopped early"); + + return Ok(()); + } + Err(err) => return Err(err.into()), + } + } + + anyhow::Ok(()) + }; + + tokio::select! { + res = stream_handler => return res.map_err(Into::into), + _ = tokio::time::sleep(OUTBOUND_REQUEST_LIFESPAN) => {} + _ = shutdown_rx => {} + } + + draining.store(true, Ordering::SeqCst); + + ctx.msg(pegboard::messages::BumpOutboundAutoscaler {}) + .send() + .await?; + + if let Some(runner_id) = runner_id { + stop_runner(ctx, runner_id).await?; + } + + // Continue waiting on req while draining + while let Some(event) = es.next().await { + match event { + Ok(sse::Event::Open) => {} + Ok(sse::Event::Message(msg)) => { + tracing::debug!(%msg.data, "received outbound req message"); + + // If runner_id is none at this point it means we did not send the stopping signal yet, so + // send it now + if runner_id.is_none() { + stop_runner(ctx, Id::parse(&msg.data)?).await?; + } + } + Err(sse::Error::StreamEnded) => break, + Err(err) => return Err(err.into()), + } + } + + tracing::info!("outbound req stopped"); + + Ok(()) +} + +async fn stop_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> { + let res = ctx + .signal(protocol::ToServer::Stopping) + .to_workflow::() + .tag("runner_id", runner_id) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + tracing::warn!( + ?runner_id, + "runner workflow not found, likely already stopped" + ); + } else { + res?; + } + + Ok(()) +} diff --git a/packages/infra/engine/Cargo.toml b/packages/infra/engine/Cargo.toml index 2556c213f3..975ea3d788 100644 --- a/packages/infra/engine/Cargo.toml +++ b/packages/infra/engine/Cargo.toml @@ -11,16 +11,15 @@ path = "src/main.rs" [dependencies] anyhow.workspace = true -gas.workspace = true chrono.workspace = true clap.workspace = true colored_json.workspace = true -udb-util.workspace = true -universaldb.workspace = true futures-util.workspace = true +gas.workspace = true hex.workspace = true include_dir.workspace = true lz4_flex.workspace = true +pegboard-outbound.workspace = true pegboard-runner-ws.workspace = true reqwest.workspace = true rivet-api-peer.workspace = true @@ -38,15 +37,17 @@ rivet-term.workspace = true rivet-util.workspace = true rivet-workflow-worker.workspace = true rustyline.workspace = true -serde.workspace = true serde_json.workspace = true serde_yaml.workspace = true +serde.workspace = true strum.workspace = true tabled.workspace = true tempfile.workspace = true thiserror.workspace = true tokio.workspace = true tracing.workspace = true +udb-util.workspace = true +universaldb.workspace = true url.workspace = true uuid.workspace = true diff --git a/packages/infra/engine/src/run_config.rs b/packages/infra/engine/src/run_config.rs index ee683efc15..890b12842b 100644 --- a/packages/infra/engine/src/run_config.rs +++ b/packages/infra/engine/src/run_config.rs @@ -25,6 +25,11 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result { Service::new("bootstrap", ServiceKind::Oneshot, |config, pools| { Box::pin(rivet_bootstrap::start(config, pools)) }), + Service::new( + "pegboard_outbound", + ServiceKind::Standalone, + |config, pools| Box::pin(pegboard_outbound::start(config, pools)), + ), ]; Ok(RunConfigData { services }) diff --git a/packages/services/namespace/Cargo.toml b/packages/services/namespace/Cargo.toml index fef88ecd1c..32d46a6735 100644 --- a/packages/services/namespace/Cargo.toml +++ b/packages/services/namespace/Cargo.toml @@ -8,12 +8,14 @@ edition.workspace = true [dependencies] anyhow.workspace = true gas.workspace = true -udb-util.workspace = true -universaldb.workspace = true rivet-api-builder.workspace = true rivet-api-util.workspace = true +rivet-data.workspace = true rivet-error.workspace = true rivet-util.workspace = true serde.workspace = true tracing.workspace = true +udb-util.workspace = true +universaldb.workspace = true utoipa.workspace = true +versioned-data-util.workspace = true diff --git a/packages/services/namespace/src/keys.rs b/packages/services/namespace/src/keys.rs index 3803feb6f6..c4e13afde4 100644 --- a/packages/services/namespace/src/keys.rs +++ b/packages/services/namespace/src/keys.rs @@ -3,6 +3,7 @@ use std::result::Result::Ok; use anyhow::*; use gas::prelude::*; use udb_util::prelude::*; +use versioned_data_util::OwnedVersionedData; pub fn subspace() -> udb_util::Subspace { udb_util::Subspace::new(&(RIVET, NAMESPACE)) @@ -144,6 +145,55 @@ impl<'de> TupleUnpack<'de> for CreateTsKey { } } +#[derive(Debug)] +pub struct RunnerKindKey { + namespace_id: Id, +} + +impl RunnerKindKey { + pub fn new(namespace_id: Id) -> Self { + RunnerKindKey { namespace_id } + } +} + +impl FormalKey for RunnerKindKey { + type Value = crate::types::RunnerKind; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok( + rivet_data::versioned::NamespaceRunnerKind::deserialize_with_embedded_version(raw)? + .into(), + ) + } + + fn serialize(&self, value: Self::Value) -> Result> { + rivet_data::versioned::NamespaceRunnerKind::latest(value.into()) + .serialize_with_embedded_version( + rivet_data::PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION, + ) + } +} + +impl TuplePack for RunnerKindKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = (DATA, self.namespace_id, CREATE_TS); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for RunnerKindKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, namespace_id, _)) = <(usize, Id, usize)>::unpack(input, tuple_depth)?; + let v = RunnerKindKey { namespace_id }; + + Ok((input, v)) + } +} + #[derive(Debug)] pub struct ByNameKey { name: String, diff --git a/packages/services/namespace/src/ops/get_global.rs b/packages/services/namespace/src/ops/get_global.rs index 5dc5a58b08..a62eeda288 100644 --- a/packages/services/namespace/src/ops/get_global.rs +++ b/packages/services/namespace/src/ops/get_global.rs @@ -4,20 +4,19 @@ use crate::types::Namespace; #[derive(Debug)] pub struct Input { - // TODO: Accept vec - pub namespace_id: Id, + pub namespace_ids: Vec, } #[operation] -pub async fn namespace_get_global(ctx: &OperationCtx, input: &Input) -> Result> { +pub async fn namespace_get_global(ctx: &OperationCtx, input: &Input) -> Result> { if ctx.config().is_leader() { let namespaces_res = ctx .op(crate::ops::get_local::Input { - namespace_ids: vec![input.namespace_id], + namespace_ids: input.namespace_ids.clone(), }) .await?; - Ok(namespaces_res.namespaces.into_iter().next()) + Ok(namespaces_res.namespaces) } else { let leader_dc = ctx.config().leader_dc()?; let client = rivet_pools::reqwest::client().await?; @@ -25,51 +24,42 @@ pub async fn namespace_get_global(ctx: &OperationCtx, input: &Input) -> Result>(), + ) + .send() + .await?; - let res = rivet_api_util::parse_response::(res).await; + let res = rivet_api_util::parse_response::(res).await?; - let res = match res { - Ok(res) => Ok(Some(res.namespace)), - Err(err) => { - // Explicitly handle namespace not found error - if let Some(error) = err.chain().find_map(|x| { - x.downcast_ref::() - }) { - if error.1.group == "namespace" && error.1.code == "not_found" { - Ok(None) - } else { - Err(err) - } - } else { - Err(err) - } - } - }; - - cache.resolve(&key, res?); + for ns in res.namespaces { + let namespace_id = ns.namespace_id; + cache.resolve(&&namespace_id, ns); + } Ok(cache) } } }) .await - .map(|x| x.flatten()) } } // TODO: Cyclical dependency with api_peer #[derive(Deserialize)] -struct GetResponse { - namespace: Namespace, +struct ListResponse { + namespaces: Vec, } diff --git a/packages/services/namespace/src/ops/get_local.rs b/packages/services/namespace/src/ops/get_local.rs index 913579092f..ed6663d589 100644 --- a/packages/services/namespace/src/ops/get_local.rs +++ b/packages/services/namespace/src/ops/get_local.rs @@ -1,6 +1,6 @@ use futures_util::{StreamExt, TryStreamExt}; use gas::prelude::*; -use udb_util::{FormalKey, SERIALIZABLE}; +use udb_util::{SERIALIZABLE, TxnExt}; use universaldb as udb; use crate::{errors, keys, types::Namespace}; @@ -45,39 +45,40 @@ pub(crate) async fn get_inner( namespace_id: Id, tx: &udb::RetryableTransaction, ) -> std::result::Result, udb::FdbBindingError> { + let txs = tx.subspace(keys::subspace()); + let name_key = keys::NameKey::new(namespace_id); let display_name_key = keys::DisplayNameKey::new(namespace_id); let create_ts_key = keys::CreateTsKey::new(namespace_id); + let runner_kind_key = keys::RunnerKindKey::new(namespace_id); - let (name_entry, display_name_entry, create_ts_entry) = tokio::try_join!( - tx.get(&keys::subspace().pack(&name_key), SERIALIZABLE), - tx.get(&keys::subspace().pack(&display_name_key), SERIALIZABLE), - tx.get(&keys::subspace().pack(&create_ts_key), SERIALIZABLE), + let (name, display_name, create_ts, runner_kind) = tokio::try_join!( + txs.read_opt(&name_key, SERIALIZABLE), + txs.read_opt(&display_name_key, SERIALIZABLE), + txs.read_opt(&create_ts_key, SERIALIZABLE), + txs.read_opt(&runner_kind_key, SERIALIZABLE), )?; // Namespace not found - let Some(name_entry) = name_entry else { + let Some(name) = name else { return Ok(None); }; - let name = name_key - .deserialize(&name_entry) - .map_err(|x| udb::FdbBindingError::CustomError(x.into()))?; - let display_name = display_name_key - .deserialize(&display_name_entry.ok_or(udb::FdbBindingError::CustomError( - format!("key should exist: {display_name_key:?}").into(), - ))?) - .map_err(|x| udb::FdbBindingError::CustomError(x.into()))?; - let create_ts = create_ts_key - .deserialize(&create_ts_entry.ok_or(udb::FdbBindingError::CustomError( - format!("key should exist: {create_ts_key:?}").into(), - ))?) - .map_err(|x| udb::FdbBindingError::CustomError(x.into()))?; + let display_name = display_name.ok_or(udb::FdbBindingError::CustomError( + format!("key should exist: {display_name_key:?}").into(), + ))?; + let create_ts = create_ts.ok_or(udb::FdbBindingError::CustomError( + format!("key should exist: {create_ts_key:?}").into(), + ))?; + let runner_kind = runner_kind.ok_or(udb::FdbBindingError::CustomError( + format!("key should exist: {runner_kind_key:?}").into(), + ))?; Ok(Some(Namespace { namespace_id, name, display_name, create_ts, + runner_kind, })) } diff --git a/packages/services/namespace/src/types.rs b/packages/services/namespace/src/types.rs index c7e6f71d59..05e924ad34 100644 --- a/packages/services/namespace/src/types.rs +++ b/packages/services/namespace/src/types.rs @@ -7,4 +7,58 @@ pub struct Namespace { pub name: String, pub display_name: String, pub create_ts: i64, + pub runner_kind: RunnerKind, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Hash, ToSchema)] +#[serde(rename_all = "snake_case")] +pub enum RunnerKind { + Outbound { + url: String, + slots_per_runner: u32, + min_runners: u32, + max_runners: u32, + runners_margin: u32, + }, + Custom, +} + +impl From for rivet_data::generated::namespace_runner_kind_v1::Data { + fn from(value: RunnerKind) -> Self { + match value { + RunnerKind::Outbound { + url, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + } => rivet_data::generated::namespace_runner_kind_v1::Data::Outbound( + rivet_data::generated::namespace_runner_kind_v1::Outbound { + url, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + }, + ), + RunnerKind::Custom => rivet_data::generated::namespace_runner_kind_v1::Data::Custom, + } + } +} + +impl From for RunnerKind { + fn from(value: rivet_data::generated::namespace_runner_kind_v1::Data) -> Self { + match value { + rivet_data::generated::namespace_runner_kind_v1::Data::Outbound(o) => { + RunnerKind::Outbound { + url: o.url, + slots_per_runner: o.slots_per_runner, + min_runners: o.min_runners, + max_runners: o.max_runners, + runners_margin: o.runners_margin, + } + } + rivet_data::generated::namespace_runner_kind_v1::Data::Custom => RunnerKind::Custom, + } + } } diff --git a/packages/services/namespace/src/workflows/namespace.rs b/packages/services/namespace/src/workflows/namespace.rs index 16575347fe..90078b23e6 100644 --- a/packages/services/namespace/src/workflows/namespace.rs +++ b/packages/services/namespace/src/workflows/namespace.rs @@ -1,10 +1,9 @@ use futures_util::FutureExt; use gas::prelude::*; use serde::{Deserialize, Serialize}; -use udb_util::{FormalKey, SERIALIZABLE}; -use universaldb as udb; +use udb_util::{SERIALIZABLE, TxnExt}; -use crate::{errors, keys}; +use crate::{errors, keys, types::RunnerKind}; #[derive(Debug, Deserialize, Serialize)] pub struct Input { @@ -59,7 +58,7 @@ pub async fn namespace(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { // Does nothing yet ctx.repeat(|ctx| { async move { - ctx.listen::().await?; + ctx.listen::().await?; Ok(Loop::<()>::Continue) } @@ -79,7 +78,7 @@ pub struct Failed { } #[signal("namespace_update")] -pub struct NamespaceUpdate {} +pub struct Update {} #[derive(Debug, Clone, Serialize, Deserialize, Hash)] pub struct ValidateInput { @@ -156,45 +155,29 @@ async fn insert_fdb( let display_name = input.display_name.clone(); async move { - let name_key = keys::NameKey::new(namespace_id); - let name_idx_key = keys::ByNameKey::new(name.clone()); - let display_name_key = keys::DisplayNameKey::new(namespace_id); - let create_ts_key = keys::CreateTsKey::new(namespace_id); + let txs = tx.subspace(keys::subspace()); - let name_idx_entry = tx - .get(&keys::subspace().pack(&name_idx_key), SERIALIZABLE) - .await?; + let name_idx_key = keys::ByNameKey::new(name.clone()); - if name_idx_entry.is_some() { + if txs.exists(&name_idx_key, SERIALIZABLE).await? { return Ok(Err(errors::Namespace::NameNotUnique)); } - tx.set( - &keys::subspace().pack(&name_key), - &name_key - .serialize(name) - .map_err(|x| udb::FdbBindingError::CustomError(x.into()))?, - ); - tx.set( - &keys::subspace().pack(&display_name_key), - &display_name_key - .serialize(display_name) - .map_err(|x| udb::FdbBindingError::CustomError(x.into()))?, - ); - tx.set( - &keys::subspace().pack(&create_ts_key), - &create_ts_key - .serialize(input.create_ts) - .map_err(|x| udb::FdbBindingError::CustomError(x.into()))?, - ); + txs.write(&keys::NameKey::new(namespace_id), name)?; + txs.write(&keys::DisplayNameKey::new(namespace_id), display_name)?; + txs.write(&keys::CreateTsKey::new(namespace_id), input.create_ts)?; + txs.write(&keys::RunnerKindKey::new(namespace_id), RunnerKind::Custom)?; + + // RunnerKind::Outbound { + // url: "http://runner:5051/start".to_string(), + // slots_per_runner: 10, + // min_runners: 1, + // max_runners: 1, + // runners_margin: 0, + // } // Insert idx - tx.set( - &keys::subspace().pack(&name_idx_key), - &name_idx_key - .serialize(namespace_id) - .map_err(|x| udb::FdbBindingError::CustomError(x.into()))?, - ); + txs.write(&name_idx_key, namespace_id)?; Ok(Ok(())) } diff --git a/packages/services/pegboard/Cargo.toml b/packages/services/pegboard/Cargo.toml index 02e85d00e6..945f36bdfa 100644 --- a/packages/services/pegboard/Cargo.toml +++ b/packages/services/pegboard/Cargo.toml @@ -16,7 +16,7 @@ rivet-api-client.workspace = true rivet-api-types.workspace = true rivet-api-util.workspace = true rivet-error.workspace = true -rivet-key-data.workspace = true +rivet-data.workspace = true rivet-metrics.workspace = true rivet-runner-protocol.workspace = true rivet-types.workspace = true diff --git a/packages/services/pegboard/src/keys/datacenter.rs b/packages/services/pegboard/src/keys/datacenter.rs deleted file mode 100644 index bd89e1db22..0000000000 --- a/packages/services/pegboard/src/keys/datacenter.rs +++ /dev/null @@ -1,249 +0,0 @@ -use std::result::Result::Ok; - -use anyhow::*; -use gas::prelude::*; -use udb_util::prelude::*; -use versioned_data_util::OwnedVersionedData; - -#[derive(Debug)] -pub struct RunnerAllocIdxKey { - pub namespace_id: Id, - pub name: String, - pub version: u32, - pub remaining_millislots: u32, - pub last_ping_ts: i64, - pub runner_id: Id, -} - -impl RunnerAllocIdxKey { - pub fn new( - namespace_id: Id, - name: String, - version: u32, - remaining_millislots: u32, - last_ping_ts: i64, - runner_id: Id, - ) -> Self { - RunnerAllocIdxKey { - namespace_id, - name, - version, - remaining_millislots, - last_ping_ts, - runner_id, - } - } - - pub fn subspace(namespace_id: Id, name: String) -> RunnerAllocIdxSubspaceKey { - RunnerAllocIdxSubspaceKey::new(namespace_id, name) - } - - pub fn entire_subspace() -> RunnerAllocIdxSubspaceKey { - RunnerAllocIdxSubspaceKey::entire() - } -} - -impl FormalKey for RunnerAllocIdxKey { - type Value = rivet_key_data::converted::RunnerAllocIdxKeyData; - - fn deserialize(&self, raw: &[u8]) -> Result { - rivet_key_data::versioned::RunnerAllocIdxKeyData::deserialize_with_embedded_version(raw)? - .try_into() - } - - fn serialize(&self, value: Self::Value) -> Result> { - rivet_key_data::versioned::RunnerAllocIdxKeyData::latest(value.try_into()?) - .serialize_with_embedded_version( - rivet_key_data::PEGBOARD_DATACENTER_RUNNER_ALLOC_IDX_VERSION, - ) - } -} - -impl TuplePack for RunnerAllocIdxKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let t = ( - DATACENTER, - RUNNER_ALLOC_IDX, - self.namespace_id, - &self.name, - // Stored in reverse order (higher versions are first) - -(self.version as i32), - // Stored in reverse order (higher remaining slots are first) - -(self.remaining_millislots as i32), - self.last_ping_ts, - self.runner_id, - ); - t.pack(w, tuple_depth) - } -} - -impl<'de> TupleUnpack<'de> for RunnerAllocIdxKey { - fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let ( - input, - (_, _, namespace_id, name, version, remaining_millislots, last_ping_ts, runner_id), - ) = <(usize, usize, Id, String, i32, i32, i64, Id)>::unpack(input, tuple_depth)?; - - let v = RunnerAllocIdxKey { - namespace_id, - name, - version: -version as u32, - remaining_millislots: -remaining_millislots as u32, - last_ping_ts, - runner_id, - }; - - Ok((input, v)) - } -} - -pub struct RunnerAllocIdxSubspaceKey { - pub namespace_id: Option, - pub name: Option, -} - -impl RunnerAllocIdxSubspaceKey { - pub fn new(namespace_id: Id, name: String) -> Self { - RunnerAllocIdxSubspaceKey { - namespace_id: Some(namespace_id), - name: Some(name), - } - } - - pub fn entire() -> Self { - RunnerAllocIdxSubspaceKey { - namespace_id: None, - name: None, - } - } -} - -impl TuplePack for RunnerAllocIdxSubspaceKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let mut offset = VersionstampOffset::None { size: 0 }; - - let t = (DATACENTER, RUNNER_ALLOC_IDX); - offset += t.pack(w, tuple_depth)?; - - if let Some(namespace_id) = &self.namespace_id { - offset += namespace_id.pack(w, tuple_depth)?; - - if let Some(name) = &self.name { - offset += name.pack(w, tuple_depth)?; - } - } - - Ok(offset) - } -} - -#[derive(Debug)] -pub struct PendingActorByRunnerNameSelectorKey { - pub namespace_id: Id, - pub runner_name_selector: String, - pub ts: i64, - pub actor_id: Id, -} - -impl PendingActorByRunnerNameSelectorKey { - pub fn new(namespace_id: Id, runner_name_selector: String, ts: i64, actor_id: Id) -> Self { - PendingActorByRunnerNameSelectorKey { - namespace_id, - runner_name_selector, - ts, - actor_id, - } - } - - pub fn subspace( - namespace_id: Id, - runner_name_selector: String, - ) -> PendingActorByRunnerNameSelectorSubspaceKey { - PendingActorByRunnerNameSelectorSubspaceKey::new(namespace_id, runner_name_selector) - } -} - -impl FormalKey for PendingActorByRunnerNameSelectorKey { - /// Generation. - type Value = u32; - - fn deserialize(&self, raw: &[u8]) -> Result { - Ok(u32::from_be_bytes(raw.try_into()?)) - } - - fn serialize(&self, value: Self::Value) -> Result> { - Ok(value.to_be_bytes().to_vec()) - } -} - -impl TuplePack for PendingActorByRunnerNameSelectorKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let t = ( - DATACENTER, - PENDING_ACTOR_BY_RUNNER_NAME_SELECTOR, - self.namespace_id, - &self.runner_name_selector, - self.ts, - self.actor_id, - ); - t.pack(w, tuple_depth) - } -} - -impl<'de> TupleUnpack<'de> for PendingActorByRunnerNameSelectorKey { - fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (_, _, namespace_id, runner_name_selector, ts, actor_id)) = - <(usize, usize, Id, String, i64, Id)>::unpack(input, tuple_depth)?; - - let v = PendingActorByRunnerNameSelectorKey { - namespace_id, - runner_name_selector, - ts, - actor_id, - }; - - Ok((input, v)) - } -} - -pub struct PendingActorByRunnerNameSelectorSubspaceKey { - pub namespace_id: Id, - pub runner_name_selector: String, -} - -impl PendingActorByRunnerNameSelectorSubspaceKey { - pub fn new(namespace_id: Id, runner_name_selector: String) -> Self { - PendingActorByRunnerNameSelectorSubspaceKey { - namespace_id, - runner_name_selector, - } - } -} - -impl TuplePack for PendingActorByRunnerNameSelectorSubspaceKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let t = ( - DATACENTER, - PENDING_ACTOR_BY_RUNNER_NAME_SELECTOR, - self.namespace_id, - &self.runner_name_selector, - ); - t.pack(w, tuple_depth) - } -} diff --git a/packages/services/pegboard/src/keys/mod.rs b/packages/services/pegboard/src/keys/mod.rs index 9e93b8983a..402214f8a0 100644 --- a/packages/services/pegboard/src/keys/mod.rs +++ b/packages/services/pegboard/src/keys/mod.rs @@ -1,7 +1,6 @@ use udb_util::prelude::*; pub mod actor; -pub mod datacenter; pub mod epoxy; pub mod ns; pub mod runner; diff --git a/packages/services/pegboard/src/keys/ns.rs b/packages/services/pegboard/src/keys/ns.rs index dd1aea42a9..236c61bc9c 100644 --- a/packages/services/pegboard/src/keys/ns.rs +++ b/packages/services/pegboard/src/keys/ns.rs @@ -5,6 +5,249 @@ use gas::prelude::*; use udb_util::prelude::*; use versioned_data_util::OwnedVersionedData; +#[derive(Debug)] +pub struct RunnerAllocIdxKey { + pub namespace_id: Id, + pub name: String, + pub version: u32, + pub remaining_millislots: u32, + pub last_ping_ts: i64, + pub runner_id: Id, +} + +impl RunnerAllocIdxKey { + pub fn new( + namespace_id: Id, + name: String, + version: u32, + remaining_millislots: u32, + last_ping_ts: i64, + runner_id: Id, + ) -> Self { + RunnerAllocIdxKey { + namespace_id, + name, + version, + remaining_millislots, + last_ping_ts, + runner_id, + } + } + + pub fn subspace(namespace_id: Id, name: String) -> RunnerAllocIdxSubspaceKey { + RunnerAllocIdxSubspaceKey::new(namespace_id, name) + } + + pub fn entire_subspace() -> RunnerAllocIdxSubspaceKey { + RunnerAllocIdxSubspaceKey::entire() + } +} + +impl FormalKey for RunnerAllocIdxKey { + type Value = rivet_data::converted::RunnerAllocIdxKeyData; + + fn deserialize(&self, raw: &[u8]) -> Result { + rivet_data::versioned::RunnerAllocIdxKeyData::deserialize_with_embedded_version(raw)? + .try_into() + } + + fn serialize(&self, value: Self::Value) -> Result> { + rivet_data::versioned::RunnerAllocIdxKeyData::latest(value.try_into()?) + .serialize_with_embedded_version( + rivet_data::PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION, + ) + } +} + +impl TuplePack for RunnerAllocIdxKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = ( + NAMESPACE, + RUNNER_ALLOC_IDX, + self.namespace_id, + &self.name, + // Stored in reverse order (higher versions are first) + -(self.version as i32), + // Stored in reverse order (higher remaining slots are first) + -(self.remaining_millislots as i32), + self.last_ping_ts, + self.runner_id, + ); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for RunnerAllocIdxKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let ( + input, + (_, _, namespace_id, name, version, remaining_millislots, last_ping_ts, runner_id), + ) = <(usize, usize, Id, String, i32, i32, i64, Id)>::unpack(input, tuple_depth)?; + + let v = RunnerAllocIdxKey { + namespace_id, + name, + version: -version as u32, + remaining_millislots: -remaining_millislots as u32, + last_ping_ts, + runner_id, + }; + + Ok((input, v)) + } +} + +pub struct RunnerAllocIdxSubspaceKey { + pub namespace_id: Option, + pub name: Option, +} + +impl RunnerAllocIdxSubspaceKey { + pub fn new(namespace_id: Id, name: String) -> Self { + RunnerAllocIdxSubspaceKey { + namespace_id: Some(namespace_id), + name: Some(name), + } + } + + pub fn entire() -> Self { + RunnerAllocIdxSubspaceKey { + namespace_id: None, + name: None, + } + } +} + +impl TuplePack for RunnerAllocIdxSubspaceKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let mut offset = VersionstampOffset::None { size: 0 }; + + let t = (NAMESPACE, RUNNER_ALLOC_IDX); + offset += t.pack(w, tuple_depth)?; + + if let Some(namespace_id) = &self.namespace_id { + offset += namespace_id.pack(w, tuple_depth)?; + + if let Some(name) = &self.name { + offset += name.pack(w, tuple_depth)?; + } + } + + Ok(offset) + } +} + +#[derive(Debug)] +pub struct PendingActorByRunnerNameSelectorKey { + pub namespace_id: Id, + pub runner_name_selector: String, + pub ts: i64, + pub actor_id: Id, +} + +impl PendingActorByRunnerNameSelectorKey { + pub fn new(namespace_id: Id, runner_name_selector: String, ts: i64, actor_id: Id) -> Self { + PendingActorByRunnerNameSelectorKey { + namespace_id, + runner_name_selector, + ts, + actor_id, + } + } + + pub fn subspace( + namespace_id: Id, + runner_name_selector: String, + ) -> PendingActorByRunnerNameSelectorSubspaceKey { + PendingActorByRunnerNameSelectorSubspaceKey::new(namespace_id, runner_name_selector) + } +} + +impl FormalKey for PendingActorByRunnerNameSelectorKey { + /// Generation. + type Value = u32; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok(u32::from_be_bytes(raw.try_into()?)) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.to_be_bytes().to_vec()) + } +} + +impl TuplePack for PendingActorByRunnerNameSelectorKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = ( + NAMESPACE, + PENDING_ACTOR_BY_RUNNER_NAME_SELECTOR, + self.namespace_id, + &self.runner_name_selector, + self.ts, + self.actor_id, + ); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for PendingActorByRunnerNameSelectorKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _, namespace_id, runner_name_selector, ts, actor_id)) = + <(usize, usize, Id, String, i64, Id)>::unpack(input, tuple_depth)?; + + let v = PendingActorByRunnerNameSelectorKey { + namespace_id, + runner_name_selector, + ts, + actor_id, + }; + + Ok((input, v)) + } +} + +pub struct PendingActorByRunnerNameSelectorSubspaceKey { + pub namespace_id: Id, + pub runner_name_selector: String, +} + +impl PendingActorByRunnerNameSelectorSubspaceKey { + pub fn new(namespace_id: Id, runner_name_selector: String) -> Self { + PendingActorByRunnerNameSelectorSubspaceKey { + namespace_id, + runner_name_selector, + } + } +} + +impl TuplePack for PendingActorByRunnerNameSelectorSubspaceKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = ( + NAMESPACE, + PENDING_ACTOR_BY_RUNNER_NAME_SELECTOR, + self.namespace_id, + &self.runner_name_selector, + ); + t.pack(w, tuple_depth) + } +} + #[derive(Debug)] pub struct ActiveActorKey { namespace_id: Id, @@ -320,18 +563,15 @@ impl ActorByKeyKey { } impl FormalKey for ActorByKeyKey { - type Value = rivet_key_data::converted::ActorByKeyKeyData; + type Value = rivet_data::converted::ActorByKeyKeyData; fn deserialize(&self, raw: &[u8]) -> Result { - rivet_key_data::versioned::ActorByKeyKeyData::deserialize_with_embedded_version(raw)? - .try_into() + rivet_data::versioned::ActorByKeyKeyData::deserialize_with_embedded_version(raw)?.try_into() } fn serialize(&self, value: Self::Value) -> Result> { - rivet_key_data::versioned::ActorByKeyKeyData::latest(value.try_into()?) - .serialize_with_embedded_version( - rivet_key_data::PEGBOARD_NAMESPACE_ACTOR_BY_KEY_VERSION, - ) + rivet_data::versioned::ActorByKeyKeyData::latest(value.try_into()?) + .serialize_with_embedded_version(rivet_data::PEGBOARD_NAMESPACE_ACTOR_BY_KEY_VERSION) } } @@ -938,18 +1178,16 @@ impl RunnerByKeyKey { } impl FormalKey for RunnerByKeyKey { - type Value = rivet_key_data::converted::RunnerByKeyKeyData; + type Value = rivet_data::converted::RunnerByKeyKeyData; fn deserialize(&self, raw: &[u8]) -> Result { - rivet_key_data::versioned::RunnerByKeyKeyData::deserialize_with_embedded_version(raw)? + rivet_data::versioned::RunnerByKeyKeyData::deserialize_with_embedded_version(raw)? .try_into() } fn serialize(&self, value: Self::Value) -> Result> { - rivet_key_data::versioned::RunnerByKeyKeyData::latest(value.try_into()?) - .serialize_with_embedded_version( - rivet_key_data::PEGBOARD_NAMESPACE_RUNNER_BY_KEY_VERSION, - ) + rivet_data::versioned::RunnerByKeyKeyData::latest(value.try_into()?) + .serialize_with_embedded_version(rivet_data::PEGBOARD_NAMESPACE_RUNNER_BY_KEY_VERSION) } } @@ -1002,16 +1240,15 @@ impl ActorNameKey { } impl FormalKey for ActorNameKey { - type Value = rivet_key_data::converted::ActorNameKeyData; + type Value = rivet_data::converted::ActorNameKeyData; fn deserialize(&self, raw: &[u8]) -> Result { - rivet_key_data::versioned::ActorNameKeyData::deserialize_with_embedded_version(raw)? - .try_into() + rivet_data::versioned::ActorNameKeyData::deserialize_with_embedded_version(raw)?.try_into() } fn serialize(&self, value: Self::Value) -> Result> { - rivet_key_data::versioned::ActorNameKeyData::latest(value.try_into()?) - .serialize_with_embedded_version(rivet_key_data::PEGBOARD_NAMESPACE_ACTOR_NAME_VERSION) + rivet_data::versioned::ActorNameKeyData::latest(value.try_into()?) + .serialize_with_embedded_version(rivet_data::PEGBOARD_NAMESPACE_ACTOR_NAME_VERSION) } } @@ -1128,3 +1365,87 @@ impl TuplePack for RunnerNameSubspaceKey { t.pack(w, tuple_depth) } } + +#[derive(Debug)] +pub struct OutboundDesiredSlotsKey { + pub namespace_id: Id, + pub runner_name_selector: String, +} + +impl OutboundDesiredSlotsKey { + pub fn new(namespace_id: Id, runner_name_selector: String) -> Self { + OutboundDesiredSlotsKey { + namespace_id, + runner_name_selector, + } + } + + pub fn subspace() -> OutboundDesiredSlotsSubspaceKey { + OutboundDesiredSlotsSubspaceKey::new() + } +} + +impl FormalKey for OutboundDesiredSlotsKey { + /// Count. + type Value = u32; + + fn deserialize(&self, raw: &[u8]) -> Result { + // NOTE: Atomic ops use little endian + Ok(u32::from_le_bytes(raw.try_into()?)) + } + + fn serialize(&self, value: Self::Value) -> Result> { + // NOTE: Atomic ops use little endian + Ok(value.to_le_bytes().to_vec()) + } +} + +impl TuplePack for OutboundDesiredSlotsKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = ( + NAMESPACE, + OUTBOUND, + DESIRED_SLOTS, + self.namespace_id, + &self.runner_name_selector, + ); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for OutboundDesiredSlotsKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _, namespace_id, runner_name_selector)) = + <(usize, usize, Id, String)>::unpack(input, tuple_depth)?; + + let v = OutboundDesiredSlotsKey { + namespace_id, + runner_name_selector, + }; + + Ok((input, v)) + } +} + +pub struct OutboundDesiredSlotsSubspaceKey {} + +impl OutboundDesiredSlotsSubspaceKey { + pub fn new() -> Self { + OutboundDesiredSlotsSubspaceKey {} + } +} + +impl TuplePack for OutboundDesiredSlotsSubspaceKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = (NAMESPACE, OUTBOUND, DESIRED_SLOTS); + t.pack(w, tuple_depth) + } +} diff --git a/packages/services/pegboard/src/keys/runner.rs b/packages/services/pegboard/src/keys/runner.rs index 9e83aac404..528ba2f6b4 100644 --- a/packages/services/pegboard/src/keys/runner.rs +++ b/packages/services/pegboard/src/keys/runner.rs @@ -524,15 +524,15 @@ impl AddressKey { } impl FormalKey for AddressKey { - type Value = ::Latest; + type Value = ::Latest; fn deserialize(&self, raw: &[u8]) -> Result { - rivet_key_data::versioned::AddressKeyData::deserialize_with_embedded_version(raw) + rivet_data::versioned::AddressKeyData::deserialize_with_embedded_version(raw) } fn serialize(&self, value: Self::Value) -> Result> { - rivet_key_data::versioned::AddressKeyData::latest(value) - .serialize_with_embedded_version(rivet_key_data::PEGBOARD_RUNNER_ADDRESS_VERSION) + rivet_data::versioned::AddressKeyData::latest(value) + .serialize_with_embedded_version(rivet_data::PEGBOARD_RUNNER_ADDRESS_VERSION) } } @@ -816,7 +816,7 @@ impl MetadataKey { impl FormalChunkedKey for MetadataKey { type ChunkKey = MetadataChunkKey; - type Value = rivet_key_data::converted::MetadataKeyData; + type Value = rivet_data::converted::MetadataKeyData; fn chunk(&self, chunk: usize) -> Self::ChunkKey { MetadataChunkKey { @@ -826,7 +826,7 @@ impl FormalChunkedKey for MetadataKey { } fn combine(&self, chunks: Vec) -> Result { - rivet_key_data::versioned::MetadataKeyData::deserialize_with_embedded_version( + rivet_data::versioned::MetadataKeyData::deserialize_with_embedded_version( &chunks .iter() .map(|x| x.value().iter().map(|x| *x)) @@ -838,8 +838,8 @@ impl FormalChunkedKey for MetadataKey { fn split(&self, value: Self::Value) -> Result>> { Ok( - rivet_key_data::versioned::MetadataKeyData::latest(value.try_into()?) - .serialize_with_embedded_version(rivet_key_data::PEGBOARD_RUNNER_METADATA_VERSION)? + rivet_data::versioned::MetadataKeyData::latest(value.try_into()?) + .serialize_with_embedded_version(rivet_data::PEGBOARD_RUNNER_METADATA_VERSION)? .chunks(udb_util::CHUNK_SIZE) .map(|x| x.to_vec()) .collect(), diff --git a/packages/services/pegboard/src/lib.rs b/packages/services/pegboard/src/lib.rs index 8a08a5b9a9..b5dd33dd0a 100644 --- a/packages/services/pegboard/src/lib.rs +++ b/packages/services/pegboard/src/lib.rs @@ -2,6 +2,7 @@ use gas::prelude::*; pub mod errors; pub mod keys; +pub mod messages; mod metrics; pub mod ops; pub mod pubsub_subjects; diff --git a/packages/services/pegboard/src/messages.rs b/packages/services/pegboard/src/messages.rs new file mode 100644 index 0000000000..e3ad78680d --- /dev/null +++ b/packages/services/pegboard/src/messages.rs @@ -0,0 +1,4 @@ +use gas::prelude::*; + +#[message("pegboard_bump_outbound_autoscaler")] +pub struct BumpOutboundAutoscaler {} diff --git a/packages/services/pegboard/src/ops/actor/create.rs b/packages/services/pegboard/src/ops/actor/create.rs index 5dd72c7c75..66a7a1b42c 100644 --- a/packages/services/pegboard/src/ops/actor/create.rs +++ b/packages/services/pegboard/src/ops/actor/create.rs @@ -125,8 +125,12 @@ async fn forward_to_datacenter( // Get namespace name for the remote call let namespace = ctx - .op(namespace::ops::get_global::Input { namespace_id }) + .op(namespace::ops::get_global::Input { + namespace_ids: vec![namespace_id], + }) .await? + .into_iter() + .next() .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; // Generate a new actor ID with the correct datacenter label diff --git a/packages/services/pegboard/src/ops/actor/get_for_key.rs b/packages/services/pegboard/src/ops/actor/get_for_key.rs index d6960e5689..f850f88c69 100644 --- a/packages/services/pegboard/src/ops/actor/get_for_key.rs +++ b/packages/services/pegboard/src/ops/actor/get_for_key.rs @@ -60,9 +60,11 @@ pub async fn pegboard_actor_get_for_key(ctx: &OperationCtx, input: &Input) -> Re // Get namespace name for the remote call let namespace = ctx .op(namespace::ops::get_global::Input { - namespace_id: input.namespace_id, + namespace_ids: vec![input.namespace_id], }) .await? + .into_iter() + .next() .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; // Make request to remote datacenter diff --git a/packages/services/pegboard/src/ops/actor/list_names.rs b/packages/services/pegboard/src/ops/actor/list_names.rs index 727a0bed6e..8fe72d2100 100644 --- a/packages/services/pegboard/src/ops/actor/list_names.rs +++ b/packages/services/pegboard/src/ops/actor/list_names.rs @@ -1,6 +1,6 @@ use futures_util::{StreamExt, TryStreamExt}; use gas::prelude::*; -use rivet_key_data::converted::ActorNameKeyData; +use rivet_data::converted::ActorNameKeyData; use udb_util::{SNAPSHOT, TxnExt}; use universaldb::{self as udb, options::StreamingMode}; diff --git a/packages/services/pegboard/src/ops/runner/get.rs b/packages/services/pegboard/src/ops/runner/get.rs index d967eccee3..22cbbcc6e4 100644 --- a/packages/services/pegboard/src/ops/runner/get.rs +++ b/packages/services/pegboard/src/ops/runner/get.rs @@ -1,7 +1,7 @@ use anyhow::Result; use futures_util::TryStreamExt; use gas::prelude::*; -use rivet_key_data::generated::pegboard_runner_address_v1::Data as AddressKeyData; +use rivet_data::generated::pegboard_runner_address_v1::Data as AddressKeyData; use rivet_types::runners::Runner; use udb_util::{FormalChunkedKey, SERIALIZABLE, SNAPSHOT, TxnExt}; use universaldb::{self as udb, options::StreamingMode}; diff --git a/packages/services/pegboard/src/ops/runner/update_alloc_idx.rs b/packages/services/pegboard/src/ops/runner/update_alloc_idx.rs index 7b94ce7f8f..39519590df 100644 --- a/packages/services/pegboard/src/ops/runner/update_alloc_idx.rs +++ b/packages/services/pegboard/src/ops/runner/update_alloc_idx.rs @@ -121,7 +121,7 @@ pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input) let remaining_millislots = (remaining_slots * 1000) / total_slots; - let old_alloc_key = keys::datacenter::RunnerAllocIdxKey::new( + let old_alloc_key = keys::ns::RunnerAllocIdxKey::new( namespace_id, name.clone(), version, @@ -140,7 +140,7 @@ pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input) Action::AddIdx => { txs.write( &old_alloc_key, - rivet_key_data::converted::RunnerAllocIdxKeyData { + rivet_data::converted::RunnerAllocIdxKeyData { workflow_id, remaining_slots, total_slots, @@ -162,7 +162,7 @@ pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input) txs.delete(&old_alloc_key); txs.write( - &keys::datacenter::RunnerAllocIdxKey::new( + &keys::ns::RunnerAllocIdxKey::new( namespace_id, name.clone(), version, @@ -170,7 +170,7 @@ pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input) last_ping_ts, runner.runner_id, ), - rivet_key_data::converted::RunnerAllocIdxKeyData { + rivet_data::converted::RunnerAllocIdxKeyData { workflow_id, remaining_slots, total_slots, diff --git a/packages/services/pegboard/src/workflows/actor/actor_keys.rs b/packages/services/pegboard/src/workflows/actor/actor_keys.rs index 1ff028160a..e5cc89a10c 100644 --- a/packages/services/pegboard/src/workflows/actor/actor_keys.rs +++ b/packages/services/pegboard/src/workflows/actor/actor_keys.rs @@ -4,7 +4,7 @@ use epoxy::{ }; use futures_util::TryStreamExt; use gas::prelude::*; -use rivet_key_data::converted::ActorByKeyKeyData; +use rivet_data::converted::ActorByKeyKeyData; use udb_util::prelude::*; use universaldb::{self as udb, FdbBindingError, options::StreamingMode}; diff --git a/packages/services/pegboard/src/workflows/actor/destroy.rs b/packages/services/pegboard/src/workflows/actor/destroy.rs index c408ef6500..44862d219d 100644 --- a/packages/services/pegboard/src/workflows/actor/destroy.rs +++ b/packages/services/pegboard/src/workflows/actor/destroy.rs @@ -1,8 +1,9 @@ use gas::prelude::*; -use rivet_key_data::converted::ActorByKeyKeyData; +use namespace::types::RunnerKind; +use rivet_data::converted::ActorByKeyKeyData; use rivet_runner_protocol::protocol; use udb_util::{SERIALIZABLE, TxnExt}; -use universaldb as udb; +use universaldb::{self as udb, options::MutationType}; use super::{DestroyComplete, DestroyStarted, State}; @@ -85,6 +86,7 @@ async fn update_state_and_fdb( state.namespace_id, &state.runner_name_selector, runner_id, + &state.ns_runner_kind, &tx, ) .await?; @@ -162,6 +164,7 @@ pub(crate) async fn clear_slot( namespace_id: Id, runner_name_selector: &str, runner_id: Id, + ns_runner_kind: &RunnerKind, tx: &udb::RetryableTransaction, ) -> Result<(), udb::FdbBindingError> { let txs = tx.subspace(keys::subspace()); @@ -198,7 +201,7 @@ pub(crate) async fn clear_slot( // Write new remaining slots txs.write(&runner_remaining_slots_key, new_runner_remaining_slots)?; - let old_runner_alloc_key = keys::datacenter::RunnerAllocIdxKey::new( + let old_runner_alloc_key = keys::ns::RunnerAllocIdxKey::new( namespace_id, runner_name_selector.to_string(), runner_version, @@ -213,7 +216,7 @@ pub(crate) async fn clear_slot( txs.delete(&old_runner_alloc_key); let new_remaining_millislots = (new_runner_remaining_slots * 1000) / runner_total_slots; - let new_runner_alloc_key = keys::datacenter::RunnerAllocIdxKey::new( + let new_runner_alloc_key = keys::ns::RunnerAllocIdxKey::new( namespace_id, runner_name_selector.to_string(), runner_version, @@ -224,7 +227,7 @@ pub(crate) async fn clear_slot( txs.write( &new_runner_alloc_key, - rivet_key_data::converted::RunnerAllocIdxKeyData { + rivet_data::converted::RunnerAllocIdxKeyData { workflow_id: runner_workflow_id, remaining_slots: new_runner_remaining_slots, total_slots: runner_total_slots, @@ -232,6 +235,14 @@ pub(crate) async fn clear_slot( )?; } + if let RunnerKind::Outbound { .. } = ns_runner_kind { + txs.atomic_op( + &keys::ns::OutboundDesiredSlotsKey::new(namespace_id, runner_name_selector.to_string()), + &(-1i32).to_le_bytes(), + MutationType::Add, + ); + } + Ok(()) } diff --git a/packages/services/pegboard/src/workflows/actor/mod.rs b/packages/services/pegboard/src/workflows/actor/mod.rs index a512312465..3bff9c38fb 100644 --- a/packages/services/pegboard/src/workflows/actor/mod.rs +++ b/packages/services/pegboard/src/workflows/actor/mod.rs @@ -1,5 +1,6 @@ use futures_util::FutureExt; use gas::prelude::*; +use namespace::types::RunnerKind; use rivet_runner_protocol::protocol; use rivet_types::actors::CrashPolicy; @@ -45,6 +46,9 @@ pub struct State { pub create_ts: i64, pub create_complete_ts: Option, + + pub ns_runner_kind: RunnerKind, + pub start_ts: Option, // NOTE: This is not the alarm ts, this is when the actor started sleeping. See `LifecycleState` for alarm pub sleep_ts: Option, @@ -66,6 +70,7 @@ impl State { runner_name_selector: String, crash_policy: CrashPolicy, create_ts: i64, + ns_runner_kind: RunnerKind, ) -> Self { State { name, @@ -78,6 +83,8 @@ impl State { create_ts, create_complete_ts: None, + ns_runner_kind, + start_ts: None, pending_allocation_ts: None, sleep_ts: None, @@ -115,15 +122,18 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }) .await?; - if let Err(error) = validation_res { - ctx.msg(Failed { error }) - .tag("actor_id", input.actor_id) - .send() - .await?; + let metadata = match validation_res { + Ok(metadata) => metadata, + Err(error) => { + ctx.msg(Failed { error }) + .tag("actor_id", input.actor_id) + .send() + .await?; - // TODO(RVT-3928): return Ok(Err); - return Ok(()); - } + // TODO(RVT-3928): return Ok(Err); + return Ok(()); + } + }; ctx.activity(setup::InitStateAndUdbInput { actor_id: input.actor_id, @@ -133,6 +143,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> runner_name_selector: input.runner_name_selector.clone(), crash_policy: input.crash_policy, create_ts: ctx.create_ts(), + ns_runner_kind: metadata.ns_runner_kind, }) .await?; @@ -156,6 +167,19 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> .tag("actor_id", input.actor_id) .send() .await?; + + // Destroyed early + ctx.workflow(destroy::Input { + namespace_id: input.namespace_id, + actor_id: input.actor_id, + name: input.name.clone(), + key: input.key.clone(), + generation: 0, + kill: false, + }) + .output() + .await?; + return Ok(()); } actor_keys::ReserveKeyOutput::KeyExists { existing_actor_id } => { @@ -170,8 +194,6 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> .await?; // Destroyed early - // - // This will also deallocate any key that was already allocated to Epoxy ctx.workflow(destroy::Input { namespace_id: input.namespace_id, actor_id: input.actor_id, @@ -335,7 +357,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> state.alarm_ts = None; state.sleeping = false; - if runtime::reschedule_actor(ctx, &input, state, true).await? { + if runtime::reschedule_actor(ctx, &input, state).await? { // Destroyed early return Ok(Loop::Break(runtime::LifecycleRes { generation: state.generation, @@ -466,7 +488,7 @@ async fn handle_stopped( .await?; } - if runtime::reschedule_actor(ctx, &input, state, false).await? { + if runtime::reschedule_actor(ctx, &input, state).await? { // Destroyed early return Ok(Some(runtime::LifecycleRes { generation: state.generation, diff --git a/packages/services/pegboard/src/workflows/actor/runtime.rs b/packages/services/pegboard/src/workflows/actor/runtime.rs index 6a6a93a14e..b1cb7887e8 100644 --- a/packages/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/services/pegboard/src/workflows/actor/runtime.rs @@ -3,18 +3,16 @@ use std::time::Instant; use futures_util::StreamExt; use futures_util::{FutureExt, TryStreamExt}; use gas::prelude::*; +use namespace::types::RunnerKind; use rivet_metrics::KeyValue; use rivet_runner_protocol::protocol; use udb_util::{FormalKey, SERIALIZABLE, SNAPSHOT, TxnExt}; use universaldb::{ self as udb, - options::{ConflictRangeType, StreamingMode}, + options::{ConflictRangeType, MutationType, StreamingMode}, }; -use crate::{ - keys, metrics, - workflows::runner::{AllocatePendingActorsInput, RUNNER_ELIGIBLE_THRESHOLD_MS}, -}; +use crate::{keys, metrics, workflows::runner::RUNNER_ELIGIBLE_THRESHOLD_MS}; use super::{ ACTOR_START_THRESHOLD_MS, Allocate, BASE_RETRY_TIMEOUT_MS, Destroy, Input, PendingAllocation, @@ -105,6 +103,7 @@ async fn allocate_actor( let start_instant = Instant::now(); let mut state = ctx.state::()?; let namespace_id = state.namespace_id; + let ns_runner_kind = &state.ns_runner_kind; // NOTE: This txn should closely resemble the one found in the allocate_pending_actors activity of the // client wf @@ -114,13 +113,24 @@ async fn allocate_actor( let ping_threshold_ts = util::timestamp::now() - RUNNER_ELIGIBLE_THRESHOLD_MS; let txs = tx.subspace(keys::subspace()); + // Increment desired slots if namespace has an outbound runner kind + if let RunnerKind::Outbound { .. } = ns_runner_kind { + txs.atomic_op( + &keys::ns::OutboundDesiredSlotsKey::new( + namespace_id, + input.runner_name_selector.clone(), + ), + &1u32.to_le_bytes(), + MutationType::Add, + ); + } + // Check if a queue exists - let pending_actor_subspace = txs.subspace( - &keys::datacenter::PendingActorByRunnerNameSelectorKey::subspace( + let pending_actor_subspace = + txs.subspace(&keys::ns::PendingActorByRunnerNameSelectorKey::subspace( namespace_id, input.runner_name_selector.clone(), - ), - ); + )); let queue_exists = txs .get_ranges_keyvalues( udb::RangeOption { @@ -137,11 +147,10 @@ async fn allocate_actor( .is_some(); if !queue_exists { - let runner_alloc_subspace = - txs.subspace(&keys::datacenter::RunnerAllocIdxKey::subspace( - namespace_id, - input.runner_name_selector.clone(), - )); + let runner_alloc_subspace = txs.subspace(&keys::ns::RunnerAllocIdxKey::subspace( + namespace_id, + input.runner_name_selector.clone(), + )); let mut stream = txs.get_ranges_keyvalues( udb::RangeOption { @@ -161,7 +170,7 @@ async fn allocate_actor( }; let (old_runner_alloc_key, old_runner_alloc_key_data) = - txs.read_entry::(&entry)?; + txs.read_entry::(&entry)?; if let Some(highest_version) = highest_version { // We have passed all of the runners with the highest version. This is reachable if @@ -196,7 +205,7 @@ async fn allocate_actor( // Write new allocation key with 1 less slot txs.write( - &keys::datacenter::RunnerAllocIdxKey::new( + &keys::ns::RunnerAllocIdxKey::new( namespace_id, input.runner_name_selector.clone(), old_runner_alloc_key.version, @@ -204,7 +213,7 @@ async fn allocate_actor( old_runner_alloc_key.last_ping_ts, old_runner_alloc_key.runner_id, ), - rivet_key_data::converted::RunnerAllocIdxKeyData { + rivet_data::converted::RunnerAllocIdxKeyData { workflow_id: old_runner_alloc_key_data.workflow_id, remaining_slots: new_remaining_slots, total_slots: old_runner_alloc_key_data.total_slots, @@ -250,7 +259,7 @@ async fn allocate_actor( // want. If a runner reads from the queue while this is being inserted, one of the two txns will // retry and we ensure the actor does not end up in queue limbo. txs.write( - &keys::datacenter::PendingActorByRunnerNameSelectorKey::new( + &keys::ns::PendingActorByRunnerNameSelectorKey::new( namespace_id, input.runner_name_selector.clone(), pending_ts, @@ -299,7 +308,7 @@ pub async fn set_not_connectable(ctx: &ActivityCtx, input: &SetNotConnectableInp Ok(()) }) - .custom_instrument(tracing::info_span!("actor_deallocate_tx")) + .custom_instrument(tracing::info_span!("actor_set_not_connectable_tx")) .await?; state.connectable_ts = None; @@ -318,11 +327,13 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<() let runner_name_selector = &state.runner_name_selector; let namespace_id = state.namespace_id; let runner_id = state.runner_id; + let ns_runner_kind = &state.ns_runner_kind; ctx.udb()? .run(|tx, _mc| async move { - let connectable_key = keys::actor::ConnectableKey::new(input.actor_id); - tx.clear(&keys::subspace().pack(&connectable_key)); + let txs = tx.subspace(keys::subspace()); + + txs.delete(&keys::actor::ConnectableKey::new(input.actor_id)); if let Some(runner_id) = runner_id { destroy::clear_slot( @@ -330,9 +341,19 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<() namespace_id, runner_name_selector, runner_id, + ns_runner_kind, &tx, ) .await?; + } else if let RunnerKind::Outbound { .. } = ns_runner_kind { + txs.atomic_op( + &keys::ns::OutboundDesiredSlotsKey::new( + namespace_id, + runner_name_selector.clone(), + ), + &(-1i32).to_le_bytes(), + MutationType::Add, + ); } Ok(()) @@ -370,6 +391,10 @@ pub async fn spawn_actor( "failed to allocate (no availability), waiting for allocation", ); + ctx.msg(crate::messages::BumpOutboundAutoscaler {}) + .send() + .await?; + // If allocation fails, the allocate txn already inserted this actor into the queue. Now we wait for // an `Allocate` signal match ctx.listen::().await? { @@ -441,35 +466,9 @@ pub async fn reschedule_actor( ctx: &mut WorkflowCtx, input: &Input, state: &mut LifecycleState, - sleeping: bool, ) -> Result { tracing::debug!(actor_id=?input.actor_id, "rescheduling actor"); - // There shouldn't be an allocation if the actor is sleeping - if !sleeping { - ctx.activity(DeallocateInput { - actor_id: input.actor_id, - }) - .await?; - - // Allocate other pending actors from queue - let res = ctx - .activity(AllocatePendingActorsInput { - namespace_id: input.namespace_id, - name: input.runner_name_selector.clone(), - }) - .await?; - - // Dispatch pending allocs - for alloc in res.allocations { - ctx.signal(alloc.signal) - .to_workflow::() - .tag("actor_id", alloc.actor_id) - .send() - .await?; - } - } - let next_generation = state.generation + 1; // Waits for the actor to be ready (or destroyed) and automatically retries if failed to allocate. @@ -563,7 +562,7 @@ pub async fn clear_pending_allocation( .udb()? .run(|tx, _mc| async move { let pending_alloc_key = - keys::subspace().pack(&keys::datacenter::PendingActorByRunnerNameSelectorKey::new( + keys::subspace().pack(&keys::ns::PendingActorByRunnerNameSelectorKey::new( input.namespace_id, input.runner_name_selector.clone(), input.pending_allocation_ts, diff --git a/packages/services/pegboard/src/workflows/actor/setup.rs b/packages/services/pegboard/src/workflows/actor/setup.rs index 9d55ff596f..421b7228fc 100644 --- a/packages/services/pegboard/src/workflows/actor/setup.rs +++ b/packages/services/pegboard/src/workflows/actor/setup.rs @@ -1,5 +1,6 @@ use gas::prelude::*; -use rivet_key_data::converted::ActorNameKeyData; +use namespace::types::RunnerKind; +use rivet_data::converted::ActorNameKeyData; use rivet_types::actors::CrashPolicy; use udb_util::{SERIALIZABLE, TxnExt}; @@ -17,20 +18,25 @@ pub struct ValidateInput { pub input: Option, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ValidateOutput { + pub ns_runner_kind: RunnerKind, +} + #[activity(Validate)] pub async fn validate( ctx: &ActivityCtx, input: &ValidateInput, -) -> Result> { +) -> Result> { let ns_res = ctx .op(namespace::ops::get_global::Input { - namespace_id: input.namespace_id, + namespace_ids: vec![input.namespace_id], }) .await?; - if ns_res.is_none() { + let Some(ns) = ns_res.into_iter().next() else { return Ok(Err(errors::Actor::NamespaceNotFound)); - } + }; if input .input @@ -55,7 +61,9 @@ pub async fn validate( } } - Ok(Ok(())) + Ok(Ok(ValidateOutput { + ns_runner_kind: ns.runner_kind, + })) } #[derive(Debug, Clone, Serialize, Deserialize, Hash)] @@ -67,6 +75,7 @@ pub struct InitStateAndUdbInput { pub runner_name_selector: String, pub crash_policy: CrashPolicy, pub create_ts: i64, + pub ns_runner_kind: RunnerKind, } #[activity(InitStateAndFdb)] @@ -80,6 +89,7 @@ pub async fn insert_state_and_fdb(ctx: &ActivityCtx, input: &InitStateAndUdbInpu input.runner_name_selector.clone(), input.crash_policy, input.create_ts, + input.ns_runner_kind.clone(), )); ctx.udb()? diff --git a/packages/services/pegboard/src/workflows/runner.rs b/packages/services/pegboard/src/workflows/runner.rs index ff7b5a64cc..e5d64f17d6 100644 --- a/packages/services/pegboard/src/workflows/runner.rs +++ b/packages/services/pegboard/src/workflows/runner.rs @@ -1,6 +1,6 @@ use futures_util::{FutureExt, StreamExt, TryStreamExt}; use gas::prelude::*; -use rivet_key_data::{ +use rivet_data::{ converted::{ActorNameKeyData, MetadataKeyData, RunnerByKeyKeyData}, generated::pegboard_runner_address_v1::Data as AddressKeyData, }; @@ -639,7 +639,7 @@ async fn insert_fdb(ctx: &ActivityCtx, input: &InsertFdbInput) -> Result<()> { // Insert into index (same as the `update_alloc_idx` op with `AddIdx`) txs.write( - &keys::datacenter::RunnerAllocIdxKey::new( + &keys::ns::RunnerAllocIdxKey::new( input.namespace_id, input.name.clone(), input.version, @@ -647,7 +647,7 @@ async fn insert_fdb(ctx: &ActivityCtx, input: &InsertFdbInput) -> Result<()> { last_ping_ts, input.runner_id, ), - rivet_key_data::converted::RunnerAllocIdxKeyData { + rivet_data::converted::RunnerAllocIdxKeyData { workflow_id: ctx.workflow_id(), remaining_slots, total_slots: input.total_slots, @@ -998,12 +998,11 @@ pub(crate) async fn allocate_pending_actors( let txs = tx.subspace(keys::subspace()); let mut results = Vec::new(); - let pending_actor_subspace = txs.subspace( - &keys::datacenter::PendingActorByRunnerNameSelectorKey::subspace( + let pending_actor_subspace = + txs.subspace(&keys::ns::PendingActorByRunnerNameSelectorKey::subspace( input.namespace_id, input.name.clone(), - ), - ); + )); let mut queue_stream = txs.get_ranges_keyvalues( udb::RangeOption { mode: StreamingMode::Iterator, @@ -1021,15 +1020,12 @@ pub(crate) async fn allocate_pending_actors( }; let (queue_key, generation) = - txs.read_entry::( - &queue_entry, - )?; + txs.read_entry::(&queue_entry)?; - let runner_alloc_subspace = - txs.subspace(&keys::datacenter::RunnerAllocIdxKey::subspace( - input.namespace_id, - input.name.clone(), - )); + let runner_alloc_subspace = txs.subspace(&keys::ns::RunnerAllocIdxKey::subspace( + input.namespace_id, + input.name.clone(), + )); let mut stream = txs.get_ranges_keyvalues( udb::RangeOption { @@ -1051,7 +1047,7 @@ pub(crate) async fn allocate_pending_actors( }; let (old_runner_alloc_key, old_runner_alloc_key_data) = - txs.read_entry::(&entry)?; + txs.read_entry::(&entry)?; if let Some(highest_version) = highest_version { // We have passed all of the runners with the highest version. This is reachable if @@ -1088,7 +1084,7 @@ pub(crate) async fn allocate_pending_actors( // Write new allocation key with 1 less slot txs.write( - &keys::datacenter::RunnerAllocIdxKey::new( + &keys::ns::RunnerAllocIdxKey::new( input.namespace_id, input.name.clone(), old_runner_alloc_key.version, @@ -1096,7 +1092,7 @@ pub(crate) async fn allocate_pending_actors( old_runner_alloc_key.last_ping_ts, old_runner_alloc_key.runner_id, ), - rivet_key_data::converted::RunnerAllocIdxKeyData { + rivet_data::converted::RunnerAllocIdxKeyData { workflow_id: old_runner_alloc_key_data.workflow_id, remaining_slots: new_remaining_slots, total_slots: old_runner_alloc_key_data.total_slots, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 91185e953a..d2358c0f78 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -741,6 +741,9 @@ importers: '@rivetkit/engine-runner-protocol': specifier: workspace:* version: link:../runner-protocol + hono: + specifier: ^4.0.0 + version: 4.8.12 ws: specifier: ^8.18.3 version: 8.18.3 diff --git a/sdks/rust/key-data/Cargo.toml b/sdks/rust/data/Cargo.toml similarity index 95% rename from sdks/rust/key-data/Cargo.toml rename to sdks/rust/data/Cargo.toml index 339d60c362..3282e73918 100644 --- a/sdks/rust/key-data/Cargo.toml +++ b/sdks/rust/data/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "rivet-key-data" +name = "rivet-data" version.workspace = true authors.workspace = true license.workspace = true diff --git a/sdks/rust/key-data/build.rs b/sdks/rust/data/build.rs similarity index 99% rename from sdks/rust/key-data/build.rs rename to sdks/rust/data/build.rs index 898e7ea645..e6b18ab845 100644 --- a/sdks/rust/key-data/build.rs +++ b/sdks/rust/data/build.rs @@ -61,7 +61,7 @@ fn main() { .and_then(|p| p.parent()) .expect("Failed to find workspace root"); - let schema_dir = workspace_root.join("sdks").join("schemas").join("key-data"); + let schema_dir = workspace_root.join("sdks").join("schemas").join("data"); println!("cargo:rerun-if-changed={}", schema_dir.display()); diff --git a/sdks/rust/key-data/src/converted.rs b/sdks/rust/data/src/converted.rs similarity index 94% rename from sdks/rust/key-data/src/converted.rs rename to sdks/rust/data/src/converted.rs index 44e954bc84..859682e546 100644 --- a/sdks/rust/key-data/src/converted.rs +++ b/sdks/rust/data/src/converted.rs @@ -9,10 +9,10 @@ pub struct RunnerAllocIdxKeyData { pub total_slots: u32, } -impl TryFrom for RunnerAllocIdxKeyData { +impl TryFrom for RunnerAllocIdxKeyData { type Error = anyhow::Error; - fn try_from(value: pegboard_datacenter_runner_alloc_idx_v1::Data) -> Result { + fn try_from(value: pegboard_namespace_runner_alloc_idx_v1::Data) -> Result { Ok(RunnerAllocIdxKeyData { workflow_id: Id::from_slice(&value.workflow_id)?, remaining_slots: value.remaining_slots, @@ -21,11 +21,11 @@ impl TryFrom for RunnerAllocIdxKe } } -impl TryFrom for pegboard_datacenter_runner_alloc_idx_v1::Data { +impl TryFrom for pegboard_namespace_runner_alloc_idx_v1::Data { type Error = anyhow::Error; fn try_from(value: RunnerAllocIdxKeyData) -> Result { - Ok(pegboard_datacenter_runner_alloc_idx_v1::Data { + Ok(pegboard_namespace_runner_alloc_idx_v1::Data { workflow_id: value.workflow_id.as_bytes(), remaining_slots: value.remaining_slots, total_slots: value.total_slots, diff --git a/sdks/rust/key-data/src/generated.rs b/sdks/rust/data/src/generated.rs similarity index 100% rename from sdks/rust/key-data/src/generated.rs rename to sdks/rust/data/src/generated.rs diff --git a/sdks/rust/key-data/src/lib.rs b/sdks/rust/data/src/lib.rs similarity index 84% rename from sdks/rust/key-data/src/lib.rs rename to sdks/rust/data/src/lib.rs index 3292e22e83..a93ceaea1d 100644 --- a/sdks/rust/key-data/src/lib.rs +++ b/sdks/rust/data/src/lib.rs @@ -2,9 +2,9 @@ pub mod converted; pub mod generated; pub mod versioned; -pub const PEGBOARD_DATACENTER_RUNNER_ALLOC_IDX_VERSION: u16 = 1; pub const PEGBOARD_RUNNER_ADDRESS_VERSION: u16 = 1; pub const PEGBOARD_RUNNER_METADATA_VERSION: u16 = 1; pub const PEGBOARD_NAMESPACE_ACTOR_BY_KEY_VERSION: u16 = 1; +pub const PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION: u16 = 1; pub const PEGBOARD_NAMESPACE_RUNNER_BY_KEY_VERSION: u16 = 1; pub const PEGBOARD_NAMESPACE_ACTOR_NAME_VERSION: u16 = 1; diff --git a/sdks/rust/key-data/src/versioned.rs b/sdks/rust/data/src/versioned.rs similarity index 83% rename from sdks/rust/key-data/src/versioned.rs rename to sdks/rust/data/src/versioned.rs index af709553f3..002363d361 100644 --- a/sdks/rust/key-data/src/versioned.rs +++ b/sdks/rust/data/src/versioned.rs @@ -4,13 +4,13 @@ use versioned_data_util::OwnedVersionedData; use crate::generated::*; pub enum RunnerAllocIdxKeyData { - V1(pegboard_datacenter_runner_alloc_idx_v1::Data), + V1(pegboard_namespace_runner_alloc_idx_v1::Data), } impl OwnedVersionedData for RunnerAllocIdxKeyData { - type Latest = pegboard_datacenter_runner_alloc_idx_v1::Data; + type Latest = pegboard_namespace_runner_alloc_idx_v1::Data; - fn latest(latest: pegboard_datacenter_runner_alloc_idx_v1::Data) -> Self { + fn latest(latest: pegboard_namespace_runner_alloc_idx_v1::Data) -> Self { RunnerAllocIdxKeyData::V1(latest) } @@ -206,3 +206,37 @@ impl OwnedVersionedData for ActorNameKeyData { } } } + +pub enum NamespaceRunnerKind { + V1(namespace_runner_kind_v1::Data), +} + +impl OwnedVersionedData for NamespaceRunnerKind { + type Latest = namespace_runner_kind_v1::Data; + + fn latest(latest: namespace_runner_kind_v1::Data) -> Self { + NamespaceRunnerKind::V1(latest) + } + + fn into_latest(self) -> Result { + #[allow(irrefutable_let_patterns)] + if let NamespaceRunnerKind::V1(data) = self { + Ok(data) + } else { + bail!("version not latest"); + } + } + + fn deserialize_version(payload: &[u8], version: u16) -> Result { + match version { + 1 => Ok(NamespaceRunnerKind::V1(serde_bare::from_slice(payload)?)), + _ => bail!("invalid version: {version}"), + } + } + + fn serialize_version(self, _version: u16) -> Result> { + match self { + NamespaceRunnerKind::V1(data) => serde_bare::to_vec(&data).map_err(Into::into), + } + } +} diff --git a/sdks/schemas/data/namespace.runner_kind.v1.bare b/sdks/schemas/data/namespace.runner_kind.v1.bare new file mode 100644 index 0000000000..1cc263b54e --- /dev/null +++ b/sdks/schemas/data/namespace.runner_kind.v1.bare @@ -0,0 +1,14 @@ +type Outbound struct { + url: str + slots_per_runner: u32 + min_runners: u32 + max_runners: u32 + runners_margin: u32 +} + +type Custom void + +type Data union { + Outbound | + Custom +} diff --git a/sdks/schemas/key-data/pegboard.namespace.actor_by_key.v1.bare b/sdks/schemas/data/pegboard.namespace.actor_by_key.v1.bare similarity index 100% rename from sdks/schemas/key-data/pegboard.namespace.actor_by_key.v1.bare rename to sdks/schemas/data/pegboard.namespace.actor_by_key.v1.bare diff --git a/sdks/schemas/key-data/pegboard.namespace.actor_name.v1.bare b/sdks/schemas/data/pegboard.namespace.actor_name.v1.bare similarity index 100% rename from sdks/schemas/key-data/pegboard.namespace.actor_name.v1.bare rename to sdks/schemas/data/pegboard.namespace.actor_name.v1.bare diff --git a/sdks/schemas/key-data/pegboard.datacenter.runner_alloc_idx.v1.bare b/sdks/schemas/data/pegboard.namespace.runner_alloc_idx.v1.bare similarity index 100% rename from sdks/schemas/key-data/pegboard.datacenter.runner_alloc_idx.v1.bare rename to sdks/schemas/data/pegboard.namespace.runner_alloc_idx.v1.bare diff --git a/sdks/schemas/key-data/pegboard.namespace.runner_by_key.v1.bare b/sdks/schemas/data/pegboard.namespace.runner_by_key.v1.bare similarity index 100% rename from sdks/schemas/key-data/pegboard.namespace.runner_by_key.v1.bare rename to sdks/schemas/data/pegboard.namespace.runner_by_key.v1.bare diff --git a/sdks/schemas/key-data/pegboard.runner.address.v1.bare b/sdks/schemas/data/pegboard.runner.address.v1.bare similarity index 100% rename from sdks/schemas/key-data/pegboard.runner.address.v1.bare rename to sdks/schemas/data/pegboard.runner.address.v1.bare diff --git a/sdks/schemas/key-data/pegboard.runner.metadata.v1.bare b/sdks/schemas/data/pegboard.runner.metadata.v1.bare similarity index 100% rename from sdks/schemas/key-data/pegboard.runner.metadata.v1.bare rename to sdks/schemas/data/pegboard.runner.metadata.v1.bare diff --git a/sdks/typescript/runner/src/mod.ts b/sdks/typescript/runner/src/mod.ts index 2623d63848..611f2bbcf0 100644 --- a/sdks/typescript/runner/src/mod.ts +++ b/sdks/typescript/runner/src/mod.ts @@ -35,6 +35,7 @@ export interface RunnerConfig { metadata?: Record; onConnected: () => void; onDisconnected: () => void; + onShutdown: () => void; fetch: (actorId: string, request: Request) => Promise; websocket?: (actorId: string, ws: any, request: Request) => Promise; onActorStart: ( @@ -362,9 +363,9 @@ export class Runner { //console.log("Tunnel shutdown completed"); } - if (exit) { - process.exit(0); - } + if (exit) process.exit(0); + + this.#config.onShutdown(); } // MARK: Networking diff --git a/sdks/typescript/test-runner/package.json b/sdks/typescript/test-runner/package.json index 4e75c1e4ae..8c637f818a 100644 --- a/sdks/typescript/test-runner/package.json +++ b/sdks/typescript/test-runner/package.json @@ -11,6 +11,7 @@ "@rivetkit/engine-runner": "workspace:*", "@hono/node-server": "^1.18.2", "@rivetkit/engine-runner-protocol": "workspace:*", + "hono": "^4.0.0", "ws": "^8.18.3" }, "devDependencies": { @@ -22,4 +23,4 @@ "typescript": "^5.3.3", "vitest": "^1.6.0" } -} +} \ No newline at end of file diff --git a/sdks/typescript/test-runner/src/main.ts b/sdks/typescript/test-runner/src/main.ts index 596dda5009..fbe681326d 100644 --- a/sdks/typescript/test-runner/src/main.ts +++ b/sdks/typescript/test-runner/src/main.ts @@ -2,6 +2,8 @@ import { Runner } from "@rivetkit/engine-runner"; import type { RunnerConfig, ActorConfig } from "@rivetkit/engine-runner"; import WebSocket from "ws"; import { serve } from "@hono/node-server"; +import { streamSSE } from "hono/streaming"; +import { Hono } from 'hono' const INTERNAL_SERVER_PORT = process.env.INTERNAL_SERVER_PORT ? Number(process.env.INTERNAL_SERVER_PORT) @@ -16,120 +18,150 @@ const RIVET_RUNNER_TOTAL_SLOTS = process.env.RIVET_RUNNER_TOTAL_SLOTS ? Number(process.env.RIVET_RUNNER_TOTAL_SLOTS) : 100; const RIVET_ENDPOINT = process.env.RIVET_ENDPOINT ?? "http://localhost:6420"; +const AUTOSTART = process.env.NO_AUTOSTART == undefined; let runnerStarted = Promise.withResolvers(); +let runnerStopped = Promise.withResolvers(); let websocketOpen = Promise.withResolvers(); let websocketClosed = Promise.withResolvers(); let runner: Runner | null = null; const actorWebSockets = new Map(); -// Start internal server -serve({ - fetch: async (request: Request) => { - const url = new URL(request.url); - if (url.pathname == "/wait-ready") { - await runnerStarted.promise; - return new Response(JSON.stringify(runner?.runnerId), { - status: 200, - }); - } else if (url.pathname == "/has-actor") { - let actorIdQuery = url.searchParams.get("actor"); - let generationQuery = url.searchParams.get("generation"); - let generation = generationQuery - ? Number(generationQuery) - : undefined; - - if (!actorIdQuery || !runner?.hasActor(actorIdQuery, generation)) { - return new Response(undefined, { status: 404 }); - } - } else if (url.pathname == "/shutdown") { - await runner?.shutdown(true); - } +// Create internal server +const app = new Hono(); + +app.get('/wait-ready', async (c) => { + await runnerStarted.promise; + return c.json(runner?.runnerId); +}); + +app.get('/has-actor', async (c) => { + let actorIdQuery = c.req.query('actor'); + let generationQuery = c.req.query('generation'); + let generation = generationQuery ? Number(generationQuery) : undefined; + + if (!actorIdQuery || !runner?.hasActor(actorIdQuery, generation)) { + return c.text('', 404); + } + return c.text('ok'); +}); + +app.get('/shutdown', async (c) => { + await runner?.shutdown(true); + return c.text('ok'); +}); + +app.get('/start', async (c) => { + return streamSSE(c, async (stream) => { + if (runner == null) runner = await startRunner(); + + stream.writeSSE({ data: runner.runnerId! }); + + await runnerStopped.promise; + }); +}); - return new Response("ok", { status: 200 }); - }, +app.get('*', (c) => c.text('ok')); + +serve({ + fetch: app.fetch, port: INTERNAL_SERVER_PORT, }); console.log(`Internal HTTP server listening on port ${INTERNAL_SERVER_PORT}`); -// Use objects to hold the current promise resolvers so callbacks always get the latest -const startedRef = { current: Promise.withResolvers() }; -const stoppedRef = { current: Promise.withResolvers() }; - -const config: RunnerConfig = { - version: RIVET_RUNNER_VERSION, - endpoint: RIVET_ENDPOINT, - namespace: RIVET_NAMESPACE, - runnerName: "test-runner", - runnerKey: RIVET_RUNNER_KEY, - totalSlots: RIVET_RUNNER_TOTAL_SLOTS, - prepopulateActorNames: {}, - onConnected: () => { - runnerStarted.resolve(undefined); - }, - onDisconnected: () => {}, - fetch: async (actorId: string, request: Request) => { - console.log( - `[TEST-RUNNER] Fetch called for actor ${actorId}, URL: ${request.url}`, - ); - const url = new URL(request.url); - if (url.pathname === "/ping") { - // Return the actor ID in response - const responseData = { - actorId, - status: "ok", - timestamp: Date.now(), - }; - console.log(`[TEST-RUNNER] Returning ping response:`, responseData); - return new Response(JSON.stringify(responseData), { - status: 200, - headers: { "Content-Type": "application/json" }, +if (AUTOSTART) runner = await startRunner(); + +async function startRunner(): Promise { + const config: RunnerConfig = { + version: RIVET_RUNNER_VERSION, + endpoint: RIVET_ENDPOINT, + namespace: RIVET_NAMESPACE, + runnerName: "test-runner", + runnerKey: RIVET_RUNNER_KEY, + totalSlots: RIVET_RUNNER_TOTAL_SLOTS, + prepopulateActorNames: {}, + onConnected: () => { + runnerStarted.resolve(undefined); + }, + onDisconnected: () => { }, + onShutdown: () => { + runnerStopped.resolve(undefined); + }, + fetch: async (actorId: string, request: Request) => { + console.log(`[TEST-RUNNER] Fetch called for actor ${actorId}, URL: ${request.url}`); + const url = new URL(request.url); + if (url.pathname === "/ping") { + // Return the actor ID in response + const responseData = { + actorId, + status: "ok", + timestamp: Date.now(), + }; + console.log(`[TEST-RUNNER] Returning ping response:`, responseData); + return new Response( + JSON.stringify(responseData), + { + status: 200, + headers: { "Content-Type": "application/json" }, + }, + ); + } + + return new Response("ok", { status: 200 }); + }, + onActorStart: async ( + _actorId: string, + _generation: number, + _config: ActorConfig, + ) => { + console.log( + `Actor ${_actorId} started (generation ${_generation})`, + ); + }, + onActorStop: async (_actorId: string, _generation: number) => { + console.log( + `Actor ${_actorId} stopped (generation ${_generation})`, + ); + }, + websocket: async ( + actorId: string, + ws: WebSocket, + request: Request, + ) => { + console.log(`WebSocket connected for actor ${actorId}`); + websocketOpen.resolve(undefined); + actorWebSockets.set(actorId, ws); + + // Echo server - send back any messages received + ws.addEventListener("message", (event) => { + const data = event.data; + console.log( + `WebSocket message from actor ${actorId}:`, + data, + ); + ws.send(`Echo: ${data}`); + }); + + ws.addEventListener("close", () => { + console.log(`WebSocket closed for actor ${actorId}`); + actorWebSockets.delete(actorId); + websocketClosed.resolve(undefined); + }); + + ws.addEventListener("error", (error) => { + console.error(`WebSocket error for actor ${actorId}:`, error); }); - } - - return new Response("ok", { status: 200 }); - }, - onActorStart: async ( - _actorId: string, - _generation: number, - _config: ActorConfig, - ) => { - console.log(`Actor ${_actorId} started (generation ${_generation})`); - startedRef.current.resolve(undefined); - }, - onActorStop: async (_actorId: string, _generation: number) => { - console.log(`Actor ${_actorId} stopped (generation ${_generation})`); - stoppedRef.current.resolve(undefined); - }, - websocket: async (actorId: string, ws: WebSocket, request: Request) => { - console.log(`WebSocket connected for actor ${actorId}`); - websocketOpen.resolve(undefined); - actorWebSockets.set(actorId, ws); - - // Echo server - send back any messages received - ws.addEventListener("message", (event) => { - const data = event.data; - console.log(`WebSocket message from actor ${actorId}:`, data); - ws.send(`Echo: ${data}`); - }); - - ws.addEventListener("close", () => { - console.log(`WebSocket closed for actor ${actorId}`); - actorWebSockets.delete(actorId); - websocketClosed.resolve(undefined); - }); - - ws.addEventListener("error", (error) => { - console.error(`WebSocket error for actor ${actorId}:`, error); - }); - }, -}; - -runner = new Runner(config); - -// Start runner -await runner.start(); - -// Wait for runner to be ready -console.log("Waiting runner start..."); -await runnerStarted.promise; + }, + }; + + runner = new Runner(config); + + // Start runner + await runner.start(); + + // Wait for runner to be ready + console.log("Waiting runner start..."); + await runnerStarted.promise; + + return runner; +} \ No newline at end of file