From c2c14ae1f1f863b7b506303143e99d7beedefe01 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Mon, 8 Sep 2025 17:42:39 -0700 Subject: [PATCH] fix: configure runner config per runner name --- Cargo.lock | 38 +-- Cargo.toml | 9 +- out/errors/runner_config.invalid.json | 5 + out/errors/runner_config.not_found.json | 5 + out/openapi.json | 220 +++++++++++++--- packages/common/pools/src/reqwest.rs | 3 +- .../common/types/src/keys/pegboard/mod.rs | 6 + packages/common/types/src/keys/pegboard/ns.rs | 48 ++-- packages/common/types/src/msgs/pegboard.rs | 4 +- packages/common/udb-util/src/ext.rs | 10 +- packages/common/udb-util/src/keys.rs | 4 +- packages/core/api-peer/src/internal.rs | 17 +- .../src/{namespaces.rs => namespaces/mod.rs} | 63 +---- .../api-peer/src/namespaces/runner_configs.rs | 192 ++++++++++++++ packages/core/api-peer/src/router.rs | 22 +- .../core/api-public/src/actors/list_names.rs | 34 ++- .../src/{namespaces.rs => namespaces/mod.rs} | 53 +--- .../src/namespaces/runner_configs.rs | 214 ++++++++++++++++ packages/core/api-public/src/router.rs | 21 +- packages/core/guard/server/src/cache/actor.rs | 1 - .../Cargo.toml | 2 +- .../src/lib.rs | 112 ++++---- packages/infra/engine/Cargo.toml | 2 +- packages/infra/engine/src/run_config.rs | 4 +- packages/services/internal/Cargo.toml | 1 + .../ops/bump_serverless_autoscaler_global.rs | 60 +++++ packages/services/internal/src/ops/mod.rs | 1 + packages/services/namespace/Cargo.toml | 1 + packages/services/namespace/src/errors.rs | 10 + packages/services/namespace/src/keys.rs | 239 +++++++++++++++--- .../services/namespace/src/ops/get_global.rs | 16 +- .../services/namespace/src/ops/get_local.rs | 17 +- packages/services/namespace/src/ops/mod.rs | 1 + .../namespace/src/ops/runner_config/delete.rs | 55 ++++ .../src/ops/runner_config/get_global.rs | 96 +++++++ .../src/ops/runner_config/get_local.rs | 65 +++++ .../namespace/src/ops/runner_config/list.rs | 94 +++++++ .../namespace/src/ops/runner_config/mod.rs | 5 + .../namespace/src/ops/runner_config/upsert.rs | 93 +++++++ packages/services/namespace/src/types.rs | 38 +-- .../namespace/src/workflows/namespace.rs | 137 +--------- packages/services/pegboard/src/keys/mod.rs | 2 +- .../pegboard/src/ops/actor/list_names.rs | 4 +- .../pegboard/src/workflows/actor/destroy.rs | 9 +- .../pegboard/src/workflows/actor/mod.rs | 26 +- .../pegboard/src/workflows/actor/runtime.rs | 46 ++-- .../pegboard/src/workflows/actor/setup.rs | 16 +- sdks/rust/data/src/versioned.rs | 18 +- ...1.bare => namespace.runner_config.v1.bare} | 7 +- 49 files changed, 1582 insertions(+), 564 deletions(-) create mode 100644 out/errors/runner_config.invalid.json create mode 100644 out/errors/runner_config.not_found.json rename packages/core/api-peer/src/{namespaces.rs => namespaces/mod.rs} (78%) create mode 100644 packages/core/api-peer/src/namespaces/runner_configs.rs rename packages/core/api-public/src/{namespaces.rs => namespaces/mod.rs} (73%) create mode 100644 packages/core/api-public/src/namespaces/runner_configs.rs rename packages/core/{pegboard-outbound => pegboard-serverless}/Cargo.toml (94%) rename packages/core/{pegboard-outbound => pegboard-serverless}/src/lib.rs (73%) create mode 100644 packages/services/internal/src/ops/bump_serverless_autoscaler_global.rs create mode 100644 packages/services/namespace/src/ops/runner_config/delete.rs create mode 100644 packages/services/namespace/src/ops/runner_config/get_global.rs create mode 100644 packages/services/namespace/src/ops/runner_config/get_local.rs create mode 100644 packages/services/namespace/src/ops/runner_config/list.rs create mode 100644 packages/services/namespace/src/ops/runner_config/mod.rs create mode 100644 packages/services/namespace/src/ops/runner_config/upsert.rs rename sdks/schemas/data/{namespace.runner_kind.v1.bare => namespace.runner_config.v1.bare} (69%) diff --git a/Cargo.lock b/Cargo.lock index 0758fb75e0..770f168860 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2385,6 +2385,7 @@ dependencies = [ "anyhow", "gasoline", "rivet-api-client", + "rivet-types", "serde", ] @@ -2789,6 +2790,7 @@ dependencies = [ "rivet-types", "rivet-util", "serde", + "strum", "tracing", "udb-util", "universaldb", @@ -3329,44 +3331,44 @@ dependencies = [ ] [[package]] -name = "pegboard-outbound" +name = "pegboard-runner-ws" version = "0.0.1" dependencies = [ "anyhow", - "epoxy", "gasoline", + "hyper 1.6.0", "namespace", "pegboard", - "reqwest-eventsource", + "pegboard-actor-kv", "rivet-config", + "rivet-error", + "rivet-metrics", "rivet-runner-protocol", - "rivet-types", + "rivet-runtime", + "serde", + "serde_json", + "tokio-tungstenite", "tracing", - "udb-util", - "universaldb", + "url", + "versioned-data-util", ] [[package]] -name = "pegboard-runner-ws" +name = "pegboard-serverless" version = "0.0.1" dependencies = [ "anyhow", + "epoxy", "gasoline", - "hyper 1.6.0", "namespace", "pegboard", - "pegboard-actor-kv", + "reqwest-eventsource", "rivet-config", - "rivet-error", - "rivet-metrics", "rivet-runner-protocol", - "rivet-runtime", - "serde", - "serde_json", - "tokio-tungstenite", + "rivet-types", "tracing", - "url", - "versioned-data-util", + "udb-util", + "universaldb", ] [[package]] @@ -4337,8 +4339,8 @@ dependencies = [ "lz4_flex", "namespace", "pegboard", - "pegboard-outbound", "pegboard-runner-ws", + "pegboard-serverless", "portpicker", "rand 0.8.5", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index aa4bd1df99..3018250c06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["packages/common/api-builder","packages/common/api-client","packages/common/api-types","packages/common/api-util","packages/common/cache/build","packages/common/cache/result","packages/common/clickhouse-inserter","packages/common/clickhouse-user-query","packages/common/config","packages/common/env","packages/common/error/core","packages/common/error/macros","packages/common/gasoline/core","packages/common/gasoline/macros","packages/common/logs","packages/common/metrics","packages/common/pools","packages/common/runtime","packages/common/service-manager","packages/common/telemetry","packages/common/test-deps","packages/common/test-deps-docker","packages/common/types","packages/common/udb-util","packages/common/universaldb","packages/common/universalpubsub","packages/common/util/core","packages/common/util/id","packages/common/versioned-data-util","packages/core/actor-kv","packages/core/api-peer","packages/core/api-public","packages/core/bootstrap","packages/core/dump-openapi","packages/core/guard/core","packages/core/guard/server","packages/core/pegboard-gateway","packages/core/pegboard-outbound","packages/core/pegboard-runner-ws","packages/core/pegboard-tunnel","packages/core/workflow-worker","packages/infra/engine","packages/services/epoxy","packages/services/internal","packages/services/namespace","packages/services/pegboard","sdks/rust/api-full","sdks/rust/bare_gen","sdks/rust/data","sdks/rust/epoxy-protocol","sdks/rust/runner-protocol","sdks/rust/tunnel-protocol","sdks/rust/ups-protocol"] +members = ["packages/common/api-builder","packages/common/api-client","packages/common/api-types","packages/common/api-util","packages/common/cache/build","packages/common/cache/result","packages/common/clickhouse-inserter","packages/common/clickhouse-user-query","packages/common/config","packages/common/env","packages/common/error/core","packages/common/error/macros","packages/common/gasoline/core","packages/common/gasoline/macros","packages/common/logs","packages/common/metrics","packages/common/pools","packages/common/runtime","packages/common/service-manager","packages/common/telemetry","packages/common/test-deps","packages/common/test-deps-docker","packages/common/types","packages/common/udb-util","packages/common/universaldb","packages/common/universalpubsub","packages/common/util/core","packages/common/util/id","packages/common/versioned-data-util","packages/core/actor-kv","packages/core/api-peer","packages/core/api-public","packages/core/bootstrap","packages/core/dump-openapi","packages/core/guard/core","packages/core/guard/server","packages/core/pegboard-gateway","packages/core/pegboard-runner-ws","packages/core/pegboard-serverless","packages/core/pegboard-tunnel","packages/core/workflow-worker","packages/infra/engine","packages/services/epoxy","packages/services/internal","packages/services/namespace","packages/services/pegboard","sdks/rust/api-full","sdks/rust/bare_gen","sdks/rust/data","sdks/rust/epoxy-protocol","sdks/rust/runner-protocol","sdks/rust/tunnel-protocol","sdks/rust/ups-protocol"] [workspace.package] version = "25.6.1" @@ -280,6 +280,7 @@ path = "packages/common/error/core" [workspace.dependencies.rivet-error-macros] path = "packages/common/error/macros" + [workspace.dependencies.gas] package = "gasoline" path = "packages/common/gasoline/core" @@ -363,12 +364,12 @@ path = "packages/core/guard/server" [workspace.dependencies.pegboard-gateway] path = "packages/core/pegboard-gateway" -[workspace.dependencies.pegboard-outbound] -path = "packages/core/pegboard-outbound" - [workspace.dependencies.pegboard-runner-ws] path = "packages/core/pegboard-runner-ws" +[workspace.dependencies.pegboard-serverless] +path = "packages/core/pegboard-serverless" + [workspace.dependencies.pegboard-tunnel] path = "packages/core/pegboard-tunnel" diff --git a/out/errors/runner_config.invalid.json b/out/errors/runner_config.invalid.json new file mode 100644 index 0000000000..314730a13e --- /dev/null +++ b/out/errors/runner_config.invalid.json @@ -0,0 +1,5 @@ +{ + "code": "invalid", + "group": "runner_config", + "message": "Invalid runner config." +} \ No newline at end of file diff --git a/out/errors/runner_config.not_found.json b/out/errors/runner_config.not_found.json new file mode 100644 index 0000000000..a78fba06a6 --- /dev/null +++ b/out/errors/runner_config.not_found.json @@ -0,0 +1,5 @@ +{ + "code": "not_found", + "group": "runner_config", + "message": "No config for this runner exists." +} \ No newline at end of file diff --git a/out/openapi.json b/out/openapi.json index f4278895ed..8a3df3d7c5 100644 --- a/out/openapi.json +++ b/out/openapi.json @@ -548,12 +548,105 @@ } } } + } + }, + "/namespaces/{namespace_id}/runner-configs": { + "get": { + "tags": [ + "namespaces::runner_configs" + ], + "operationId": "namespaces_runner_configs_list", + "parameters": [ + { + "name": "namespace_id", + "in": "path", + "required": true, + "schema": { + "$ref": "#/components/schemas/RivetId" + } + }, + { + "name": "limit", + "in": "query", + "required": false, + "schema": { + "type": "integer", + "minimum": 0 + } + }, + { + "name": "cursor", + "in": "query", + "required": false, + "schema": { + "type": "string" + } + }, + { + "name": "variant", + "in": "query", + "required": false, + "schema": { + "$ref": "#/components/schemas/RunnerConfigVariant" + } + } + ], + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/NamespacesRunnerConfigsListResponse" + } + } + } + } + } + } + }, + "/namespaces/{namespace_id}/runner-configs/{runner_name}": { + "get": { + "tags": [ + "namespaces::runner_configs" + ], + "operationId": "namespaces_runner_configs_get", + "parameters": [ + { + "name": "namespace_id", + "in": "path", + "required": true, + "schema": { + "$ref": "#/components/schemas/RivetId" + } + }, + { + "name": "runner_name", + "in": "path", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/NamespacesRunnerConfigsGetResponse" + } + } + } + } + } }, "put": { "tags": [ - "namespaces" + "namespaces::runner_configs" ], - "operationId": "namespaces_update", + "operationId": "namespaces_runner_configs_upsert", "parameters": [ { "name": "namespace_id", @@ -562,13 +655,21 @@ "schema": { "$ref": "#/components/schemas/RivetId" } + }, + { + "name": "runner_name", + "in": "path", + "required": true, + "schema": { + "type": "string" + } } ], "requestBody": { "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/NamespacesUpdateRequest" + "$ref": "#/components/schemas/NamespacesRunnerConfigsUpsertRequest" } } }, @@ -580,7 +681,43 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/NamespacesUpdateResponse" + "$ref": "#/components/schemas/NamespacesRunnerConfigsUpsertResponse" + } + } + } + } + } + }, + "delete": { + "tags": [ + "namespaces::runner_configs" + ], + "operationId": "namespaces_runner_configs_delete", + "parameters": [ + { + "name": "namespace_id", + "in": "path", + "required": true, + "schema": { + "$ref": "#/components/schemas/RivetId" + } + }, + { + "name": "runner_name", + "in": "path", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/NamespacesRunnerConfigsDeleteResponse" } } } @@ -1091,8 +1228,7 @@ "namespace_id", "name", "display_name", - "create_ts", - "runner_kind" + "create_ts" ], "properties": { "create_ts": { @@ -1107,9 +1243,6 @@ }, "namespace_id": { "$ref": "#/components/schemas/RivetId" - }, - "runner_kind": { - "$ref": "#/components/schemas/NamespacesRunnerKind" } } }, @@ -1171,15 +1304,15 @@ }, "additionalProperties": false }, - "NamespacesRunnerKind": { + "NamespacesRunnerConfig": { "oneOf": [ { "type": "object", "required": [ - "outbound" + "serverless" ], "properties": { - "outbound": { + "serverless": { "type": "object", "required": [ "url", @@ -1222,42 +1355,49 @@ } } } - }, - { - "type": "string", - "enum": [ - "custom" - ] } ] }, - "NamespacesUpdate": { - "oneOf": [ - { + "NamespacesRunnerConfigsDeleteResponse": { + "type": "object" + }, + "NamespacesRunnerConfigsGetResponse": { + "type": "object", + "required": [ + "runner_config" + ], + "properties": { + "runner_config": { + "$ref": "#/components/schemas/NamespacesRunnerConfig" + } + } + }, + "NamespacesRunnerConfigsListResponse": { + "type": "object", + "required": [ + "runner_configs", + "pagination" + ], + "properties": { + "pagination": { + "$ref": "#/components/schemas/Pagination" + }, + "runner_configs": { "type": "object", - "required": [ - "update_runner_kind" - ], - "properties": { - "update_runner_kind": { - "type": "object", - "required": [ - "runner_kind" - ], - "properties": { - "runner_kind": { - "$ref": "#/components/schemas/NamespacesRunnerKind" - } - } - } + "additionalProperties": { + "$ref": "#/components/schemas/NamespacesRunnerConfig" + }, + "propertyNames": { + "type": "string" } } - ] + }, + "additionalProperties": false }, - "NamespacesUpdateRequest": { - "$ref": "#/components/schemas/NamespacesUpdate" + "NamespacesRunnerConfigsUpsertRequest": { + "$ref": "#/components/schemas/NamespacesRunnerConfig" }, - "NamespacesUpdateResponse": { + "NamespacesRunnerConfigsUpsertResponse": { "type": "object" }, "Pagination": { diff --git a/packages/common/pools/src/reqwest.rs b/packages/common/pools/src/reqwest.rs index 78f1f2e7cb..23476203dd 100644 --- a/packages/common/pools/src/reqwest.rs +++ b/packages/common/pools/src/reqwest.rs @@ -2,6 +2,7 @@ use reqwest::Client; use tokio::sync::OnceCell; static CLIENT: OnceCell = OnceCell::const_new(); +static CLIENT_NO_TIMEOUT: OnceCell = OnceCell::const_new(); pub async fn client() -> Result { CLIENT @@ -15,7 +16,7 @@ pub async fn client() -> Result { } pub async fn client_no_timeout() -> Result { - CLIENT + CLIENT_NO_TIMEOUT .get_or_try_init(|| async { Client::builder().build() }) .await .cloned() diff --git a/packages/common/types/src/keys/pegboard/mod.rs b/packages/common/types/src/keys/pegboard/mod.rs index 7e0a481030..1e3ff30358 100644 --- a/packages/common/types/src/keys/pegboard/mod.rs +++ b/packages/common/types/src/keys/pegboard/mod.rs @@ -1 +1,7 @@ +use udb_util::prelude::*; + pub mod ns; + +pub fn subspace() -> udb_util::Subspace { + udb_util::Subspace::new(&(RIVET, PEGBOARD)) +} diff --git a/packages/common/types/src/keys/pegboard/ns.rs b/packages/common/types/src/keys/pegboard/ns.rs index a768f14f61..fa04e89a6e 100644 --- a/packages/common/types/src/keys/pegboard/ns.rs +++ b/packages/common/types/src/keys/pegboard/ns.rs @@ -5,29 +5,29 @@ use gas::prelude::*; use udb_util::prelude::*; #[derive(Debug)] -pub struct OutboundDesiredSlotsKey { +pub struct ServerlessDesiredSlotsKey { pub namespace_id: Id, - pub runner_name_selector: String, + pub runner_name: String, } -impl OutboundDesiredSlotsKey { - pub fn new(namespace_id: Id, runner_name_selector: String) -> Self { - OutboundDesiredSlotsKey { +impl ServerlessDesiredSlotsKey { + pub fn new(namespace_id: Id, runner_name: String) -> Self { + ServerlessDesiredSlotsKey { namespace_id, - runner_name_selector, + runner_name, } } - pub fn subspace(namespace_id: Id) -> OutboundDesiredSlotsSubspaceKey { - OutboundDesiredSlotsSubspaceKey::new(namespace_id) + pub fn subspace(namespace_id: Id) -> ServerlessDesiredSlotsSubspaceKey { + ServerlessDesiredSlotsSubspaceKey::new(namespace_id) } - pub fn entire_subspace() -> OutboundDesiredSlotsSubspaceKey { - OutboundDesiredSlotsSubspaceKey::entire() + pub fn entire_subspace() -> ServerlessDesiredSlotsSubspaceKey { + ServerlessDesiredSlotsSubspaceKey::entire() } } -impl FormalKey for OutboundDesiredSlotsKey { +impl FormalKey for ServerlessDesiredSlotsKey { /// Count. type Value = u32; @@ -42,7 +42,7 @@ impl FormalKey for OutboundDesiredSlotsKey { } } -impl TuplePack for OutboundDesiredSlotsKey { +impl TuplePack for ServerlessDesiredSlotsKey { fn pack( &self, w: &mut W, @@ -50,46 +50,46 @@ impl TuplePack for OutboundDesiredSlotsKey { ) -> std::io::Result { let t = ( NAMESPACE, - OUTBOUND, + SERVERLESS, DESIRED_SLOTS, self.namespace_id, - &self.runner_name_selector, + &self.runner_name, ); t.pack(w, tuple_depth) } } -impl<'de> TupleUnpack<'de> for OutboundDesiredSlotsKey { +impl<'de> TupleUnpack<'de> for ServerlessDesiredSlotsKey { fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (_, _, _, namespace_id, runner_name_selector)) = + let (input, (_, _, _, namespace_id, runner_name)) = <(usize, usize, usize, Id, String)>::unpack(input, tuple_depth)?; - let v = OutboundDesiredSlotsKey { + let v = ServerlessDesiredSlotsKey { namespace_id, - runner_name_selector, + runner_name, }; Ok((input, v)) } } -pub struct OutboundDesiredSlotsSubspaceKey { +pub struct ServerlessDesiredSlotsSubspaceKey { namespace_id: Option, } -impl OutboundDesiredSlotsSubspaceKey { +impl ServerlessDesiredSlotsSubspaceKey { pub fn new(namespace_id: Id) -> Self { - OutboundDesiredSlotsSubspaceKey { + ServerlessDesiredSlotsSubspaceKey { namespace_id: Some(namespace_id), } } pub fn entire() -> Self { - OutboundDesiredSlotsSubspaceKey { namespace_id: None } + ServerlessDesiredSlotsSubspaceKey { namespace_id: None } } } -impl TuplePack for OutboundDesiredSlotsSubspaceKey { +impl TuplePack for ServerlessDesiredSlotsSubspaceKey { fn pack( &self, w: &mut W, @@ -97,7 +97,7 @@ impl TuplePack for OutboundDesiredSlotsSubspaceKey { ) -> std::io::Result { let mut offset = VersionstampOffset::None { size: 0 }; - let t = (NAMESPACE, OUTBOUND, DESIRED_SLOTS); + let t = (NAMESPACE, SERVERLESS, DESIRED_SLOTS); offset += t.pack(w, tuple_depth)?; if let Some(namespace_id) = self.namespace_id { diff --git a/packages/common/types/src/msgs/pegboard.rs b/packages/common/types/src/msgs/pegboard.rs index e3ad78680d..ece441706b 100644 --- a/packages/common/types/src/msgs/pegboard.rs +++ b/packages/common/types/src/msgs/pegboard.rs @@ -1,4 +1,4 @@ use gas::prelude::*; -#[message("pegboard_bump_outbound_autoscaler")] -pub struct BumpOutboundAutoscaler {} +#[message("pegboard_bump_serverless_autoscaler")] +pub struct BumpServerlessAutoscaler {} diff --git a/packages/common/udb-util/src/ext.rs b/packages/common/udb-util/src/ext.rs index 82ce1e159d..0d5bcc9c0f 100644 --- a/packages/common/udb-util/src/ext.rs +++ b/packages/common/udb-util/src/ext.rs @@ -44,7 +44,7 @@ impl<'a> TxnSubspace<'a> { ) -> Result { self.subspace .unpack(key) - .context("failed unpacking key") + .with_context(|| format!("failed unpacking key of {}", std::any::type_name::())) .map_err(|x| udb::FdbBindingError::CustomError(x.into())) } @@ -59,7 +59,7 @@ impl<'a> TxnSubspace<'a> { .with_context(|| { format!( "failed serializing key value of {}", - std::any::type_name::() + std::any::type_name::(), ) }) .map_err(|x| udb::FdbBindingError::CustomError(x.into()))?, @@ -194,7 +194,7 @@ impl SliceExt for udb::future::FdbSlice { .with_context(|| { format!( "failed deserializing key value of {}", - std::any::type_name::() + std::any::type_name::(), ) }) .map_err(|x| udb::FdbBindingError::CustomError(x.into())) @@ -212,7 +212,7 @@ impl OptSliceExt for Option { .with_context(|| { format!( "failed deserializing key value of {}", - std::any::type_name::() + std::any::type_name::(), ) }) .map_err(|x| udb::FdbBindingError::CustomError(x.into())) @@ -228,7 +228,7 @@ impl OptSliceExt for Option { .with_context(|| { format!( "failed deserializing key value of {}", - std::any::type_name::() + std::any::type_name::(), ) }) .map_err(|x| udb::FdbBindingError::CustomError(x.into())) diff --git a/packages/common/udb-util/src/keys.rs b/packages/common/udb-util/src/keys.rs index f873ca7066..177a4eb685 100644 --- a/packages/common/udb-util/src/keys.rs +++ b/packages/common/udb-util/src/keys.rs @@ -119,7 +119,7 @@ define_keys! { (91, METRIC, "metric"), (92, CURRENT_BALLOT, "current_ballot"), (93, INSTANCE_BALLOT, "instance_ballot"), - (94, OUTBOUND, "outbound"), + (94, SERVERLESS, "serverless"), (95, DESIRED_SLOTS, "desired_slots"), - (96, RUNNER_KIND, "runner_kind"), + (96, BY_VARIANT, "by_variant"), } diff --git a/packages/core/api-peer/src/internal.rs b/packages/core/api-peer/src/internal.rs index 47c84cd57b..3aa04b075e 100644 --- a/packages/core/api-peer/src/internal.rs +++ b/packages/core/api-peer/src/internal.rs @@ -1,7 +1,6 @@ use anyhow::Result; use gas::prelude::*; use rivet_api_builder::ApiCtx; -use rivet_util::Id; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] @@ -27,3 +26,19 @@ pub async fn cache_purge( Ok(CachePurgeResponse {}) } + +#[derive(Serialize)] +pub struct BumpServerlessAutoscalerResponse {} + +pub async fn bump_serverless_autoscaler( + ctx: ApiCtx, + _path: (), + _query: (), + _body: (), +) -> Result { + ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) + .send() + .await?; + + Ok(BumpServerlessAutoscalerResponse {}) +} diff --git a/packages/core/api-peer/src/namespaces.rs b/packages/core/api-peer/src/namespaces/mod.rs similarity index 78% rename from packages/core/api-peer/src/namespaces.rs rename to packages/core/api-peer/src/namespaces/mod.rs index 6fae8668ae..c0c8dcffd3 100644 --- a/packages/core/api-peer/src/namespaces.rs +++ b/packages/core/api-peer/src/namespaces/mod.rs @@ -6,6 +6,8 @@ use rivet_util::Id; use serde::{Deserialize, Serialize}; use utoipa::{IntoParams, ToSchema}; +pub mod runner_configs; + #[derive(Debug, Serialize, Deserialize, IntoParams)] #[serde(deny_unknown_fields)] #[into_params(parameter_in = Query)] @@ -29,7 +31,6 @@ pub async fn get(ctx: ApiCtx, path: GetPath, _query: GetQuery) -> Result Result { - let mut sub = ctx - .subscribe::(( - "namespace_id", - path.namespace_id, - )) - .await?; - - let res = ctx - .signal(body.0) - .to_workflow::() - .tag("namespace_id", path.namespace_id) - .send() - .await; - - if let Some(WorkflowError::WorkflowNotFound) = res - .as_ref() - .err() - .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) - { - return Err(namespace::errors::Namespace::NotFound.build()); - } else { - res?; - } - - sub.next() - .await? - .into_body() - .res - .map_err(|err| err.build())?; - - Ok(UpdateResponse {}) -} diff --git a/packages/core/api-peer/src/namespaces/runner_configs.rs b/packages/core/api-peer/src/namespaces/runner_configs.rs new file mode 100644 index 0000000000..39dec5a601 --- /dev/null +++ b/packages/core/api-peer/src/namespaces/runner_configs.rs @@ -0,0 +1,192 @@ +use std::collections::HashMap; + +use anyhow::Result; +use rivet_api_builder::ApiCtx; +use rivet_api_types::pagination::Pagination; +use rivet_util::Id; +use serde::{Deserialize, Serialize}; +use utoipa::{IntoParams, ToSchema}; + +#[derive(Debug, Serialize, Deserialize, IntoParams)] +#[serde(deny_unknown_fields)] +#[into_params(parameter_in = Query)] +pub struct GetQuery {} + +#[derive(Deserialize, Serialize, ToSchema)] +#[schema(as = NamespacesRunnerConfigsGetResponse)] +pub struct GetResponse { + pub runner_config: namespace::types::RunnerConfig, +} + +#[derive(Deserialize)] +#[serde(deny_unknown_fields)] +pub struct GetPath { + pub namespace_id: Id, + pub runner_name: String, +} + +pub async fn get(ctx: ApiCtx, path: GetPath, _query: GetQuery) -> Result { + let runner_config = ctx + .op(namespace::ops::runner_config::get_local::Input { + runners: vec![(path.namespace_id, path.runner_name)], + }) + .await? + .into_iter() + .next() + .ok_or_else(|| namespace::errors::RunnerConfig::NotFound.build())?; + + Ok(GetResponse { + runner_config: runner_config.config, + }) +} + +#[derive(Debug, Serialize, Deserialize, Clone, IntoParams)] +#[serde(deny_unknown_fields)] +#[into_params(parameter_in = Query)] +pub struct ListQuery { + pub limit: Option, + pub cursor: Option, + pub variant: Option, +} + +#[derive(Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ListPath { + pub namespace_id: Id, +} + +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(deny_unknown_fields)] +#[schema(as = NamespacesRunnerConfigsListResponse)] +pub struct ListResponse { + pub runner_configs: HashMap, + pub pagination: Pagination, +} + +pub async fn list(ctx: ApiCtx, path: ListPath, query: ListQuery) -> Result { + ctx.op(namespace::ops::get_local::Input { + namespace_ids: vec![path.namespace_id], + }) + .await? + .first() + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + // Parse variant from cursor if needed + let (variant, after_name) = if let Some(cursor) = query.cursor { + if let Some((variant, after_name)) = cursor.split_once(":") { + if query.variant.is_some() { + (query.variant, Some(after_name.to_string())) + } else { + ( + namespace::keys::RunnerConfigVariant::parse(variant), + Some(after_name.to_string()), + ) + } + } else { + (query.variant, None) + } + } else { + (query.variant, None) + }; + + let runner_configs_res = ctx + .op(namespace::ops::runner_config::list::Input { + namespace_id: path.namespace_id, + variant, + after_name, + limit: query.limit.unwrap_or(100), + }) + .await?; + + let cursor = runner_configs_res + .last() + .map(|(name, config)| format!("{}:{}", config.variant(), name)); + + Ok(ListResponse { + // TODO: Implement ComposeSchema for FakeMap so we don't have to reallocate + runner_configs: runner_configs_res.into_iter().collect(), + pagination: Pagination { cursor }, + }) +} + +#[derive(Debug, Serialize, Deserialize, IntoParams)] +#[serde(deny_unknown_fields)] +#[into_params(parameter_in = Query)] +pub struct UpsertQuery {} + +#[derive(Deserialize)] +#[serde(deny_unknown_fields)] +pub struct UpsertPath { + pub namespace_id: Id, + pub runner_name: String, +} + +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(deny_unknown_fields)] +#[schema(as = NamespacesRunnerConfigsUpsertRequest)] +pub struct UpsertRequest(namespace::types::RunnerConfig); + +#[derive(Deserialize, Serialize, ToSchema)] +#[schema(as = NamespacesRunnerConfigsUpsertResponse)] +pub struct UpsertResponse {} + +pub async fn upsert( + ctx: ApiCtx, + path: UpsertPath, + _query: UpsertQuery, + body: UpsertRequest, +) -> Result { + ctx.op(namespace::ops::get_local::Input { + namespace_ids: vec![path.namespace_id], + }) + .await? + .first() + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + ctx.op(namespace::ops::runner_config::upsert::Input { + namespace_id: path.namespace_id, + name: path.runner_name, + config: body.0, + }) + .await?; + + Ok(UpsertResponse {}) +} + +#[derive(Debug, Serialize, Deserialize, IntoParams)] +#[serde(deny_unknown_fields)] +#[into_params(parameter_in = Query)] +pub struct DeleteQuery {} + +#[derive(Deserialize)] +#[serde(deny_unknown_fields)] +pub struct DeletePath { + pub namespace_id: Id, + pub runner_name: String, +} + +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(deny_unknown_fields)] +#[schema(as = NamespacesRunnerConfigsDeleteRequest)] +pub struct DeleteRequest(namespace::types::RunnerConfig); + +#[derive(Deserialize, Serialize, ToSchema)] +#[schema(as = NamespacesRunnerConfigsDeleteResponse)] +pub struct DeleteResponse {} + +pub async fn delete(ctx: ApiCtx, path: DeletePath, _query: DeleteQuery) -> Result { + ctx.op(namespace::ops::get_local::Input { + namespace_ids: vec![path.namespace_id], + }) + .await? + .first() + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + ctx.op(namespace::ops::runner_config::delete::Input { + namespace_id: path.namespace_id, + name: path.runner_name, + }) + .await?; + + Ok(DeleteResponse {}) +} diff --git a/packages/core/api-peer/src/router.rs b/packages/core/api-peer/src/router.rs index 14c4ce1f83..fc8dd3dfce 100644 --- a/packages/core/api-peer/src/router.rs +++ b/packages/core/api-peer/src/router.rs @@ -14,11 +14,27 @@ pub async fn router( .route("/namespaces", get(namespaces::list)) .route("/namespaces", post(namespaces::create)) .route("/namespaces/{namespace_id}", get(namespaces::get)) - .route("/namespaces/{namespace_id}", put(namespaces::update)) .route( "/namespaces/resolve/{name}", get(namespaces::resolve_for_name), ) + // MARK: Runner configs + .route( + "/namespaces/{namespace_id}/runner-configs", + get(namespaces::runner_configs::list), + ) + .route( + "/namespaces/{namespace_id}/runner-configs/{runner_name}", + put(namespaces::runner_configs::upsert), + ) + .route( + "/namespaces/{namespace_id}/runner-configs/{runner_name}", + get(namespaces::runner_configs::get), + ) + .route( + "/namespaces/{namespace_id}/runner-configs/{runner_name}", + delete(namespaces::runner_configs::delete), + ) // MARK: Actors .route("/actors", get(actors::list::list)) .route("/actors", post(actors::create::create)) @@ -31,6 +47,10 @@ pub async fn router( .route("/runners/names", get(runners::list_names)) // MARK: Internal .route("/cache/purge", post(internal::cache_purge)) + .route( + "/bump-serverless-autoscaler", + post(internal::bump_serverless_autoscaler), + ) }) .await } diff --git a/packages/core/api-public/src/actors/list_names.rs b/packages/core/api-public/src/actors/list_names.rs index 40522cbebc..aadbd9b6ec 100644 --- a/packages/core/api-public/src/actors/list_names.rs +++ b/packages/core/api-public/src/actors/list_names.rs @@ -47,25 +47,21 @@ async fn list_names_inner( }; // Fanout to all datacenters - let mut all_names = fanout_to_datacenters::< - ListNamesResponse, - _, - _, - _, - _, - rivet_util::serde::FakeMap, - >( - ctx, - headers, - "/actors/names", - peer_query, - |ctx, query| async move { rivet_api_peer::actors::list_names::list_names(ctx, (), query).await }, - |res, agg| agg.extend(res.names), - ) - .await?; + let mut all_names = + fanout_to_datacenters::>( + ctx, + headers, + "/actors/names", + peer_query, + |ctx, query| async move { + rivet_api_peer::actors::list_names::list_names(ctx, (), query).await + }, + |res, agg| agg.extend(res.names), + ) + .await?; // Sort by name for consistency - all_names.sort(); + all_names.sort_by(|a, b| a.0.cmp(&b.0)); // Truncate to the requested limit all_names.truncate(query.limit.unwrap_or(100)); @@ -73,8 +69,8 @@ async fn list_names_inner( let cursor = all_names.last().map(|(name, _)| name.to_string()); Ok(ListNamesResponse { - // TODO: Implement ComposeSchema for FakeMap so we don't have to use .into() - names: all_names.into(), + // TODO: Implement ComposeSchema for FakeMap so we don't have to reallocate + names: all_names.into_iter().collect(), pagination: Pagination { cursor }, }) } diff --git a/packages/core/api-public/src/namespaces.rs b/packages/core/api-public/src/namespaces/mod.rs similarity index 73% rename from packages/core/api-public/src/namespaces.rs rename to packages/core/api-public/src/namespaces/mod.rs index f1c4173124..7c2ef444bc 100644 --- a/packages/core/api-public/src/namespaces.rs +++ b/packages/core/api-public/src/namespaces/mod.rs @@ -10,6 +10,8 @@ use rivet_util::Id; use rivet_api_client::{request_remote_datacenter, request_remote_datacenter_raw}; use rivet_api_peer::namespaces::*; +pub mod runner_configs; + #[utoipa::path( get, operation_id = "namespaces_list", @@ -137,54 +139,3 @@ async fn create_inner( .await } } - -#[utoipa::path( - put, - operation_id = "namespaces_update", - path = "/namespaces/{namespace_id}", - params( - ("namespace_id" = Id, Path), - UpdateQuery, - ), - request_body(content = UpdateRequest, content_type = "application/json"), - responses( - (status = 200, body = UpdateResponse), - ), -)] -pub async fn update( - Extension(ctx): Extension, - headers: HeaderMap, - Path(path): Path, - Query(query): Query, - Json(body): Json, -) -> Response { - match update_inner(ctx, headers, path, query, body).await { - Ok(response) => response, - Err(err) => ApiError::from(err).into_response(), - } -} - -async fn update_inner( - ctx: ApiCtx, - headers: HeaderMap, - path: UpdatePath, - query: UpdateQuery, - body: UpdateRequest, -) -> Result { - if ctx.config().is_leader() { - let res = rivet_api_peer::namespaces::update(ctx, path, query, body).await?; - Ok(Json(res).into_response()) - } else { - let leader_dc = ctx.config().leader_dc()?; - request_remote_datacenter_raw( - &ctx, - leader_dc.datacenter_label, - &format!("/namespaces/{}", path.namespace_id), - axum::http::Method::PUT, - headers, - Some(&query), - Some(&body), - ) - .await - } -} diff --git a/packages/core/api-public/src/namespaces/runner_configs.rs b/packages/core/api-public/src/namespaces/runner_configs.rs new file mode 100644 index 0000000000..250bf7ee3e --- /dev/null +++ b/packages/core/api-public/src/namespaces/runner_configs.rs @@ -0,0 +1,214 @@ +use anyhow::Result; +use axum::{ + extract::{Extension, Path, Query}, + http::HeaderMap, + response::{IntoResponse, Json, Response}, +}; +use rivet_api_builder::{ApiCtx, ApiError}; +use rivet_util::Id; + +use rivet_api_client::request_remote_datacenter; +use rivet_api_peer::namespaces::runner_configs::*; + +#[utoipa::path( + get, + operation_id = "namespaces_runner_configs_get", + path = "/namespaces/{namespace_id}/runner-configs/{runner_name}", + params( + ("namespace_id" = Id, Path), + ("runner_name" = String, Path), + GetQuery, + ), + responses( + (status = 200, body = GetResponse), + ), +)] +pub async fn get( + Extension(ctx): Extension, + headers: HeaderMap, + Path(path): Path, + Query(query): Query, +) -> Response { + match get_inner(ctx, headers, path, query).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +async fn get_inner( + ctx: ApiCtx, + headers: HeaderMap, + path: GetPath, + query: GetQuery, +) -> Result { + if ctx.config().is_leader() { + rivet_api_peer::namespaces::runner_configs::get(ctx, path, query).await + } else { + let leader_dc = ctx.config().leader_dc()?; + request_remote_datacenter::( + ctx.config(), + leader_dc.datacenter_label, + &format!( + "/namespaces/{}/runner-configs/{}", + path.namespace_id, path.runner_name + ), + axum::http::Method::GET, + headers, + Some(&query), + Option::<&()>::None, + ) + .await + } +} + +#[utoipa::path( + get, + operation_id = "namespaces_runner_configs_list", + path = "/namespaces/{namespace_id}/runner-configs", + params( + ("namespace_id" = Id, Path), + ListQuery, + ), + responses( + (status = 200, body = ListResponse), + ), +)] +pub async fn list( + Extension(ctx): Extension, + headers: HeaderMap, + Path(path): Path, + Query(query): Query, +) -> Response { + match list_inner(ctx, headers, path, query).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +async fn list_inner( + ctx: ApiCtx, + headers: HeaderMap, + path: ListPath, + query: ListQuery, +) -> Result { + if ctx.config().is_leader() { + rivet_api_peer::namespaces::runner_configs::list(ctx, path, query).await + } else { + let leader_dc = ctx.config().leader_dc()?; + request_remote_datacenter::( + ctx.config(), + leader_dc.datacenter_label, + &format!("/namespaces/{}/runner-configs", path.namespace_id), + axum::http::Method::GET, + headers, + Some(&query), + Option::<&()>::None, + ) + .await + } +} + +#[utoipa::path( + put, + operation_id = "namespaces_runner_configs_upsert", + path = "/namespaces/{namespace_id}/runner-configs/{runner_name}", + params( + ("namespace_id" = Id, Path), + ("runner_name" = String, Path), + UpsertQuery, + ), + request_body(content = UpsertRequest, content_type = "application/json"), + responses( + (status = 200, body = UpsertResponse), + ), +)] +pub async fn upsert( + Extension(ctx): Extension, + headers: HeaderMap, + Path(path): Path, + Query(query): Query, + Json(body): Json, +) -> Response { + match upsert_inner(ctx, headers, path, query, body).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +async fn upsert_inner( + ctx: ApiCtx, + headers: HeaderMap, + path: UpsertPath, + query: UpsertQuery, + body: UpsertRequest, +) -> Result { + if ctx.config().is_leader() { + rivet_api_peer::namespaces::runner_configs::upsert(ctx, path, query, body).await + } else { + let leader_dc = ctx.config().leader_dc()?; + request_remote_datacenter::( + ctx.config(), + leader_dc.datacenter_label, + &format!( + "/namespaces/{}/runner-configs/{}", + path.namespace_id, path.runner_name + ), + axum::http::Method::PUT, + headers, + Option::<&()>::None, + Some(&body), + ) + .await + } +} + +#[utoipa::path( + delete, + operation_id = "namespaces_runner_configs_delete", + path = "/namespaces/{namespace_id}/runner-configs/{runner_name}", + params( + ("namespace_id" = Id, Path), + ("runner_name" = String, Path), + DeleteQuery, + ), + responses( + (status = 200, body = DeleteResponse), + ), +)] +pub async fn delete( + Extension(ctx): Extension, + headers: HeaderMap, + Path(path): Path, + Query(query): Query, +) -> Response { + match delete_inner(ctx, headers, path, query).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +async fn delete_inner( + ctx: ApiCtx, + headers: HeaderMap, + path: DeletePath, + query: DeleteQuery, +) -> Result { + if ctx.config().is_leader() { + rivet_api_peer::namespaces::runner_configs::delete(ctx, path, query).await + } else { + let leader_dc = ctx.config().leader_dc()?; + request_remote_datacenter::( + ctx.config(), + leader_dc.datacenter_label, + &format!( + "/namespaces/{}/runner-configs/{}", + path.namespace_id, path.runner_name + ), + axum::http::Method::DELETE, + headers, + Some(&query), + Option::<&()>::None, + ) + .await + } +} diff --git a/packages/core/api-public/src/router.rs b/packages/core/api-public/src/router.rs index a3075ffaa4..9c06fd15b8 100644 --- a/packages/core/api-public/src/router.rs +++ b/packages/core/api-public/src/router.rs @@ -22,8 +22,11 @@ use crate::{actors, datacenters, namespaces, runners, ui}; runners::list_names, namespaces::list, namespaces::get, - namespaces::update, namespaces::create, + namespaces::runner_configs::list, + namespaces::runner_configs::get, + namespaces::runner_configs::upsert, + namespaces::runner_configs::delete, datacenters::list, ))] pub struct ApiDoc; @@ -48,8 +51,20 @@ pub async fn router( axum::routing::get(namespaces::get), ) .route( - "/namespaces/{namespace_id}", - axum::routing::put(namespaces::update), + "/namespaces/{namespace_id}/runner-configs", + axum::routing::get(namespaces::runner_configs::list), + ) + .route( + "/namespaces/{namespace_id}/runner-configs/{runner_name}", + axum::routing::get(namespaces::runner_configs::get), + ) + .route( + "/namespaces/{namespace_id}/runner-configs/{runner_name}", + axum::routing::put(namespaces::runner_configs::upsert), + ) + .route( + "/namespaces/{namespace_id}/runner-configs/{runner_name}", + axum::routing::delete(namespaces::runner_configs::delete), ) // MARK: Actors .route("/actors", axum::routing::get(actors::list::list)) diff --git a/packages/core/guard/server/src/cache/actor.rs b/packages/core/guard/server/src/cache/actor.rs index 4cd073ed2d..1a954cb6fb 100644 --- a/packages/core/guard/server/src/cache/actor.rs +++ b/packages/core/guard/server/src/cache/actor.rs @@ -7,7 +7,6 @@ use anyhow::Result; use gas::prelude::*; use crate::routing::pegboard_gateway::{X_RIVET_ACTOR, X_RIVET_PORT}; -use hyper::header::HeaderName; #[tracing::instrument(skip_all)] pub fn build_cache_key(target: &str, path: &str, headers: &hyper::HeaderMap) -> Result { diff --git a/packages/core/pegboard-outbound/Cargo.toml b/packages/core/pegboard-serverless/Cargo.toml similarity index 94% rename from packages/core/pegboard-outbound/Cargo.toml rename to packages/core/pegboard-serverless/Cargo.toml index 039d612d4c..78eaca5978 100644 --- a/packages/core/pegboard-outbound/Cargo.toml +++ b/packages/core/pegboard-serverless/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "pegboard-outbound" +name = "pegboard-serverless" version.workspace = true authors.workspace = true license.workspace = true diff --git a/packages/core/pegboard-outbound/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs similarity index 73% rename from packages/core/pegboard-outbound/src/lib.rs rename to packages/core/pegboard-serverless/src/lib.rs index aea4ebd81d..9d6ff472bf 100644 --- a/packages/core/pegboard-outbound/src/lib.rs +++ b/packages/core/pegboard-serverless/src/lib.rs @@ -9,7 +9,7 @@ use std::{ use anyhow::Result; use futures_util::{StreamExt, TryStreamExt}; use gas::prelude::*; -use namespace::types::RunnerKind; +use namespace::types::RunnerConfig; use pegboard::keys; use reqwest_eventsource as sse; use rivet_runner_protocol::protocol; @@ -31,13 +31,13 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R config.clone(), pools, cache, - "pegboard-outbound", + "pegboard-serverless", Id::new_v1(config.dc_label()), Id::new_v1(config.dc_label()), )?; let mut sub = ctx - .subscribe::(()) + .subscribe::(()) .await?; let mut outbound_connections = HashMap::new(); @@ -52,77 +52,77 @@ async fn tick( ctx: &StandaloneCtx, outbound_connections: &mut HashMap<(Id, String), Vec>, ) -> Result<()> { - let outbound_data = - ctx.udb()? - .run(|tx, _mc| async move { - let txs = tx.subspace(keys::subspace()); - let outbound_desired_subspace = txs.subspace( - &rivet_types::keys::pegboard::ns::OutboundDesiredSlotsKey::entire_subspace(), - ); - - txs.get_ranges_keyvalues( - udb::RangeOption { - mode: StreamingMode::WantAll, - ..(&outbound_desired_subspace).into() - }, - // NOTE: This is a snapshot to prevent conflict with updates to this subspace - SNAPSHOT, - ) - .map(|res| match res { - Ok(entry) => { - let (key, desired_slots) = - txs.read_entry::(&entry)?; - - Ok((key.namespace_id, key.runner_name_selector, desired_slots)) - } - Err(err) => Err(err.into()), - }) - .try_collect::>() - .await - }) - .await?; + let serverless_data = ctx + .udb()? + .run(|tx, _mc| async move { + let txs = tx.subspace(keys::subspace()); + + let serverless_desired_subspace = txs.subspace( + &rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::entire_subspace(), + ); + + txs.get_ranges_keyvalues( + udb::RangeOption { + mode: StreamingMode::WantAll, + ..(&serverless_desired_subspace).into() + }, + // NOTE: This is a snapshot to prevent conflict with updates to this subspace + SNAPSHOT, + ) + .map(|res| match res { + Ok(entry) => { + let (key, desired_slots) = + txs.read_entry::(&entry)?; - let mut namespace_ids = outbound_data - .iter() - .map(|(ns_id, _, _)| *ns_id) - .collect::>(); - namespace_ids.dedup(); + Ok((key.namespace_id, key.runner_name, desired_slots)) + } + Err(err) => Err(err.into()), + }) + .try_collect::>() + .await + }) + .await?; - let namespaces = ctx - .op(namespace::ops::get_global::Input { namespace_ids }) + let runner_configs = ctx + .op(namespace::ops::runner_config::get_global::Input { + runners: serverless_data + .iter() + .map(|(ns_id, runner_name, _)| (*ns_id, runner_name.clone())) + .collect(), + }) .await?; - for (ns_id, runner_name_selector, desired_slots) in &outbound_data { - let namespace = namespaces + for (ns_id, runner_name, desired_slots) in &serverless_data { + let runner_config = runner_configs .iter() - .find(|ns| ns.namespace_id == *ns_id) - .context("ns not found")?; + .find(|rc| rc.namespace_id == *ns_id) + .context("runner config not found")?; - let RunnerKind::Outbound { + let RunnerConfig::Serverless { url, request_lifespan, slots_per_runner, min_runners, max_runners, runners_margin, - } = &namespace.runner_kind + } = &runner_config.config else { tracing::warn!( ?ns_id, - "this namespace should not be in the outbound subspace (wrong runner kind)" + "this runner config should not be in the serverless subspace (wrong config kind)" ); continue; }; let curr = outbound_connections - .entry((*ns_id, runner_name_selector.clone())) + .entry((*ns_id, runner_name.clone())) .or_insert_with(Vec::new); // Remove finished and draining connections from list curr.retain(|conn| !conn.handle.is_finished() && !conn.draining.load(Ordering::SeqCst)); let desired_count = (desired_slots.div_ceil(*slots_per_runner).max(*min_runners) - + runners_margin) + + *runners_margin) .min(*max_runners) .try_into()?; @@ -137,7 +137,7 @@ async fn tick( for conn in draining_connections { if conn.shutdown_tx.send(()).is_err() { tracing::warn!( - "outbound connection shutdown channel dropped, likely already stopped" + "serverless connection shutdown channel dropped, likely already stopped" ); } } @@ -146,7 +146,7 @@ async fn tick( let starting_connections = std::iter::repeat_with(|| { spawn_connection( ctx.clone(), - url.clone(), + url.to_string(), Duration::from_secs(*request_lifespan as u64), ) }) @@ -155,12 +155,10 @@ async fn tick( } // Remove entries that aren't returned from udb - outbound_connections.retain(|(ns_id, runner_name_selector), _| { - outbound_data + outbound_connections.retain(|(ns_id, runner_name), _| { + serverless_data .iter() - .any(|(ns_id2, runner_name_selector2, _)| { - ns_id == ns_id2 && runner_name_selector == runner_name_selector2 - }) + .any(|(ns_id2, runner_name2, _)| ns_id == ns_id2 && runner_name == runner_name2) }); Ok(()) @@ -186,7 +184,7 @@ fn spawn_connection( // On error, bump the autoscaler loop again let _ = ctx - .msg(rivet_types::msgs::pegboard::BumpOutboundAutoscaler {}) + .msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) .send() .await; } @@ -241,7 +239,7 @@ async fn outbound_handler( draining.store(true, Ordering::SeqCst); - ctx.msg(rivet_types::msgs::pegboard::BumpOutboundAutoscaler {}) + ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) .send() .await?; diff --git a/packages/infra/engine/Cargo.toml b/packages/infra/engine/Cargo.toml index 975ea3d788..f141806f6c 100644 --- a/packages/infra/engine/Cargo.toml +++ b/packages/infra/engine/Cargo.toml @@ -19,7 +19,7 @@ gas.workspace = true hex.workspace = true include_dir.workspace = true lz4_flex.workspace = true -pegboard-outbound.workspace = true +pegboard-serverless.workspace = true pegboard-runner-ws.workspace = true reqwest.workspace = true rivet-api-peer.workspace = true diff --git a/packages/infra/engine/src/run_config.rs b/packages/infra/engine/src/run_config.rs index 890b12842b..7cfb989853 100644 --- a/packages/infra/engine/src/run_config.rs +++ b/packages/infra/engine/src/run_config.rs @@ -26,9 +26,9 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result { Box::pin(rivet_bootstrap::start(config, pools)) }), Service::new( - "pegboard_outbound", + "pegboard_serverless", ServiceKind::Standalone, - |config, pools| Box::pin(pegboard_outbound::start(config, pools)), + |config, pools| Box::pin(pegboard_serverless::start(config, pools)), ), ]; diff --git a/packages/services/internal/Cargo.toml b/packages/services/internal/Cargo.toml index e9f9ec8a88..fd65a9323d 100644 --- a/packages/services/internal/Cargo.toml +++ b/packages/services/internal/Cargo.toml @@ -9,4 +9,5 @@ edition.workspace = true anyhow.workspace = true gas.workspace = true rivet-api-client.workspace = true +rivet-types.workspace = true serde.workspace = true diff --git a/packages/services/internal/src/ops/bump_serverless_autoscaler_global.rs b/packages/services/internal/src/ops/bump_serverless_autoscaler_global.rs new file mode 100644 index 0000000000..2c76300b27 --- /dev/null +++ b/packages/services/internal/src/ops/bump_serverless_autoscaler_global.rs @@ -0,0 +1,60 @@ +use std::fmt::Debug; + +use futures_util::StreamExt; +use gas::prelude::*; +use rivet_api_client::{HeaderMap, Method, request_remote_datacenter}; + +#[derive(Clone, Debug, Default)] +pub struct Input {} + +#[operation] +pub async fn bump_serverless_autoscaler_global(ctx: &OperationCtx, input: &Input) -> Result<()> { + let dcs = &ctx.config().topology().datacenters; + + let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| { + let ctx = ctx.clone(); + + async move { + if dc.datacenter_label == ctx.config().dc_label() { + // Local datacenter + ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) + .send() + .await + } else { + // Remote datacenter - HTTP request + request_remote_datacenter( + ctx.config(), + dc.datacenter_label, + "/bump-serverless-autoscaler", + Method::POST, + HeaderMap::new(), + Option::<&()>::None, + Option::<&()>::None, + ) + .await + } + } + })) + .buffer_unordered(16) + .collect::>() + .await; + + // Aggregate results + let result_count = results.len(); + let mut errors = Vec::new(); + for res in results { + if let Err(err) = res { + tracing::error!(?err, "failed to request edge dc"); + errors.push(err); + } + } + + // Error only if all requests failed + if result_count == errors.len() { + if let Some(res) = errors.into_iter().next() { + return Err(res).context("all datacenter requests failed"); + } + } + + Ok(()) +} diff --git a/packages/services/internal/src/ops/mod.rs b/packages/services/internal/src/ops/mod.rs index a5c08fdb0d..ab866e20d5 100644 --- a/packages/services/internal/src/ops/mod.rs +++ b/packages/services/internal/src/ops/mod.rs @@ -1 +1,2 @@ +pub mod bump_serverless_autoscaler_global; pub mod cache; diff --git a/packages/services/namespace/Cargo.toml b/packages/services/namespace/Cargo.toml index 823fb28bf2..46ec60f082 100644 --- a/packages/services/namespace/Cargo.toml +++ b/packages/services/namespace/Cargo.toml @@ -16,6 +16,7 @@ rivet-error.workspace = true rivet-types.workspace = true rivet-util.workspace = true serde.workspace = true +strum.workspace = true tracing.workspace = true udb-util.workspace = true universaldb.workspace = true diff --git a/packages/services/namespace/src/errors.rs b/packages/services/namespace/src/errors.rs index a76a0419ac..02e297e6a7 100644 --- a/packages/services/namespace/src/errors.rs +++ b/packages/services/namespace/src/errors.rs @@ -27,3 +27,13 @@ pub enum Namespace { )] InvalidUpdate { reason: String }, } + +#[derive(RivetError, Debug, Deserialize, Serialize)] +#[error("runner_config")] +pub enum RunnerConfig { + #[error("invalid", "Invalid runner config.", "Invalid runner config: {reason}")] + Invalid { reason: String }, + + #[error("not_found", "No config for this runner exists.")] + NotFound, +} diff --git a/packages/services/namespace/src/keys.rs b/packages/services/namespace/src/keys.rs index 0ed14ec8a2..93e133de25 100644 --- a/packages/services/namespace/src/keys.rs +++ b/packages/services/namespace/src/keys.rs @@ -2,7 +2,9 @@ use std::result::Result::Ok; use anyhow::*; use gas::prelude::*; +use serde::Serialize; use udb_util::prelude::*; +use utoipa::ToSchema; use versioned_data_util::OwnedVersionedData; pub fn subspace() -> udb_util::Subspace { @@ -146,95 +148,272 @@ impl<'de> TupleUnpack<'de> for CreateTsKey { } #[derive(Debug)] -pub struct RunnerKindKey { - namespace_id: Id, +pub struct ByNameKey { + name: String, } -impl RunnerKindKey { - pub fn new(namespace_id: Id) -> Self { - RunnerKindKey { namespace_id } +impl ByNameKey { + pub fn new(name: String) -> Self { + ByNameKey { name } + } +} + +impl FormalKey for ByNameKey { + /// Namespace id. + type Value = Id; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok(Id::from_slice(raw)?) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.as_bytes()) + } +} + +impl TuplePack for ByNameKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = (BY_NAME, &self.name); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for ByNameKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, name)) = <(usize, String)>::unpack(input, tuple_depth)?; + + let v = ByNameKey { name }; + + Ok((input, v)) + } +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, strum::FromRepr, ToSchema)] +#[serde(rename_all = "snake_case")] +pub enum RunnerConfigVariant { + Serverless = 0, +} + +impl RunnerConfigVariant { + pub fn parse(v: &str) -> Option { + match v { + "serverless" => Some(RunnerConfigVariant::Serverless), + _ => None, + } + } +} + +impl std::fmt::Display for RunnerConfigVariant { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RunnerConfigVariant::Serverless => write!(f, "serverless"), + } } } -impl FormalKey for RunnerKindKey { - type Value = crate::types::RunnerKind; +#[derive(Debug)] +pub struct RunnerConfigKey { + pub namespace_id: Id, + pub name: String, +} + +impl RunnerConfigKey { + pub fn new(namespace_id: Id, name: String) -> Self { + RunnerConfigKey { namespace_id, name } + } + + pub fn subspace(namespace_id: Id) -> RunnerConfigSubspaceKey { + RunnerConfigSubspaceKey::new(namespace_id) + } +} + +impl FormalKey for RunnerConfigKey { + type Value = crate::types::RunnerConfig; fn deserialize(&self, raw: &[u8]) -> Result { Ok( - rivet_data::versioned::NamespaceRunnerKind::deserialize_with_embedded_version(raw)? + rivet_data::versioned::NamespaceRunnerConfig::deserialize_with_embedded_version(raw)? .into(), ) } fn serialize(&self, value: Self::Value) -> Result> { - rivet_data::versioned::NamespaceRunnerKind::latest(value.into()) + rivet_data::versioned::NamespaceRunnerConfig::latest(value.into()) .serialize_with_embedded_version( rivet_data::PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION, ) } } -impl TuplePack for RunnerKindKey { +impl TuplePack for RunnerConfigKey { fn pack( &self, w: &mut W, tuple_depth: TupleDepth, ) -> std::io::Result { - let t = (DATA, self.namespace_id, RUNNER_KIND); + let t = (RUNNER, CONFIG, DATA, self.namespace_id, &self.name); t.pack(w, tuple_depth) } } -impl<'de> TupleUnpack<'de> for RunnerKindKey { +impl<'de> TupleUnpack<'de> for RunnerConfigKey { fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (_, namespace_id, _)) = <(usize, Id, usize)>::unpack(input, tuple_depth)?; - let v = RunnerKindKey { namespace_id }; + let (input, (_, _, _, namespace_id, name)) = + <(usize, usize, usize, Id, String)>::unpack(input, tuple_depth)?; + + let v = RunnerConfigKey { namespace_id, name }; Ok((input, v)) } } +pub struct RunnerConfigSubspaceKey { + pub namespace_id: Id, +} + +impl RunnerConfigSubspaceKey { + pub fn new(namespace_id: Id) -> Self { + RunnerConfigSubspaceKey { namespace_id } + } +} + +impl TuplePack for RunnerConfigSubspaceKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let mut offset = VersionstampOffset::None { size: 0 }; + + let t = (RUNNER, CONFIG, DATA, self.namespace_id); + offset += t.pack(w, tuple_depth)?; + + Ok(offset) + } +} + #[derive(Debug)] -pub struct ByNameKey { - name: String, +pub struct RunnerConfigByVariantKey { + pub namespace_id: Id, + pub variant: RunnerConfigVariant, + pub name: String, } -impl ByNameKey { - pub fn new(name: String) -> Self { - ByNameKey { name } +impl RunnerConfigByVariantKey { + pub fn new(namespace_id: Id, variant: RunnerConfigVariant, name: String) -> Self { + RunnerConfigByVariantKey { + namespace_id, + name, + variant, + } + } + + pub fn subspace(namespace_id: Id) -> RunnerConfigByVariantSubspaceKey { + RunnerConfigByVariantSubspaceKey::new(namespace_id) + } + + pub fn subspace_with_variant( + namespace_id: Id, + variant: RunnerConfigVariant, + ) -> RunnerConfigByVariantSubspaceKey { + RunnerConfigByVariantSubspaceKey::new_with_variant(namespace_id, variant) } } -impl FormalKey for ByNameKey { - /// Namespace id. - type Value = Id; +impl FormalKey for RunnerConfigByVariantKey { + type Value = crate::types::RunnerConfig; fn deserialize(&self, raw: &[u8]) -> Result { - Ok(Id::from_slice(raw)?) + Ok( + rivet_data::versioned::NamespaceRunnerConfig::deserialize_with_embedded_version(raw)? + .into(), + ) } fn serialize(&self, value: Self::Value) -> Result> { - Ok(value.as_bytes()) + rivet_data::versioned::NamespaceRunnerConfig::latest(value.into()) + .serialize_with_embedded_version( + rivet_data::PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION, + ) } } -impl TuplePack for ByNameKey { +impl TuplePack for RunnerConfigByVariantKey { fn pack( &self, w: &mut W, tuple_depth: TupleDepth, ) -> std::io::Result { - let t = (BY_NAME, &self.name); + let t = ( + RUNNER, + CONFIG, + BY_VARIANT, + self.namespace_id, + self.variant as usize, + &self.name, + ); t.pack(w, tuple_depth) } } -impl<'de> TupleUnpack<'de> for ByNameKey { +impl<'de> TupleUnpack<'de> for RunnerConfigByVariantKey { fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (_, name)) = <(usize, String)>::unpack(input, tuple_depth)?; - - let v = ByNameKey { name }; + let (input, (_, _, _, namespace_id, variant, name)) = + <(usize, usize, usize, Id, usize, String)>::unpack(input, tuple_depth)?; + let variant = RunnerConfigVariant::from_repr(variant).ok_or_else(|| { + PackError::Message(format!("invalid runner config variant `{variant}` in key").into()) + })?; + + let v = RunnerConfigByVariantKey { + namespace_id, + variant, + name, + }; Ok((input, v)) } } + +pub struct RunnerConfigByVariantSubspaceKey { + pub namespace_id: Id, + pub variant: Option, +} + +impl RunnerConfigByVariantSubspaceKey { + pub fn new(namespace_id: Id) -> Self { + RunnerConfigByVariantSubspaceKey { + namespace_id, + variant: None, + } + } + + pub fn new_with_variant(namespace_id: Id, variant: RunnerConfigVariant) -> Self { + RunnerConfigByVariantSubspaceKey { + namespace_id, + variant: Some(variant), + } + } +} + +impl TuplePack for RunnerConfigByVariantSubspaceKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let mut offset = VersionstampOffset::None { size: 0 }; + + let t = (RUNNER, CONFIG, BY_VARIANT, self.namespace_id); + offset += t.pack(w, tuple_depth)?; + + if let Some(variant) = self.variant { + offset += (variant as usize).pack(w, tuple_depth)?; + } + + Ok(offset) + } +} diff --git a/packages/services/namespace/src/ops/get_global.rs b/packages/services/namespace/src/ops/get_global.rs index a62eeda288..613f8030ee 100644 --- a/packages/services/namespace/src/ops/get_global.rs +++ b/packages/services/namespace/src/ops/get_global.rs @@ -10,13 +10,10 @@ pub struct Input { #[operation] pub async fn namespace_get_global(ctx: &OperationCtx, input: &Input) -> Result> { if ctx.config().is_leader() { - let namespaces_res = ctx - .op(crate::ops::get_local::Input { - namespace_ids: input.namespace_ids.clone(), - }) - .await?; - - Ok(namespaces_res.namespaces) + ctx.op(super::get_local::Input { + namespace_ids: input.namespace_ids.clone(), + }) + .await } else { let leader_dc = ctx.config().leader_dc()?; let client = rivet_pools::reqwest::client().await?; @@ -43,7 +40,8 @@ pub async fn namespace_get_global(ctx: &OperationCtx, input: &Input) -> Result(res).await?; + let res = + rivet_api_util::parse_response::(res).await?; for ns in res.namespaces { let namespace_id = ns.namespace_id; @@ -60,6 +58,6 @@ pub async fn namespace_get_global(ctx: &OperationCtx, input: &Input) -> Result, } diff --git a/packages/services/namespace/src/ops/get_local.rs b/packages/services/namespace/src/ops/get_local.rs index ed6663d589..3f1b5f4474 100644 --- a/packages/services/namespace/src/ops/get_local.rs +++ b/packages/services/namespace/src/ops/get_local.rs @@ -10,13 +10,8 @@ pub struct Input { pub namespace_ids: Vec, } -#[derive(Debug)] -pub struct Output { - pub namespaces: Vec, -} - #[operation] -pub async fn namespace_get(ctx: &OperationCtx, input: &Input) -> Result { +pub async fn namespace_get_local(ctx: &OperationCtx, input: &Input) -> Result> { if !ctx.config().is_leader() { return Err(errors::Namespace::NotLeader.build()); } @@ -38,7 +33,7 @@ pub async fn namespace_get(ctx: &OperationCtx, input: &Input) -> Result .custom_instrument(tracing::info_span!("namespace_get_tx")) .await?; - Ok(Output { namespaces }) + Ok(namespaces) } pub(crate) async fn get_inner( @@ -50,13 +45,11 @@ pub(crate) async fn get_inner( let name_key = keys::NameKey::new(namespace_id); let display_name_key = keys::DisplayNameKey::new(namespace_id); let create_ts_key = keys::CreateTsKey::new(namespace_id); - let runner_kind_key = keys::RunnerKindKey::new(namespace_id); - let (name, display_name, create_ts, runner_kind) = tokio::try_join!( + let (name, display_name, create_ts) = tokio::try_join!( txs.read_opt(&name_key, SERIALIZABLE), txs.read_opt(&display_name_key, SERIALIZABLE), txs.read_opt(&create_ts_key, SERIALIZABLE), - txs.read_opt(&runner_kind_key, SERIALIZABLE), )?; // Namespace not found @@ -70,15 +63,11 @@ pub(crate) async fn get_inner( let create_ts = create_ts.ok_or(udb::FdbBindingError::CustomError( format!("key should exist: {create_ts_key:?}").into(), ))?; - let runner_kind = runner_kind.ok_or(udb::FdbBindingError::CustomError( - format!("key should exist: {runner_kind_key:?}").into(), - ))?; Ok(Some(Namespace { namespace_id, name, display_name, create_ts, - runner_kind, })) } diff --git a/packages/services/namespace/src/ops/mod.rs b/packages/services/namespace/src/ops/mod.rs index 74fc79b4a9..f08fd1f5b5 100644 --- a/packages/services/namespace/src/ops/mod.rs +++ b/packages/services/namespace/src/ops/mod.rs @@ -3,3 +3,4 @@ pub mod get_local; pub mod list; pub mod resolve_for_name_global; pub mod resolve_for_name_local; +pub mod runner_config; diff --git a/packages/services/namespace/src/ops/runner_config/delete.rs b/packages/services/namespace/src/ops/runner_config/delete.rs new file mode 100644 index 0000000000..09ba689150 --- /dev/null +++ b/packages/services/namespace/src/ops/runner_config/delete.rs @@ -0,0 +1,55 @@ +use gas::prelude::*; +use rivet_cache::CacheKey; +use udb_util::{SERIALIZABLE, TxnExt}; + +use crate::{errors, keys}; + +#[derive(Debug)] +pub struct Input { + pub namespace_id: Id, + pub name: String, +} + +#[operation] +pub async fn namespace_runner_config_delete(ctx: &OperationCtx, input: &Input) -> Result<()> { + if !ctx.config().is_leader() { + return Err(errors::Namespace::NotLeader.build()); + } + + ctx.udb()? + .run(|tx, _mc| async move { + let txs = tx.subspace(keys::subspace()); + + // Read existing config to determine variant + let runner_config_key = + keys::RunnerConfigKey::new(input.namespace_id, input.name.clone()); + + if let Some(config) = txs.read_opt(&runner_config_key, SERIALIZABLE).await? { + txs.delete(&runner_config_key); + + // Clear secondary idx + txs.delete(&keys::RunnerConfigByVariantKey::new( + input.namespace_id, + config.variant(), + input.name.clone(), + )); + } + + Ok(()) + }) + .custom_instrument(tracing::info_span!("runner_config_upsert_tx")) + .await?; + + // Purge cache in all dcs + ctx.op(internal::ops::cache::purge_global::Input { + base_key: "namespace.runner_config.{}.get_global".to_string(), + keys: vec![(input.namespace_id, input.name.as_str()).cache_key().into()], + }) + .await?; + + // Bump autoscaler in all dcs + ctx.op(internal::ops::bump_serverless_autoscaler_global::Input {}) + .await?; + + Ok(()) +} diff --git a/packages/services/namespace/src/ops/runner_config/get_global.rs b/packages/services/namespace/src/ops/runner_config/get_global.rs new file mode 100644 index 0000000000..64a2ade433 --- /dev/null +++ b/packages/services/namespace/src/ops/runner_config/get_global.rs @@ -0,0 +1,96 @@ +use std::collections::HashMap; + +use gas::prelude::*; + +use crate::types::RunnerConfig; + +#[derive(Debug)] +pub struct Input { + pub runners: Vec<(Id, String)>, +} + +#[operation] +pub async fn namespace_runner_config_get_global( + ctx: &OperationCtx, + input: &Input, +) -> Result> { + if ctx.config().is_leader() { + ctx.op(super::get_local::Input { + runners: input.runners.clone(), + }) + .await + } else { + let leader_dc = ctx.config().leader_dc()?; + let client = rivet_pools::reqwest::client().await?; + + ctx.cache() + .clone() + .request() + .fetch_all_json( + &format!("namespace.runner_config.get_global"), + input.runners.clone(), + { + let leader_dc = leader_dc.clone(); + let client = client.clone(); + + move |mut cache, runners| { + let leader_dc = leader_dc.clone(); + let client = client.clone(); + + async move { + let mut runner_names_by_namespace_id = + HashMap::with_capacity(runners.len()); + + for (namespace_id, runner_name) in runners { + let runner_names = runner_names_by_namespace_id + .entry(namespace_id) + .or_insert_with(Vec::new); + runner_names.push(runner_name); + } + + // TODO: Parallelize + for (namespace_id, runner_names) in runner_names_by_namespace_id { + let url = leader_dc + .api_peer_url + .join(&format!("/namespaces/{namespace_id}/runner-configs"))?; + let res = client + .get(url) + .query( + &runner_names + .iter() + .map(|runner_name| ("runner", runner_name)) + .collect::>(), + ) + .send() + .await?; + + let res = + rivet_api_util::parse_response::(res) + .await?; + + for (runner_name, runner_config) in res.runner_configs { + cache.resolve( + &(namespace_id, runner_name.clone()), + super::get_local::RunnerConfig { + namespace_id, + name: runner_name, + config: runner_config, + }, + ); + } + } + + Ok(cache) + } + } + }, + ) + .await + } +} + +// TODO: Cyclical dependency with api_peer +#[derive(Deserialize)] +struct RunnerConfigListResponse { + runner_configs: HashMap, +} diff --git a/packages/services/namespace/src/ops/runner_config/get_local.rs b/packages/services/namespace/src/ops/runner_config/get_local.rs new file mode 100644 index 0000000000..fd23ad9562 --- /dev/null +++ b/packages/services/namespace/src/ops/runner_config/get_local.rs @@ -0,0 +1,65 @@ +use futures_util::{StreamExt, TryStreamExt}; +use gas::prelude::*; +use serde::{Deserialize, Serialize}; +use udb_util::{SERIALIZABLE, TxnExt}; + +use crate::{errors, keys}; + +#[derive(Debug)] +pub struct Input { + pub runners: Vec<(Id, String)>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct RunnerConfig { + pub namespace_id: Id, + pub name: String, + pub config: crate::types::RunnerConfig, +} + +#[operation] +pub async fn namespace_runner_config_get_local( + ctx: &OperationCtx, + input: &Input, +) -> Result> { + if !ctx.config().is_leader() { + return Err(errors::Namespace::NotLeader.build()); + } + + let runner_configs = ctx + .udb()? + .run(|tx, _mc| async move { + futures_util::stream::iter(input.runners.clone()) + .map(|(namespace_id, runner_name)| { + let tx = tx.clone(); + + async move { + let txs = tx.subspace(keys::subspace()); + + let runner_config_key = + keys::RunnerConfigKey::new(namespace_id, runner_name.clone()); + + // Runner config not found + let Some(runner_config) = + txs.read_opt(&runner_config_key, SERIALIZABLE).await? + else { + return Ok(None); + }; + + Ok(Some(RunnerConfig { + namespace_id, + name: runner_name, + config: runner_config, + })) + } + }) + .buffer_unordered(1024) + .try_filter_map(|x| std::future::ready(Ok(x))) + .try_collect::>() + .await + }) + .custom_instrument(tracing::info_span!("runner_config_get_local_tx")) + .await?; + + Ok(runner_configs) +} diff --git a/packages/services/namespace/src/ops/runner_config/list.rs b/packages/services/namespace/src/ops/runner_config/list.rs new file mode 100644 index 0000000000..1b15b21fb2 --- /dev/null +++ b/packages/services/namespace/src/ops/runner_config/list.rs @@ -0,0 +1,94 @@ +use futures_util::{StreamExt, TryStreamExt}; +use gas::prelude::*; +use udb_util::{SERIALIZABLE, TxnExt}; +use universaldb::{self as udb, options::StreamingMode}; + +use crate::{errors, keys, types::RunnerConfig}; + +#[derive(Debug)] +pub struct Input { + pub namespace_id: Id, + pub variant: Option, + pub after_name: Option, + pub limit: usize, +} + +#[operation] +pub async fn namespace_runner_config_list( + ctx: &OperationCtx, + input: &Input, +) -> Result> { + if !ctx.config().is_leader() { + return Err(errors::Namespace::NotLeader.build()); + } + + let runner_configs = ctx + .udb()? + .run(|tx, _mc| async move { + let txs = tx.subspace(keys::subspace()); + + let (start, end) = if let Some(variant) = input.variant { + let (start, end) = txs + .subspace(&keys::RunnerConfigByVariantKey::subspace_with_variant( + input.namespace_id, + variant, + )) + .range(); + + let start = if let Some(name) = &input.after_name { + txs.pack(&keys::RunnerConfigByVariantKey::new( + input.namespace_id, + variant, + name.clone(), + )) + } else { + start + }; + + (start, end) + } else { + let (start, end) = txs + .subspace(&keys::RunnerConfigKey::subspace(input.namespace_id)) + .range(); + + let start = if let Some(name) = &input.after_name { + txs.pack(&keys::RunnerConfigKey::new( + input.namespace_id, + name.clone(), + )) + } else { + start + }; + + (start, end) + }; + + txs.get_ranges_keyvalues( + udb::RangeOption { + mode: StreamingMode::WantAll, + limit: Some(input.limit), + ..(start, end).into() + }, + SERIALIZABLE, + ) + .map(|res| match res { + Ok(entry) => { + if input.variant.is_some() { + let (key, config) = + txs.read_entry::(&entry)?; + Ok((key.name, config)) + } else { + let (key, config) = txs.read_entry::(&entry)?; + Ok((key.name, config)) + } + } + Err(err) => Err(err.into()), + }) + .try_collect() + .await + }) + .custom_instrument(tracing::info_span!("runner_config_get_local_tx")) + .await?; + + Ok(runner_configs) +} diff --git a/packages/services/namespace/src/ops/runner_config/mod.rs b/packages/services/namespace/src/ops/runner_config/mod.rs new file mode 100644 index 0000000000..3c44a67d92 --- /dev/null +++ b/packages/services/namespace/src/ops/runner_config/mod.rs @@ -0,0 +1,5 @@ +pub mod delete; +pub mod get_global; +pub mod get_local; +pub mod list; +pub mod upsert; diff --git a/packages/services/namespace/src/ops/runner_config/upsert.rs b/packages/services/namespace/src/ops/runner_config/upsert.rs new file mode 100644 index 0000000000..e530f8e3ce --- /dev/null +++ b/packages/services/namespace/src/ops/runner_config/upsert.rs @@ -0,0 +1,93 @@ +use gas::prelude::*; +use rivet_cache::CacheKey; +use udb_util::TxnExt; +use universaldb::options::MutationType; + +use crate::{errors, keys, types::RunnerConfig}; + +#[derive(Debug)] +pub struct Input { + pub namespace_id: Id, + pub name: String, + pub config: RunnerConfig, +} + +#[operation] +pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> Result<()> { + if !ctx.config().is_leader() { + return Err(errors::Namespace::NotLeader.build()); + } + + ctx.udb()? + .run(|tx, _mc| async move { + let txs = tx.subspace(keys::subspace()); + + // TODO: Once other types of configs get added, delete previous config before writing + txs.write( + &keys::RunnerConfigKey::new(input.namespace_id, input.name.clone()), + input.config.clone(), + )?; + + // Write to secondary idx + txs.write( + &keys::RunnerConfigByVariantKey::new( + input.namespace_id, + input.config.variant(), + input.name.clone(), + ), + input.config.clone(), + )?; + + match &input.config { + RunnerConfig::Serverless { + url, + slots_per_runner, + .. + } => { + // Validate url + if let Err(err) = url::Url::parse(url) { + return Ok(Err(errors::RunnerConfig::Invalid { + reason: format!("invalid serverless url: {err}"), + })); + } + + // Validate slots per runner + if *slots_per_runner == 0 { + return Ok(Err(errors::RunnerConfig::Invalid { + reason: "`slots_per_runner` cannot be 0".to_string(), + })); + } + + // Sets desired count to 0 if it doesn't exist + let txs = tx.subspace(rivet_types::keys::pegboard::subspace()); + txs.atomic_op( + &rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new( + input.namespace_id, + input.name.clone(), + ), + &0u32.to_le_bytes(), + MutationType::Add, + ); + } + } + + Ok(Ok(())) + }) + .custom_instrument(tracing::info_span!("runner_config_upsert_tx")) + .await? + .map_err(|err| err.build())?; + + // Purge cache in all dcs + let variant_str = serde_json::to_string(&input.config.variant())?; + ctx.op(internal::ops::cache::purge_global::Input { + base_key: format!("namespace.runner_config.{variant_str}.get_global"), + keys: vec![(input.namespace_id, input.name.as_str()).cache_key().into()], + }) + .await?; + + // Bump autoscaler in all dcs + ctx.op(internal::ops::bump_serverless_autoscaler_global::Input {}) + .await?; + + Ok(()) +} diff --git a/packages/services/namespace/src/types.rs b/packages/services/namespace/src/types.rs index a9709e4efb..2dc477ce11 100644 --- a/packages/services/namespace/src/types.rs +++ b/packages/services/namespace/src/types.rs @@ -1,20 +1,21 @@ use gas::prelude::*; use utoipa::ToSchema; +use crate::keys; + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct Namespace { pub namespace_id: Id, pub name: String, pub display_name: String, pub create_ts: i64, - pub runner_kind: RunnerKind, } #[derive(Debug, Clone, Serialize, Deserialize, Hash, ToSchema)] #[serde(rename_all = "snake_case")] -#[schema(as = NamespacesRunnerKind)] -pub enum RunnerKind { - Outbound { +#[schema(as = NamespacesRunnerConfig)] +pub enum RunnerConfig { + Serverless { url: String, /// Seconds. request_lifespan: u32, @@ -23,21 +24,28 @@ pub enum RunnerKind { max_runners: u32, runners_margin: u32, }, - Custom, } -impl From for rivet_data::generated::namespace_runner_kind_v1::Data { - fn from(value: RunnerKind) -> Self { +impl RunnerConfig { + pub fn variant(&self) -> keys::RunnerConfigVariant { + match self { + RunnerConfig::Serverless { .. } => keys::RunnerConfigVariant::Serverless, + } + } +} + +impl From for rivet_data::generated::namespace_runner_config_v1::Data { + fn from(value: RunnerConfig) -> Self { match value { - RunnerKind::Outbound { + RunnerConfig::Serverless { url, request_lifespan, slots_per_runner, min_runners, max_runners, runners_margin, - } => rivet_data::generated::namespace_runner_kind_v1::Data::Outbound( - rivet_data::generated::namespace_runner_kind_v1::Outbound { + } => rivet_data::generated::namespace_runner_config_v1::Data::Serverless( + rivet_data::generated::namespace_runner_config_v1::Serverless { url, request_lifespan, slots_per_runner, @@ -46,16 +54,15 @@ impl From for rivet_data::generated::namespace_runner_kind_v1::Data runners_margin, }, ), - RunnerKind::Custom => rivet_data::generated::namespace_runner_kind_v1::Data::Custom, } } } -impl From for RunnerKind { - fn from(value: rivet_data::generated::namespace_runner_kind_v1::Data) -> Self { +impl From for RunnerConfig { + fn from(value: rivet_data::generated::namespace_runner_config_v1::Data) -> Self { match value { - rivet_data::generated::namespace_runner_kind_v1::Data::Outbound(o) => { - RunnerKind::Outbound { + rivet_data::generated::namespace_runner_config_v1::Data::Serverless(o) => { + RunnerConfig::Serverless { url: o.url, request_lifespan: o.request_lifespan, slots_per_runner: o.slots_per_runner, @@ -64,7 +71,6 @@ impl From for RunnerKind runners_margin: o.runners_margin, } } - rivet_data::generated::namespace_runner_kind_v1::Data::Custom => RunnerKind::Custom, } } } diff --git a/packages/services/namespace/src/workflows/namespace.rs b/packages/services/namespace/src/workflows/namespace.rs index af5d5b4855..4feac7a0d2 100644 --- a/packages/services/namespace/src/workflows/namespace.rs +++ b/packages/services/namespace/src/workflows/namespace.rs @@ -1,11 +1,9 @@ use futures_util::FutureExt; use gas::prelude::*; -use rivet_cache::CacheKey; use serde::{Deserialize, Serialize}; use udb_util::{SERIALIZABLE, TxnExt}; -use utoipa::ToSchema; -use crate::{errors, keys, types::RunnerKind}; +use crate::{errors, keys}; #[derive(Debug, Deserialize, Serialize)] pub struct Input { @@ -59,34 +57,8 @@ pub async fn namespace(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { // Does nothing yet ctx.repeat(|ctx| { - let namespace_id = input.namespace_id; - async move { - let update = ctx.listen::().await?; - - let res = ctx - .activity(UpdateInput { - namespace_id, - update, - }) - .await?; - - if let Ok(update_res) = &res { - ctx.activity(PurgeCacheInput { namespace_id }).await?; - - if update_res.bump_autoscaler { - ctx.msg(rivet_types::msgs::pegboard::BumpOutboundAutoscaler {}) - .send() - .await?; - } - } - - ctx.msg(UpdateResult { - res: res.map(|_| ()), - }) - .tag("namespace_id", namespace_id) - .send() - .await?; + ctx.listen::().await?; Ok(Loop::<()>::Continue) } @@ -106,17 +78,7 @@ pub struct Failed { } #[signal("namespace_update")] -#[derive(Debug, Clone, Hash, ToSchema)] -#[schema(as = NamespacesUpdate)] -#[serde(rename_all = "snake_case")] -pub enum Update { - UpdateRunnerKind { runner_kind: RunnerKind }, -} - -#[message("namespace_update_result")] -pub struct UpdateResult { - pub res: Result<(), errors::Namespace>, -} +pub struct Update {} #[derive(Debug, Clone, Serialize, Deserialize, Hash)] pub struct ValidateInput { @@ -203,7 +165,6 @@ async fn insert_fdb( txs.write(&keys::NameKey::new(namespace_id), name)?; txs.write(&keys::DisplayNameKey::new(namespace_id), display_name)?; txs.write(&keys::CreateTsKey::new(namespace_id), input.create_ts)?; - txs.write(&keys::RunnerKindKey::new(namespace_id), RunnerKind::Custom)?; // Insert idx txs.write(&name_idx_key, namespace_id)?; @@ -215,95 +176,3 @@ async fn insert_fdb( .await .map_err(Into::into) } - -#[derive(Debug, Clone, Serialize, Deserialize, Hash)] -struct UpdateInput { - namespace_id: Id, - update: Update, -} - -#[derive(Debug, Clone, Serialize, Deserialize, Hash)] -struct UpdateOutput { - bump_autoscaler: bool, -} - -#[activity(UpdateActivity)] -async fn update( - ctx: &ActivityCtx, - input: &UpdateInput, -) -> Result> { - ctx - .udb()? - .run(|tx, _mc| { - let namespace_id = input.namespace_id; - let update = input.update.clone(); - - async move { - let txs = tx.subspace(keys::subspace()); - - let bump_autoscaler = match update { - Update::UpdateRunnerKind { runner_kind } => { - let bump_autoscaler = match &runner_kind { - RunnerKind::Outbound { - url, - slots_per_runner, - .. - } => { - // Validate url - if let Err(err) = url::Url::parse(url) { - return Ok(Err(errors::Namespace::InvalidUpdate { - reason: format!("invalid outbound url: {err}"), - })); - } - - // Validate slots per runner - if *slots_per_runner == 0 { - return Ok(Err(errors::Namespace::InvalidUpdate { - reason: "`slots_per_runner` cannot be 0".to_string(), - })); - } - - true - } - RunnerKind::Custom => { - // Clear outbound data - txs.delete_key_subspace(&rivet_types::keys::pegboard::ns::OutboundDesiredSlotsKey::subspace(namespace_id)); - - false - } - }; - - txs.write(&keys::RunnerKindKey::new(namespace_id), runner_kind)?; - - bump_autoscaler - } - }; - - Ok(Ok(UpdateOutput { bump_autoscaler })) - } - }) - .custom_instrument(tracing::info_span!("namespace_create_tx")) - .await - .map_err(Into::into) -} - -#[derive(Debug, Clone, Serialize, Deserialize, Hash)] -struct PurgeCacheInput { - namespace_id: Id, -} - -#[activity(PurgeCache)] -async fn purge_cache(ctx: &ActivityCtx, input: &PurgeCacheInput) -> Result<()> { - let res = ctx - .op(internal::ops::cache::purge_global::Input { - base_key: "namespace.get_global".to_string(), - keys: vec![input.namespace_id.cache_key().into()], - }) - .await; - - if let Err(err) = res { - tracing::error!(?err, "failed to purge global namespace cache"); - } - - Ok(()) -} diff --git a/packages/services/pegboard/src/keys/mod.rs b/packages/services/pegboard/src/keys/mod.rs index 402214f8a0..3cb17c5bbb 100644 --- a/packages/services/pegboard/src/keys/mod.rs +++ b/packages/services/pegboard/src/keys/mod.rs @@ -6,7 +6,7 @@ pub mod ns; pub mod runner; pub fn subspace() -> udb_util::Subspace { - udb_util::Subspace::new(&(RIVET, PEGBOARD)) + rivet_types::keys::pegboard::subspace() } pub fn actor_kv_subspace() -> udb_util::Subspace { diff --git a/packages/services/pegboard/src/ops/actor/list_names.rs b/packages/services/pegboard/src/ops/actor/list_names.rs index 8fe72d2100..0cd3752ce3 100644 --- a/packages/services/pegboard/src/ops/actor/list_names.rs +++ b/packages/services/pegboard/src/ops/actor/list_names.rs @@ -15,7 +15,7 @@ pub struct Input { #[derive(Debug)] pub struct Output { - pub names: util::serde::FakeMap, + pub names: Vec<(String, ActorNameKeyData)>, } #[operation] @@ -55,7 +55,7 @@ pub async fn pegboard_actor_list_names(ctx: &OperationCtx, input: &Input) -> Res } Err(err) => Err(Into::::into(err)), }) - .try_collect::>() + .try_collect::>() .await }) .custom_instrument(tracing::info_span!("actor_list_names_tx")) diff --git a/packages/services/pegboard/src/workflows/actor/destroy.rs b/packages/services/pegboard/src/workflows/actor/destroy.rs index 3faf1f171c..7ba691c760 100644 --- a/packages/services/pegboard/src/workflows/actor/destroy.rs +++ b/packages/services/pegboard/src/workflows/actor/destroy.rs @@ -1,5 +1,4 @@ use gas::prelude::*; -use namespace::types::RunnerKind; use rivet_data::converted::ActorByKeyKeyData; use rivet_runner_protocol::protocol; use udb_util::{SERIALIZABLE, TxnExt}; @@ -86,7 +85,7 @@ async fn update_state_and_fdb( state.namespace_id, &state.runner_name_selector, runner_id, - &state.ns_runner_kind, + state.for_serverless, &tx, ) .await?; @@ -164,7 +163,7 @@ pub(crate) async fn clear_slot( namespace_id: Id, runner_name_selector: &str, runner_id: Id, - ns_runner_kind: &RunnerKind, + for_serverless: bool, tx: &udb::RetryableTransaction, ) -> Result<(), udb::FdbBindingError> { let txs = tx.subspace(keys::subspace()); @@ -235,9 +234,9 @@ pub(crate) async fn clear_slot( )?; } - if let RunnerKind::Outbound { .. } = ns_runner_kind { + if for_serverless { txs.atomic_op( - &rivet_types::keys::pegboard::ns::OutboundDesiredSlotsKey::new( + &rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new( namespace_id, runner_name_selector.to_string(), ), diff --git a/packages/services/pegboard/src/workflows/actor/mod.rs b/packages/services/pegboard/src/workflows/actor/mod.rs index 3bff9c38fb..117cb85b6e 100644 --- a/packages/services/pegboard/src/workflows/actor/mod.rs +++ b/packages/services/pegboard/src/workflows/actor/mod.rs @@ -1,6 +1,5 @@ use futures_util::FutureExt; use gas::prelude::*; -use namespace::types::RunnerKind; use rivet_runner_protocol::protocol; use rivet_types::actors::CrashPolicy; @@ -47,7 +46,7 @@ pub struct State { pub create_ts: i64, pub create_complete_ts: Option, - pub ns_runner_kind: RunnerKind, + pub for_serverless: bool, pub start_ts: Option, // NOTE: This is not the alarm ts, this is when the actor started sleeping. See `LifecycleState` for alarm @@ -70,7 +69,6 @@ impl State { runner_name_selector: String, crash_policy: CrashPolicy, create_ts: i64, - ns_runner_kind: RunnerKind, ) -> Self { State { name, @@ -83,7 +81,7 @@ impl State { create_ts, create_complete_ts: None, - ns_runner_kind, + for_serverless: false, start_ts: None, pending_allocation_ts: None, @@ -122,18 +120,15 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }) .await?; - let metadata = match validation_res { - Ok(metadata) => metadata, - Err(error) => { - ctx.msg(Failed { error }) - .tag("actor_id", input.actor_id) - .send() - .await?; + if let Err(error) = validation_res { + ctx.msg(Failed { error }) + .tag("actor_id", input.actor_id) + .send() + .await?; - // TODO(RVT-3928): return Ok(Err); - return Ok(()); - } - }; + // TODO(RVT-3928): return Ok(Err); + return Ok(()); + } ctx.activity(setup::InitStateAndUdbInput { actor_id: input.actor_id, @@ -143,7 +138,6 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> runner_name_selector: input.runner_name_selector.clone(), crash_policy: input.crash_policy, create_ts: ctx.create_ts(), - ns_runner_kind: metadata.ns_runner_kind, }) .await?; diff --git a/packages/services/pegboard/src/workflows/actor/runtime.rs b/packages/services/pegboard/src/workflows/actor/runtime.rs index 3371834f83..9172583612 100644 --- a/packages/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/services/pegboard/src/workflows/actor/runtime.rs @@ -3,7 +3,6 @@ use std::time::Instant; use futures_util::StreamExt; use futures_util::{FutureExt, TryStreamExt}; use gas::prelude::*; -use namespace::types::RunnerKind; use rivet_metrics::KeyValue; use rivet_runner_protocol::protocol; use udb_util::{FormalKey, SERIALIZABLE, SNAPSHOT, TxnExt}; @@ -103,20 +102,30 @@ async fn allocate_actor( let start_instant = Instant::now(); let mut state = ctx.state::()?; let namespace_id = state.namespace_id; - let ns_runner_kind = &state.ns_runner_kind; // NOTE: This txn should closely resemble the one found in the allocate_pending_actors activity of the // client wf - let res = ctx + let (for_serverless, res) = ctx .udb()? .run(|tx, _mc| async move { let ping_threshold_ts = util::timestamp::now() - RUNNER_ELIGIBLE_THRESHOLD_MS; let txs = tx.subspace(keys::subspace()); - // Increment desired slots if namespace has an outbound runner kind - if let RunnerKind::Outbound { .. } = ns_runner_kind { + // Check if runner is an serverless runner + let for_serverless = txs + .exists( + &namespace::keys::RunnerConfigByVariantKey::new( + namespace_id, + namespace::keys::RunnerConfigVariant::Serverless, + input.runner_name_selector.clone(), + ), + SERIALIZABLE, + ) + .await?; + + if for_serverless { txs.atomic_op( - &rivet_types::keys::pegboard::ns::OutboundDesiredSlotsKey::new( + &rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new( namespace_id, input.runner_name_selector.clone(), ), @@ -244,10 +253,13 @@ async fn allocate_actor( // Set actor as not sleeping txs.delete(&keys::actor::SleepTsKey::new(input.actor_id)); - return Ok(Ok(AllocateActorOutput { - runner_id: old_runner_alloc_key.runner_id, - runner_workflow_id: old_runner_alloc_key_data.workflow_id, - })); + return Ok(( + for_serverless, + Ok(AllocateActorOutput { + runner_id: old_runner_alloc_key.runner_id, + runner_workflow_id: old_runner_alloc_key_data.workflow_id, + }), + )); } } @@ -268,7 +280,7 @@ async fn allocate_actor( input.generation, )?; - return Ok(Err(pending_ts)); + return Ok((for_serverless, Err(pending_ts))); }) .custom_instrument(tracing::info_span!("actor_allocate_tx")) .await?; @@ -277,6 +289,8 @@ async fn allocate_actor( metrics::ACTOR_ALLOCATE_DURATION .record(dt, &[KeyValue::new("did_reserve", res.is_ok().to_string())]); + state.for_serverless = for_serverless; + match &res { Ok(res) => { state.sleep_ts = None; @@ -327,7 +341,7 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<() let runner_name_selector = &state.runner_name_selector; let namespace_id = state.namespace_id; let runner_id = state.runner_id; - let ns_runner_kind = &state.ns_runner_kind; + let for_serverless = state.for_serverless; ctx.udb()? .run(|tx, _mc| async move { @@ -341,13 +355,13 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<() namespace_id, runner_name_selector, runner_id, - ns_runner_kind, + for_serverless, &tx, ) .await?; - } else if let RunnerKind::Outbound { .. } = ns_runner_kind { + } else if for_serverless { txs.atomic_op( - &rivet_types::keys::pegboard::ns::OutboundDesiredSlotsKey::new( + &rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new( namespace_id, runner_name_selector.clone(), ), @@ -391,7 +405,7 @@ pub async fn spawn_actor( "failed to allocate (no availability), waiting for allocation", ); - ctx.msg(rivet_types::msgs::pegboard::BumpOutboundAutoscaler {}) + ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) .send() .await?; diff --git a/packages/services/pegboard/src/workflows/actor/setup.rs b/packages/services/pegboard/src/workflows/actor/setup.rs index 421b7228fc..068cc562f0 100644 --- a/packages/services/pegboard/src/workflows/actor/setup.rs +++ b/packages/services/pegboard/src/workflows/actor/setup.rs @@ -1,5 +1,4 @@ use gas::prelude::*; -use namespace::types::RunnerKind; use rivet_data::converted::ActorNameKeyData; use rivet_types::actors::CrashPolicy; use udb_util::{SERIALIZABLE, TxnExt}; @@ -18,23 +17,18 @@ pub struct ValidateInput { pub input: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ValidateOutput { - pub ns_runner_kind: RunnerKind, -} - #[activity(Validate)] pub async fn validate( ctx: &ActivityCtx, input: &ValidateInput, -) -> Result> { +) -> Result> { let ns_res = ctx .op(namespace::ops::get_global::Input { namespace_ids: vec![input.namespace_id], }) .await?; - let Some(ns) = ns_res.into_iter().next() else { + if ns_res.is_empty() { return Ok(Err(errors::Actor::NamespaceNotFound)); }; @@ -61,9 +55,7 @@ pub async fn validate( } } - Ok(Ok(ValidateOutput { - ns_runner_kind: ns.runner_kind, - })) + Ok(Ok(())) } #[derive(Debug, Clone, Serialize, Deserialize, Hash)] @@ -75,7 +67,6 @@ pub struct InitStateAndUdbInput { pub runner_name_selector: String, pub crash_policy: CrashPolicy, pub create_ts: i64, - pub ns_runner_kind: RunnerKind, } #[activity(InitStateAndFdb)] @@ -89,7 +80,6 @@ pub async fn insert_state_and_fdb(ctx: &ActivityCtx, input: &InitStateAndUdbInpu input.runner_name_selector.clone(), input.crash_policy, input.create_ts, - input.ns_runner_kind.clone(), )); ctx.udb()? diff --git a/sdks/rust/data/src/versioned.rs b/sdks/rust/data/src/versioned.rs index 002363d361..551e518158 100644 --- a/sdks/rust/data/src/versioned.rs +++ b/sdks/rust/data/src/versioned.rs @@ -207,20 +207,20 @@ impl OwnedVersionedData for ActorNameKeyData { } } -pub enum NamespaceRunnerKind { - V1(namespace_runner_kind_v1::Data), +pub enum NamespaceRunnerConfig { + V1(namespace_runner_config_v1::Data), } -impl OwnedVersionedData for NamespaceRunnerKind { - type Latest = namespace_runner_kind_v1::Data; +impl OwnedVersionedData for NamespaceRunnerConfig { + type Latest = namespace_runner_config_v1::Data; - fn latest(latest: namespace_runner_kind_v1::Data) -> Self { - NamespaceRunnerKind::V1(latest) + fn latest(latest: namespace_runner_config_v1::Data) -> Self { + NamespaceRunnerConfig::V1(latest) } fn into_latest(self) -> Result { #[allow(irrefutable_let_patterns)] - if let NamespaceRunnerKind::V1(data) = self { + if let NamespaceRunnerConfig::V1(data) = self { Ok(data) } else { bail!("version not latest"); @@ -229,14 +229,14 @@ impl OwnedVersionedData for NamespaceRunnerKind { fn deserialize_version(payload: &[u8], version: u16) -> Result { match version { - 1 => Ok(NamespaceRunnerKind::V1(serde_bare::from_slice(payload)?)), + 1 => Ok(NamespaceRunnerConfig::V1(serde_bare::from_slice(payload)?)), _ => bail!("invalid version: {version}"), } } fn serialize_version(self, _version: u16) -> Result> { match self { - NamespaceRunnerKind::V1(data) => serde_bare::to_vec(&data).map_err(Into::into), + NamespaceRunnerConfig::V1(data) => serde_bare::to_vec(&data).map_err(Into::into), } } } diff --git a/sdks/schemas/data/namespace.runner_kind.v1.bare b/sdks/schemas/data/namespace.runner_config.v1.bare similarity index 69% rename from sdks/schemas/data/namespace.runner_kind.v1.bare rename to sdks/schemas/data/namespace.runner_config.v1.bare index 05c8eaa739..a25e630013 100644 --- a/sdks/schemas/data/namespace.runner_kind.v1.bare +++ b/sdks/schemas/data/namespace.runner_config.v1.bare @@ -1,4 +1,4 @@ -type Outbound struct { +type Serverless struct { url: str request_lifespan: u32 slots_per_runner: u32 @@ -7,9 +7,6 @@ type Outbound struct { runners_margin: u32 } -type Custom void - type Data union { - Outbound | - Custom + Serverless }