diff --git a/Cargo.lock b/Cargo.lock index 50db0f0fd1..0758fb75e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2378,6 +2378,16 @@ dependencies = [ "generic-array", ] +[[package]] +name = "internal" +version = "0.0.1" +dependencies = [ + "anyhow", + "gasoline", + "rivet-api-client", + "serde", +] + [[package]] name = "io-uring" version = "0.7.9" @@ -2771,15 +2781,18 @@ version = "0.0.1" dependencies = [ "anyhow", "gasoline", + "internal", "rivet-api-builder", "rivet-api-util", "rivet-data", "rivet-error", + "rivet-types", "rivet-util", "serde", "tracing", "udb-util", "universaldb", + "url", "utoipa", "versioned-data-util", ] @@ -3315,6 +3328,24 @@ dependencies = [ "versioned-data-util", ] +[[package]] +name = "pegboard-outbound" +version = "0.0.1" +dependencies = [ + "anyhow", + "epoxy", + "gasoline", + "namespace", + "pegboard", + "reqwest-eventsource", + "rivet-config", + "rivet-runner-protocol", + "rivet-types", + "tracing", + "udb-util", + "universaldb", +] + [[package]] name = "pegboard-runner-ws" version = "0.0.1" @@ -4677,7 +4708,9 @@ dependencies = [ "rivet-runner-protocol", "rivet-util", "serde", + "udb-util", "utoipa", + "versioned-data-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 284892b56b..aa4bd1df99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["packages/common/api-builder","packages/common/api-client","packages/common/api-types","packages/common/api-util","packages/common/cache/build","packages/common/cache/result","packages/common/clickhouse-inserter","packages/common/clickhouse-user-query","packages/common/config","packages/common/env","packages/common/error/core","packages/common/error/macros","packages/common/gasoline/core","packages/common/gasoline/macros","packages/common/logs","packages/common/metrics","packages/common/pools","packages/common/runtime","packages/common/service-manager","packages/common/telemetry","packages/common/test-deps","packages/common/test-deps-docker","packages/common/types","packages/common/udb-util","packages/common/universaldb","packages/common/universalpubsub","packages/common/util/core","packages/common/util/id","packages/common/versioned-data-util","packages/core/actor-kv","packages/core/api-peer","packages/core/api-public","packages/core/bootstrap","packages/core/dump-openapi","packages/core/guard/core","packages/core/guard/server","packages/core/pegboard-gateway","packages/core/pegboard-outbound","packages/core/pegboard-runner-ws","packages/core/pegboard-tunnel","packages/core/workflow-worker","packages/infra/engine","packages/services/epoxy","packages/services/namespace","packages/services/pegboard","sdks/rust/api-full","sdks/rust/bare_gen","sdks/rust/data","sdks/rust/epoxy-protocol","sdks/rust/runner-protocol","sdks/rust/tunnel-protocol","sdks/rust/ups-protocol"] +members = 
["packages/common/api-builder","packages/common/api-client","packages/common/api-types","packages/common/api-util","packages/common/cache/build","packages/common/cache/result","packages/common/clickhouse-inserter","packages/common/clickhouse-user-query","packages/common/config","packages/common/env","packages/common/error/core","packages/common/error/macros","packages/common/gasoline/core","packages/common/gasoline/macros","packages/common/logs","packages/common/metrics","packages/common/pools","packages/common/runtime","packages/common/service-manager","packages/common/telemetry","packages/common/test-deps","packages/common/test-deps-docker","packages/common/types","packages/common/udb-util","packages/common/universaldb","packages/common/universalpubsub","packages/common/util/core","packages/common/util/id","packages/common/versioned-data-util","packages/core/actor-kv","packages/core/api-peer","packages/core/api-public","packages/core/bootstrap","packages/core/dump-openapi","packages/core/guard/core","packages/core/guard/server","packages/core/pegboard-gateway","packages/core/pegboard-outbound","packages/core/pegboard-runner-ws","packages/core/pegboard-tunnel","packages/core/workflow-worker","packages/infra/engine","packages/services/epoxy","packages/services/internal","packages/services/namespace","packages/services/pegboard","sdks/rust/api-full","sdks/rust/bare_gen","sdks/rust/data","sdks/rust/epoxy-protocol","sdks/rust/runner-protocol","sdks/rust/tunnel-protocol","sdks/rust/ups-protocol"] [workspace.package] version = "25.6.1" @@ -280,7 +280,6 @@ path = "packages/common/error/core" [workspace.dependencies.rivet-error-macros] path = "packages/common/error/macros" - [workspace.dependencies.gas] package = "gasoline" path = "packages/common/gasoline/core" @@ -382,6 +381,9 @@ path = "packages/infra/engine" [workspace.dependencies.epoxy] path = "packages/services/epoxy" +[workspace.dependencies.internal] +path = "packages/services/internal" + [workspace.dependencies.namespace] path = "packages/services/namespace" diff --git a/docker/dev/docker-compose.yml b/docker/dev/docker-compose.yml index 64de5f178d..d1b9fd8cfb 100644 --- a/docker/dev/docker-compose.yml +++ b/docker/dev/docker-compose.yml @@ -187,7 +187,7 @@ services: environment: - RIVET_ENDPOINT=http://rivet-engine:6420 - RUNNER_HOST=runner - # - NO_AUTOSTART=1 + - NO_AUTOSTART=1 stop_grace_period: 4s ports: - '5050:5050' diff --git a/docker/dev/rivet-engine/config.jsonc b/docker/dev/rivet-engine/config.jsonc index 810d5be6e5..ba8f2b4dea 100644 --- a/docker/dev/rivet-engine/config.jsonc +++ b/docker/dev/rivet-engine/config.jsonc @@ -32,13 +32,6 @@ "postgres": { "url": "postgresql://postgres:postgres@postgres:5432/rivet_engine" }, - "postgres_notify": { - "url": "postgresql://postgres:postgres@postgres:5432/rivet_engine", - "memory_optimization": false - }, - // "memory": { - // "channel": "default" - // }, "cache": { "driver": "in_memory" }, diff --git a/out/errors/namespace.invalid_update.json b/out/errors/namespace.invalid_update.json new file mode 100644 index 0000000000..2dda6ea2dc --- /dev/null +++ b/out/errors/namespace.invalid_update.json @@ -0,0 +1,5 @@ +{ + "code": "invalid_update", + "group": "namespace", + "message": "Failed to update namespace." 
+} \ No newline at end of file diff --git a/out/openapi.json b/out/openapi.json index ff97997a91..f4278895ed 100644 --- a/out/openapi.json +++ b/out/openapi.json @@ -469,7 +469,7 @@ { "name": "namespace_id", "in": "query", - "required": true, + "required": false, "schema": { "type": "array", "items": { @@ -548,6 +548,44 @@ } } } + }, + "put": { + "tags": [ + "namespaces" + ], + "operationId": "namespaces_update", + "parameters": [ + { + "name": "namespace_id", + "in": "path", + "required": true, + "schema": { + "$ref": "#/components/schemas/RivetId" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/NamespacesUpdateRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/NamespacesUpdateResponse" + } + } + } + } + } } }, "/runners": { @@ -1071,7 +1109,7 @@ "$ref": "#/components/schemas/RivetId" }, "runner_kind": { - "$ref": "#/components/schemas/RunnerKind" + "$ref": "#/components/schemas/NamespacesRunnerKind" } } }, @@ -1133,6 +1171,95 @@ }, "additionalProperties": false }, + "NamespacesRunnerKind": { + "oneOf": [ + { + "type": "object", + "required": [ + "outbound" + ], + "properties": { + "outbound": { + "type": "object", + "required": [ + "url", + "request_lifespan", + "slots_per_runner", + "min_runners", + "max_runners", + "runners_margin" + ], + "properties": { + "max_runners": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "min_runners": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "request_lifespan": { + "type": "integer", + "format": "int32", + "description": "Seconds.", + "minimum": 0 + }, + "runners_margin": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "slots_per_runner": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "url": { + "type": "string" + } + } + } + } + }, + { + "type": "string", + "enum": [ + "custom" + ] + } + ] + }, + "NamespacesUpdate": { + "oneOf": [ + { + "type": "object", + "required": [ + "update_runner_kind" + ], + "properties": { + "update_runner_kind": { + "type": "object", + "required": [ + "runner_kind" + ], + "properties": { + "runner_kind": { + "$ref": "#/components/schemas/NamespacesRunnerKind" + } + } + } + } + } + ] + }, + "NamespacesUpdateRequest": { + "$ref": "#/components/schemas/NamespacesUpdate" + }, + "NamespacesUpdateResponse": { + "type": "object" + }, "Pagination": { "type": "object", "properties": { @@ -1253,59 +1380,6 @@ }, "additionalProperties": false }, - "RunnerKind": { - "oneOf": [ - { - "type": "object", - "required": [ - "outbound" - ], - "properties": { - "outbound": { - "type": "object", - "required": [ - "url", - "slots_per_runner", - "min_runners", - "max_runners", - "runners_margin" - ], - "properties": { - "max_runners": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "min_runners": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "runners_margin": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "slots_per_runner": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "url": { - "type": "string" - } - } - } - } - }, - { - "type": "string", - "enum": [ - "custom" - ] - } - ] - }, "RunnersGetResponse": { "type": "object", "required": [ diff --git a/packages/common/cache/build/src/key.rs b/packages/common/cache/build/src/key.rs index 22944adf49..0290799b4a 100644 --- 
a/packages/common/cache/build/src/key.rs +++ b/packages/common/cache/build/src/key.rs @@ -1,3 +1,4 @@ +use serde::{Deserialize, Serialize}; use std::fmt::Debug; /// A type that can be serialized in to a key that can be used in the cache. @@ -78,3 +79,18 @@ impl_to_string!(i32); impl_to_string!(i64); impl_to_string!(i128); impl_to_string!(isize); + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct RawCacheKey(String); + +impl CacheKey for RawCacheKey { + fn cache_key(&self) -> String { + self.0.clone() + } +} + +impl From for RawCacheKey { + fn from(value: String) -> Self { + RawCacheKey(value) + } +} diff --git a/packages/common/gasoline/core/src/db/kv/keys/history.rs b/packages/common/gasoline/core/src/db/kv/keys/history.rs index 8e6c1d95bd..9b8a838fd1 100644 --- a/packages/common/gasoline/core/src/db/kv/keys/history.rs +++ b/packages/common/gasoline/core/src/db/kv/keys/history.rs @@ -1151,6 +1151,7 @@ impl<'de> TupleUnpack<'de> for InnerEventTypeKey { } } +#[derive(Debug)] pub struct TagKey { workflow_id: Id, location: Location, diff --git a/packages/common/gasoline/core/src/db/kv/keys/workflow.rs b/packages/common/gasoline/core/src/db/kv/keys/workflow.rs index eea0f0e3b4..4a5e01d8ef 100644 --- a/packages/common/gasoline/core/src/db/kv/keys/workflow.rs +++ b/packages/common/gasoline/core/src/db/kv/keys/workflow.rs @@ -71,6 +71,7 @@ impl TuplePack for LeaseSubspaceKey { } } +#[derive(Debug)] pub struct TagKey { workflow_id: Id, pub k: String, @@ -882,6 +883,7 @@ impl TuplePack for PendingSignalSubspaceKey { } } +#[derive(Debug)] pub struct ByNameAndTagKey { workflow_name: String, k: String, diff --git a/packages/common/gasoline/macros/src/lib.rs b/packages/common/gasoline/macros/src/lib.rs index 0afef2ea64..178aec078e 100644 --- a/packages/common/gasoline/macros/src/lib.rs +++ b/packages/common/gasoline/macros/src/lib.rs @@ -166,6 +166,7 @@ pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream { let struct_ident = Ident::new(&name, proc_macro2::Span::call_site()); let fn_name = item_fn.sig.ident.to_string(); + let generics = &item_fn.sig.generics; let fn_body = item_fn.block; let vis = item_fn.vis; @@ -186,7 +187,7 @@ pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream { const NAME: &'static str = #fn_name; const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(#timeout); - async fn run(#ctx_ident: #ctx_ty, #input_ident: &Self::Input) -> Result { + async fn run #generics (#ctx_ident: #ctx_ty, #input_ident: &Self::Input) -> Result { #fn_body } } diff --git a/packages/common/types/Cargo.toml b/packages/common/types/Cargo.toml index 9ae35c64aa..cad21dfa05 100644 --- a/packages/common/types/Cargo.toml +++ b/packages/common/types/Cargo.toml @@ -9,8 +9,10 @@ license.workspace = true anyhow.workspace = true gas.workspace = true rivet-api-builder.workspace = true -rivet-runner-protocol.workspace = true rivet-data.workspace = true +rivet-runner-protocol.workspace = true rivet-util.workspace = true serde.workspace = true +udb-util.workspace = true utoipa.workspace = true +versioned-data-util.workspace = true diff --git a/packages/common/types/README.md b/packages/common/types/README.md new file mode 100644 index 0000000000..1f22f90271 --- /dev/null +++ b/packages/common/types/README.md @@ -0,0 +1,3 @@ +# Common + +This pkg exists to get around cargo cyclical deps. 
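A minimal usage sketch of the `RawCacheKey` wrapper added in `packages/common/cache/build/src/key.rs` above, assuming the `rivet_cache` import paths already used elsewhere in this diff (`use rivet_cache::{CacheKey, RawCacheKey}`); the string value is a placeholder, and the tie-in to `cache::purge_global` is inferred from the later hunks:

```rust
use rivet_cache::{CacheKey, RawCacheKey}; // import paths taken from this diff

#[test]
fn raw_cache_key_roundtrip() {
    // Any pre-rendered string can be promoted into a cache key through the new
    // `From<String>` impl; this appears to be what the new cache::purge_global
    // op leans on when it passes `namespace_id.cache_key().into()`.
    let key: RawCacheKey = "some-namespace-id".to_string().into();

    // RawCacheKey::cache_key simply returns the wrapped string verbatim.
    assert_eq!(key.cache_key(), "some-namespace-id");
}
```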
diff --git a/packages/common/types/src/keys/mod.rs b/packages/common/types/src/keys/mod.rs
new file mode 100644
index 0000000000..38311a6f72
--- /dev/null
+++ b/packages/common/types/src/keys/mod.rs
@@ -0,0 +1 @@
+pub mod pegboard;
diff --git a/packages/common/types/src/keys/pegboard/mod.rs b/packages/common/types/src/keys/pegboard/mod.rs
new file mode 100644
index 0000000000..7e0a481030
--- /dev/null
+++ b/packages/common/types/src/keys/pegboard/mod.rs
@@ -0,0 +1 @@
+pub mod ns;
diff --git a/packages/common/types/src/keys/pegboard/ns.rs b/packages/common/types/src/keys/pegboard/ns.rs
new file mode 100644
index 0000000000..a768f14f61
--- /dev/null
+++ b/packages/common/types/src/keys/pegboard/ns.rs
@@ -0,0 +1,109 @@
+use std::result::Result::Ok;
+
+use anyhow::*;
+use gas::prelude::*;
+use udb_util::prelude::*;
+
+#[derive(Debug)]
+pub struct OutboundDesiredSlotsKey {
+    pub namespace_id: Id,
+    pub runner_name_selector: String,
+}
+
+impl OutboundDesiredSlotsKey {
+    pub fn new(namespace_id: Id, runner_name_selector: String) -> Self {
+        OutboundDesiredSlotsKey {
+            namespace_id,
+            runner_name_selector,
+        }
+    }
+
+    pub fn subspace(namespace_id: Id) -> OutboundDesiredSlotsSubspaceKey {
+        OutboundDesiredSlotsSubspaceKey::new(namespace_id)
+    }
+
+    pub fn entire_subspace() -> OutboundDesiredSlotsSubspaceKey {
+        OutboundDesiredSlotsSubspaceKey::entire()
+    }
+}
+
+impl FormalKey for OutboundDesiredSlotsKey {
+    /// Count.
+    type Value = u32;
+
+    fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
+        // NOTE: Atomic ops use little endian
+        Ok(u32::from_le_bytes(raw.try_into()?))
+    }
+
+    fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
+        // NOTE: Atomic ops use little endian
+        Ok(value.to_le_bytes().to_vec())
+    }
+}
+
+impl TuplePack for OutboundDesiredSlotsKey {
+    fn pack<W: std::io::Write>(
+        &self,
+        w: &mut W,
+        tuple_depth: TupleDepth,
+    ) -> std::io::Result<VersionstampOffset> {
+        let t = (
+            NAMESPACE,
+            OUTBOUND,
+            DESIRED_SLOTS,
+            self.namespace_id,
+            &self.runner_name_selector,
+        );
+        t.pack(w, tuple_depth)
+    }
+}
+
+impl<'de> TupleUnpack<'de> for OutboundDesiredSlotsKey {
+    fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
+        let (input, (_, _, _, namespace_id, runner_name_selector)) =
+            <(usize, usize, usize, Id, String)>::unpack(input, tuple_depth)?;
+
+        let v = OutboundDesiredSlotsKey {
+            namespace_id,
+            runner_name_selector,
+        };
+
+        Ok((input, v))
+    }
+}
+
+pub struct OutboundDesiredSlotsSubspaceKey {
+    namespace_id: Option<Id>,
+}
+
+impl OutboundDesiredSlotsSubspaceKey {
+    pub fn new(namespace_id: Id) -> Self {
+        OutboundDesiredSlotsSubspaceKey {
+            namespace_id: Some(namespace_id),
+        }
+    }
+
+    pub fn entire() -> Self {
+        OutboundDesiredSlotsSubspaceKey { namespace_id: None }
+    }
+}
+
+impl TuplePack for OutboundDesiredSlotsSubspaceKey {
+    fn pack<W: std::io::Write>(
+        &self,
+        w: &mut W,
+        tuple_depth: TupleDepth,
+    ) -> std::io::Result<VersionstampOffset> {
+        let mut offset = VersionstampOffset::None { size: 0 };
+
+        let t = (NAMESPACE, OUTBOUND, DESIRED_SLOTS);
+        offset += t.pack(w, tuple_depth)?;
+
+        if let Some(namespace_id) = self.namespace_id {
+            offset += namespace_id.pack(w, tuple_depth)?;
+        }
+
+        Ok(offset)
+    }
+}
diff --git a/packages/common/types/src/lib.rs b/packages/common/types/src/lib.rs
index fc3902d998..19ca637eaf 100644
--- a/packages/common/types/src/lib.rs
+++ b/packages/common/types/src/lib.rs
@@ -1,3 +1,5 @@
 pub mod actors;
 pub mod datacenters;
+pub mod keys;
+pub mod msgs;
 pub mod runners;
diff --git a/packages/common/types/src/msgs/mod.rs b/packages/common/types/src/msgs/mod.rs
new file mode 100644
index
0000000000..38311a6f72 --- /dev/null +++ b/packages/common/types/src/msgs/mod.rs @@ -0,0 +1 @@ +pub mod pegboard; diff --git a/packages/services/pegboard/src/messages.rs b/packages/common/types/src/msgs/pegboard.rs similarity index 100% rename from packages/services/pegboard/src/messages.rs rename to packages/common/types/src/msgs/pegboard.rs diff --git a/packages/common/udb-util/src/ext.rs b/packages/common/udb-util/src/ext.rs index 064e8a8fef..82ce1e159d 100644 --- a/packages/common/udb-util/src/ext.rs +++ b/packages/common/udb-util/src/ext.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, ops::Deref, result::Result::Ok}; +use std::{ops::Deref, result::Result::Ok}; use anyhow::*; use futures_util::TryStreamExt; @@ -44,6 +44,7 @@ impl<'a> TxnSubspace<'a> { ) -> Result { self.subspace .unpack(key) + .context("failed unpacking key") .map_err(|x| udb::FdbBindingError::CustomError(x.into())) } @@ -55,13 +56,19 @@ impl<'a> TxnSubspace<'a> { self.tx.set( &self.subspace.pack(key), &key.serialize(value) + .with_context(|| { + format!( + "failed serializing key value of {}", + std::any::type_name::() + ) + }) .map_err(|x| udb::FdbBindingError::CustomError(x.into()))?, ); Ok(()) } - pub async fn read<'de, T: Debug + FormalKey + TuplePack + TupleUnpack<'de>>( + pub async fn read<'de, T: FormalKey + TuplePack + TupleUnpack<'de>>( &self, key: &'de T, snapshot: bool, @@ -99,6 +106,11 @@ impl<'a> TxnSubspace<'a> { self.tx.clear(&self.subspace.pack(key)); } + pub fn delete_key_subspace(&self, key: &T) { + self.tx + .clear_subspace_range(&self.subspace(&self.subspace.pack(key))); + } + pub fn read_entry TupleUnpack<'de>>( &self, entry: &udb::future::FdbValue, @@ -106,6 +118,12 @@ impl<'a> TxnSubspace<'a> { let key = self.unpack::(entry.key())?; let value = key .deserialize(entry.value()) + .with_context(|| { + format!( + "failed deserializing key value of {}", + std::any::type_name::() + ) + }) .map_err(|x| udb::FdbBindingError::CustomError(x.into()))?; Ok((key, value)) @@ -131,7 +149,7 @@ impl<'a> TxnSubspace<'a> { .map_err(Into::into) } - pub fn atomic_op<'de, T: Debug + FormalKey + TuplePack + TupleUnpack<'de>>( + pub fn atomic_op<'de, T: FormalKey + TuplePack + TupleUnpack<'de>>( &self, key: &'de T, param: &[u8], @@ -157,7 +175,7 @@ pub trait SliceExt { } pub trait OptSliceExt { - fn read<'de, T: Debug + FormalKey + TupleUnpack<'de>>( + fn read<'de, T: FormalKey + TupleUnpack<'de>>( &self, key: &'de T, ) -> Result; @@ -173,18 +191,30 @@ impl SliceExt for udb::future::FdbSlice { key: &'de T, ) -> Result { key.deserialize(self) + .with_context(|| { + format!( + "failed deserializing key value of {}", + std::any::type_name::() + ) + }) .map_err(|x| udb::FdbBindingError::CustomError(x.into())) } } impl OptSliceExt for Option { - fn read<'de, T: Debug + FormalKey + TupleUnpack<'de>>( + fn read<'de, T: FormalKey + TupleUnpack<'de>>( &self, key: &'de T, ) -> Result { key.deserialize(&self.as_ref().ok_or(udb::FdbBindingError::CustomError( - format!("key should exist: {key:?}").into(), + format!("key should exist: {}", std::any::type_name::()).into(), ))?) 
+ .with_context(|| { + format!( + "failed deserializing key value of {}", + std::any::type_name::() + ) + }) .map_err(|x| udb::FdbBindingError::CustomError(x.into())) } @@ -195,6 +225,12 @@ impl OptSliceExt for Option { if let Some(data) = self { key.deserialize(data) .map(Some) + .with_context(|| { + format!( + "failed deserializing key value of {}", + std::any::type_name::() + ) + }) .map_err(|x| udb::FdbBindingError::CustomError(x.into())) } else { Ok(None) diff --git a/packages/common/udb-util/src/keys.rs b/packages/common/udb-util/src/keys.rs index c0406fe67f..f873ca7066 100644 --- a/packages/common/udb-util/src/keys.rs +++ b/packages/common/udb-util/src/keys.rs @@ -121,4 +121,5 @@ define_keys! { (93, INSTANCE_BALLOT, "instance_ballot"), (94, OUTBOUND, "outbound"), (95, DESIRED_SLOTS, "desired_slots"), + (96, RUNNER_KIND, "runner_kind"), } diff --git a/packages/common/universaldb/src/atomic.rs b/packages/common/universaldb/src/atomic.rs index 3cbbfa5ec9..09e6eb52d6 100644 --- a/packages/common/universaldb/src/atomic.rs +++ b/packages/common/universaldb/src/atomic.rs @@ -36,7 +36,7 @@ fn apply_add(current: Option<&[u8]>, param: &[u8]) -> Vec { let param_int = bytes_to_i64_le(param); let result = current_int.wrapping_add(param_int); - i64_to_bytes_le(result, param.len().max(current.len()).max(8)) + i64_to_bytes_le(result, param.len().max(current.len())) } fn apply_bit_and(current: Option<&[u8]>, param: &[u8]) -> Vec { @@ -174,7 +174,7 @@ fn bytes_to_i64_le(bytes: &[u8]) -> i64 { } let mut padded = [0u8; 8]; - let len = bytes.len().min(8); + let len = bytes.len(); padded[..len].copy_from_slice(&bytes[..len]); i64::from_le_bytes(padded) @@ -182,10 +182,9 @@ fn bytes_to_i64_le(bytes: &[u8]) -> i64 { fn i64_to_bytes_le(value: i64, min_len: usize) -> Vec { let bytes = value.to_le_bytes(); - let len = min_len.max(8); - let mut result = vec![0u8; len]; - result[..8].copy_from_slice(&bytes); + let mut result = vec![0u8; min_len]; + result.copy_from_slice(&bytes[..min_len]); result } diff --git a/packages/core/actor-kv/src/entry.rs b/packages/core/actor-kv/src/entry.rs index d9a1f89792..143348be2f 100644 --- a/packages/core/actor-kv/src/entry.rs +++ b/packages/core/actor-kv/src/entry.rs @@ -99,6 +99,7 @@ impl<'de> TupleUnpack<'de> for EntryValueChunkKey { } } +#[derive(Debug)] pub struct EntryMetadataKey { pub key: KeyWrapper, } diff --git a/packages/core/actor-kv/src/key.rs b/packages/core/actor-kv/src/key.rs index 800aacfda7..a186caed6f 100644 --- a/packages/core/actor-kv/src/key.rs +++ b/packages/core/actor-kv/src/key.rs @@ -3,7 +3,7 @@ use universaldb::tuple::{ Bytes, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, }; -#[derive(Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq)] pub struct KeyWrapper(pub rp::KvKey); impl KeyWrapper { diff --git a/packages/core/api-peer/src/internal.rs b/packages/core/api-peer/src/internal.rs new file mode 100644 index 0000000000..47c84cd57b --- /dev/null +++ b/packages/core/api-peer/src/internal.rs @@ -0,0 +1,29 @@ +use anyhow::Result; +use gas::prelude::*; +use rivet_api_builder::ApiCtx; +use rivet_util::Id; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct CachePurgeRequest { + pub base_key: String, + pub keys: Vec, +} + +#[derive(Serialize)] +pub struct CachePurgeResponse {} + +pub async fn cache_purge( + ctx: ApiCtx, + _path: (), + _query: (), + body: CachePurgeRequest, +) -> Result { + ctx.cache() + .clone() + .request() + .purge(&body.base_key, body.keys) + .await?; + + Ok(CachePurgeResponse 
{}) +} diff --git a/packages/core/api-peer/src/lib.rs b/packages/core/api-peer/src/lib.rs index c994ada92c..c951c31208 100644 --- a/packages/core/api-peer/src/lib.rs +++ b/packages/core/api-peer/src/lib.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use anyhow::*; pub mod actors; +pub mod internal; pub mod namespaces; pub mod router; pub mod runners; diff --git a/packages/core/api-peer/src/namespaces.rs b/packages/core/api-peer/src/namespaces.rs index 36b0a3c9fc..6fae8668ae 100644 --- a/packages/core/api-peer/src/namespaces.rs +++ b/packages/core/api-peer/src/namespaces.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use gas::prelude::*; use rivet_api_builder::ApiCtx; use rivet_api_types::pagination::Pagination; use rivet_util::Id; @@ -73,6 +74,7 @@ pub struct ListQuery { pub limit: Option, pub cursor: Option, pub name: Option, + #[serde(default)] pub namespace_id: Vec, } @@ -84,15 +86,6 @@ pub struct ListResponse { pub pagination: Pagination, } -#[utoipa::path( - get, - operation_id = "namespaces_list", - path = "/namespaces", - params(ListQuery), - responses( - (status = 200, body = ListResponse), - ), -)] pub async fn list(ctx: ApiCtx, _path: (), query: ListQuery) -> Result { // If name filter is provided, resolve and return only that namespace if let Some(name) = query.name { @@ -155,15 +148,6 @@ pub struct CreateResponse { pub namespace: namespace::types::Namespace, } -#[utoipa::path( - post, - operation_id = "namespaces_create", - path = "/namespaces", - request_body(content = CreateRequest, content_type = "application/json"), - responses( - (status = 200, body = CreateResponse), - ), -)] pub async fn create( ctx: ApiCtx, _path: (), @@ -211,3 +195,62 @@ pub async fn create( Ok(CreateResponse { namespace }) } + +#[derive(Debug, Serialize, Deserialize, IntoParams)] +#[serde(deny_unknown_fields)] +#[into_params(parameter_in = Query)] +pub struct UpdateQuery {} + +#[derive(Deserialize)] +#[serde(deny_unknown_fields)] +pub struct UpdatePath { + pub namespace_id: Id, +} + +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(deny_unknown_fields)] +#[schema(as = NamespacesUpdateRequest)] +pub struct UpdateRequest(namespace::workflows::namespace::Update); + +#[derive(Serialize, ToSchema)] +#[schema(as = NamespacesUpdateResponse)] +pub struct UpdateResponse {} + +pub async fn update( + ctx: ApiCtx, + path: UpdatePath, + _query: UpdateQuery, + body: UpdateRequest, +) -> Result { + let mut sub = ctx + .subscribe::(( + "namespace_id", + path.namespace_id, + )) + .await?; + + let res = ctx + .signal(body.0) + .to_workflow::() + .tag("namespace_id", path.namespace_id) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + return Err(namespace::errors::Namespace::NotFound.build()); + } else { + res?; + } + + sub.next() + .await? 
+ .into_body() + .res + .map_err(|err| err.build())?; + + Ok(UpdateResponse {}) +} diff --git a/packages/core/api-peer/src/router.rs b/packages/core/api-peer/src/router.rs index bf279263f1..14c4ce1f83 100644 --- a/packages/core/api-peer/src/router.rs +++ b/packages/core/api-peer/src/router.rs @@ -1,6 +1,6 @@ use rivet_api_builder::{create_router, prelude::*}; -use crate::{actors, namespaces, runners}; +use crate::{actors, internal, namespaces, runners}; pub async fn router( name: &'static str, @@ -14,6 +14,7 @@ pub async fn router( .route("/namespaces", get(namespaces::list)) .route("/namespaces", post(namespaces::create)) .route("/namespaces/{namespace_id}", get(namespaces::get)) + .route("/namespaces/{namespace_id}", put(namespaces::update)) .route( "/namespaces/resolve/{name}", get(namespaces::resolve_for_name), @@ -28,6 +29,8 @@ pub async fn router( .route("/runners", get(runners::list)) .route("/runners/{runner_id}", get(runners::get)) .route("/runners/names", get(runners::list_names)) + // MARK: Internal + .route("/cache/purge", post(internal::cache_purge)) }) .await } diff --git a/packages/core/api-public/src/namespaces.rs b/packages/core/api-public/src/namespaces.rs index 61f3c20f2c..f1c4173124 100644 --- a/packages/core/api-public/src/namespaces.rs +++ b/packages/core/api-public/src/namespaces.rs @@ -137,3 +137,54 @@ async fn create_inner( .await } } + +#[utoipa::path( + put, + operation_id = "namespaces_update", + path = "/namespaces/{namespace_id}", + params( + ("namespace_id" = Id, Path), + UpdateQuery, + ), + request_body(content = UpdateRequest, content_type = "application/json"), + responses( + (status = 200, body = UpdateResponse), + ), +)] +pub async fn update( + Extension(ctx): Extension, + headers: HeaderMap, + Path(path): Path, + Query(query): Query, + Json(body): Json, +) -> Response { + match update_inner(ctx, headers, path, query, body).await { + Ok(response) => response, + Err(err) => ApiError::from(err).into_response(), + } +} + +async fn update_inner( + ctx: ApiCtx, + headers: HeaderMap, + path: UpdatePath, + query: UpdateQuery, + body: UpdateRequest, +) -> Result { + if ctx.config().is_leader() { + let res = rivet_api_peer::namespaces::update(ctx, path, query, body).await?; + Ok(Json(res).into_response()) + } else { + let leader_dc = ctx.config().leader_dc()?; + request_remote_datacenter_raw( + &ctx, + leader_dc.datacenter_label, + &format!("/namespaces/{}", path.namespace_id), + axum::http::Method::PUT, + headers, + Some(&query), + Some(&body), + ) + .await + } +} diff --git a/packages/core/api-public/src/router.rs b/packages/core/api-public/src/router.rs index c0a0599bc5..a3075ffaa4 100644 --- a/packages/core/api-public/src/router.rs +++ b/packages/core/api-public/src/router.rs @@ -22,6 +22,7 @@ use crate::{actors, datacenters, namespaces, runners, ui}; runners::list_names, namespaces::list, namespaces::get, + namespaces::update, namespaces::create, datacenters::list, ))] @@ -46,6 +47,10 @@ pub async fn router( "/namespaces/{namespace_id}", axum::routing::get(namespaces::get), ) + .route( + "/namespaces/{namespace_id}", + axum::routing::put(namespaces::update), + ) // MARK: Actors .route("/actors", axum::routing::get(actors::list::list)) .route("/actors", post(actors::create::create)) diff --git a/packages/core/bootstrap/src/lib.rs b/packages/core/bootstrap/src/lib.rs index 3db5b704d4..304ccafbee 100644 --- a/packages/core/bootstrap/src/lib.rs +++ b/packages/core/bootstrap/src/lib.rs @@ -75,7 +75,8 @@ async fn create_default_namespace(ctx: &StandaloneCtx) -> 
Result<()> { .op(namespace::ops::resolve_for_name_local::Input { name: "default".to_string(), }) - .await?; + .await + .context("failed resolving default name")?; if existing_namespace.is_none() { // Create namespace diff --git a/packages/core/pegboard-outbound/Cargo.toml b/packages/core/pegboard-outbound/Cargo.toml index 13d4b5c334..039d612d4c 100644 --- a/packages/core/pegboard-outbound/Cargo.toml +++ b/packages/core/pegboard-outbound/Cargo.toml @@ -12,6 +12,7 @@ gas.workspace = true reqwest-eventsource.workspace = true rivet-config.workspace = true rivet-runner-protocol.workspace = true +rivet-types.workspace = true tracing.workspace = true udb-util.workspace = true universaldb.workspace = true diff --git a/packages/core/pegboard-outbound/src/lib.rs b/packages/core/pegboard-outbound/src/lib.rs index 8cb0e1044f..aea4ebd81d 100644 --- a/packages/core/pegboard-outbound/src/lib.rs +++ b/packages/core/pegboard-outbound/src/lib.rs @@ -17,8 +17,6 @@ use tokio::{sync::oneshot, task::JoinHandle, time::Duration}; use udb_util::{SNAPSHOT, TxnExt}; use universaldb::{self as udb, options::StreamingMode}; -const OUTBOUND_REQUEST_LIFESPAN: Duration = Duration::from_secs(14 * 60 + 30); - struct OutboundConnection { handle: JoinHandle<()>, shutdown_tx: oneshot::Sender<()>, @@ -39,7 +37,7 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R )?; let mut sub = ctx - .subscribe::(()) + .subscribe::(()) .await?; let mut outbound_connections = HashMap::new(); @@ -54,36 +52,35 @@ async fn tick( ctx: &StandaloneCtx, outbound_connections: &mut HashMap<(Id, String), Vec>, ) -> Result<()> { - let outbound_data = ctx - .udb()? - .run(|tx, _mc| async move { - let txs = tx.subspace(keys::subspace()); - let outbound_desired_subspace = - txs.subspace(&keys::ns::OutboundDesiredSlotsKey::subspace()); - - txs.get_ranges_keyvalues( - udb::RangeOption { - mode: StreamingMode::WantAll, - ..(&outbound_desired_subspace).into() - }, - // NOTE: This is a snapshot to prevent conflict with updates to this subspace - SNAPSHOT, - ) - .map(|res| match res { - Ok(entry) => { - let (key, desired_slots) = - txs.read_entry::(&entry)?; - - Ok((key.namespace_id, key.runner_name_selector, desired_slots)) - } - Err(err) => Err(err.into()), + let outbound_data = + ctx.udb()? 
+ .run(|tx, _mc| async move { + let txs = tx.subspace(keys::subspace()); + let outbound_desired_subspace = txs.subspace( + &rivet_types::keys::pegboard::ns::OutboundDesiredSlotsKey::entire_subspace(), + ); + + txs.get_ranges_keyvalues( + udb::RangeOption { + mode: StreamingMode::WantAll, + ..(&outbound_desired_subspace).into() + }, + // NOTE: This is a snapshot to prevent conflict with updates to this subspace + SNAPSHOT, + ) + .map(|res| match res { + Ok(entry) => { + let (key, desired_slots) = + txs.read_entry::(&entry)?; + + Ok((key.namespace_id, key.runner_name_selector, desired_slots)) + } + Err(err) => Err(err.into()), + }) + .try_collect::>() + .await }) - .try_collect::>() - .await - - // outbound/{ns_id}/{runner_name_selector}/desired_slots - }) - .await?; + .await?; let mut namespace_ids = outbound_data .iter() @@ -103,6 +100,7 @@ async fn tick( let RunnerKind::Outbound { url, + request_lifespan, slots_per_runner, min_runners, max_runners, @@ -123,11 +121,9 @@ async fn tick( // Remove finished and draining connections from list curr.retain(|conn| !conn.handle.is_finished() && !conn.draining.load(Ordering::SeqCst)); - let desired_count = (desired_slots - .div_ceil(*slots_per_runner) - .max(*min_runners) - .min(*max_runners) + let desired_count = (desired_slots.div_ceil(*slots_per_runner).max(*min_runners) + runners_margin) + .min(*max_runners) .try_into()?; // Calculate diff @@ -147,8 +143,14 @@ async fn tick( } } - let starting_connections = - std::iter::repeat_with(|| spawn_connection(ctx.clone(), url.clone())).take(start_count); + let starting_connections = std::iter::repeat_with(|| { + spawn_connection( + ctx.clone(), + url.clone(), + Duration::from_secs(*request_lifespan as u64), + ) + }) + .take(start_count); curr.extend(starting_connections); } @@ -164,13 +166,19 @@ async fn tick( Ok(()) } -fn spawn_connection(ctx: StandaloneCtx, url: String) -> OutboundConnection { +fn spawn_connection( + ctx: StandaloneCtx, + url: String, + request_lifespan: Duration, +) -> OutboundConnection { let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); let draining = Arc::new(AtomicBool::new(false)); let draining2 = draining.clone(); let handle = tokio::spawn(async move { - if let Err(err) = outbound_handler(&ctx, url, shutdown_rx, draining2).await { + if let Err(err) = + outbound_handler(&ctx, url, request_lifespan, shutdown_rx, draining2).await + { tracing::error!(?err, "outbound req failed"); // TODO: Add backoff @@ -178,7 +186,7 @@ fn spawn_connection(ctx: StandaloneCtx, url: String) -> OutboundConnection { // On error, bump the autoscaler loop again let _ = ctx - .msg(pegboard::messages::BumpOutboundAutoscaler {}) + .msg(rivet_types::msgs::pegboard::BumpOutboundAutoscaler {}) .send() .await; } @@ -194,6 +202,7 @@ fn spawn_connection(ctx: StandaloneCtx, url: String) -> OutboundConnection { async fn outbound_handler( ctx: &StandaloneCtx, url: String, + request_lifespan: Duration, shutdown_rx: oneshot::Receiver<()>, draining: Arc, ) -> Result<()> { @@ -226,13 +235,13 @@ async fn outbound_handler( tokio::select! 
{ res = stream_handler => return res.map_err(Into::into), - _ = tokio::time::sleep(OUTBOUND_REQUEST_LIFESPAN) => {} + _ = tokio::time::sleep(request_lifespan) => {} _ = shutdown_rx => {} } draining.store(true, Ordering::SeqCst); - ctx.msg(pegboard::messages::BumpOutboundAutoscaler {}) + ctx.msg(rivet_types::msgs::pegboard::BumpOutboundAutoscaler {}) .send() .await?; diff --git a/packages/services/internal/Cargo.toml b/packages/services/internal/Cargo.toml new file mode 100644 index 0000000000..e9f9ec8a88 --- /dev/null +++ b/packages/services/internal/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "internal" +version.workspace = true +authors.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +anyhow.workspace = true +gas.workspace = true +rivet-api-client.workspace = true +serde.workspace = true diff --git a/packages/services/internal/README.md b/packages/services/internal/README.md new file mode 100644 index 0000000000..0c60920680 --- /dev/null +++ b/packages/services/internal/README.md @@ -0,0 +1 @@ +TODO: move somewhere else diff --git a/packages/services/internal/src/lib.rs b/packages/services/internal/src/lib.rs new file mode 100644 index 0000000000..01eafd2ecc --- /dev/null +++ b/packages/services/internal/src/lib.rs @@ -0,0 +1 @@ +pub mod ops; diff --git a/packages/services/internal/src/ops/cache/mod.rs b/packages/services/internal/src/ops/cache/mod.rs new file mode 100644 index 0000000000..65ba4904bd --- /dev/null +++ b/packages/services/internal/src/ops/cache/mod.rs @@ -0,0 +1 @@ +pub mod purge_global; diff --git a/packages/services/internal/src/ops/cache/purge_global.rs b/packages/services/internal/src/ops/cache/purge_global.rs new file mode 100644 index 0000000000..b86edac77a --- /dev/null +++ b/packages/services/internal/src/ops/cache/purge_global.rs @@ -0,0 +1,78 @@ +use std::fmt::Debug; + +use futures_util::StreamExt; +use gas::prelude::*; +use rivet_api_client::{HeaderMap, Method, request_remote_datacenter}; +use rivet_cache::RawCacheKey; +use serde::Serialize; + +#[derive(Clone, Debug, Default)] +pub struct Input { + pub base_key: String, + pub keys: Vec, +} + +#[operation] +pub async fn cache_purge_global(ctx: &OperationCtx, input: &Input) -> Result<()> { + let dcs = &ctx.config().topology().datacenters; + + let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| { + let ctx = ctx.clone(); + let input = input.clone(); + + async move { + if dc.datacenter_label == ctx.config().dc_label() { + // Local datacenter + ctx.cache() + .clone() + .request() + .purge(input.base_key, input.keys) + .await + } else { + // Remote datacenter - HTTP request + request_remote_datacenter( + ctx.config(), + dc.datacenter_label, + "/cache/purge", + Method::POST, + HeaderMap::new(), + Option::<&()>::None, + Some(&CachePurgeRequest { + base_key: input.base_key, + keys: input.keys, + }), + ) + .await + } + } + })) + .buffer_unordered(16) + .collect::>() + .await; + + // Aggregate results + let result_count = results.len(); + let mut errors = Vec::new(); + for res in results { + if let Err(err) = res { + tracing::error!(?err, "failed to request edge dc"); + errors.push(err); + } + } + + // Error only if all requests failed + if result_count == errors.len() { + if let Some(res) = errors.into_iter().next() { + return Err(res).context("all datacenter requests failed"); + } + } + + Ok(()) +} + +// TODO: This is cloned from api-peer because of a cyclical dependency +#[derive(Serialize)] +pub struct CachePurgeRequest { + pub base_key: String, + pub 
keys: Vec, +} diff --git a/packages/services/internal/src/ops/mod.rs b/packages/services/internal/src/ops/mod.rs new file mode 100644 index 0000000000..a5c08fdb0d --- /dev/null +++ b/packages/services/internal/src/ops/mod.rs @@ -0,0 +1 @@ +pub mod cache; diff --git a/packages/services/namespace/Cargo.toml b/packages/services/namespace/Cargo.toml index 32d46a6735..823fb28bf2 100644 --- a/packages/services/namespace/Cargo.toml +++ b/packages/services/namespace/Cargo.toml @@ -8,14 +8,17 @@ edition.workspace = true [dependencies] anyhow.workspace = true gas.workspace = true +internal.workspace = true rivet-api-builder.workspace = true rivet-api-util.workspace = true rivet-data.workspace = true rivet-error.workspace = true +rivet-types.workspace = true rivet-util.workspace = true serde.workspace = true tracing.workspace = true udb-util.workspace = true universaldb.workspace = true +url.workspace = true utoipa.workspace = true -versioned-data-util.workspace = true +versioned-data-util.workspace = true \ No newline at end of file diff --git a/packages/services/namespace/src/errors.rs b/packages/services/namespace/src/errors.rs index 2bf4acfcf3..a76a0419ac 100644 --- a/packages/services/namespace/src/errors.rs +++ b/packages/services/namespace/src/errors.rs @@ -19,4 +19,11 @@ pub enum Namespace { #[error("not_leader", "Attempting to run operation in non-leader datacenter.")] NotLeader, + + #[error( + "invalid_update", + "Failed to update namespace.", + "Failed to update namespace: {reason}" + )] + InvalidUpdate { reason: String }, } diff --git a/packages/services/namespace/src/keys.rs b/packages/services/namespace/src/keys.rs index c4e13afde4..0ed14ec8a2 100644 --- a/packages/services/namespace/src/keys.rs +++ b/packages/services/namespace/src/keys.rs @@ -180,7 +180,7 @@ impl TuplePack for RunnerKindKey { w: &mut W, tuple_depth: TupleDepth, ) -> std::io::Result { - let t = (DATA, self.namespace_id, CREATE_TS); + let t = (DATA, self.namespace_id, RUNNER_KIND); t.pack(w, tuple_depth) } } diff --git a/packages/services/namespace/src/ops/resolve_for_name_local.rs b/packages/services/namespace/src/ops/resolve_for_name_local.rs index baac221ddc..d70e382000 100644 --- a/packages/services/namespace/src/ops/resolve_for_name_local.rs +++ b/packages/services/namespace/src/ops/resolve_for_name_local.rs @@ -1,6 +1,5 @@ use gas::prelude::*; -use udb_util::{FormalKey, SERIALIZABLE}; -use universaldb as udb; +use udb_util::{SERIALIZABLE, TxnExt}; use crate::{errors, keys, ops::get_local::get_inner, types::Namespace}; @@ -22,21 +21,16 @@ pub async fn namespace_resolve_for_name_local( .run(|tx, _mc| { let name = input.name.clone(); async move { - let name_idx_key = keys::ByNameKey::new(name.clone()); + let txs = tx.subspace(keys::subspace()); - let name_idx_entry = tx - .get(&keys::subspace().pack(&name_idx_key), SERIALIZABLE) - .await?; - - // Namespace not found - let Some(name_idx_entry) = name_idx_entry else { + let Some(namespace_id) = txs + .read_opt(&keys::ByNameKey::new(name.clone()), SERIALIZABLE) + .await? 
+ else { + // Namespace not found return Ok(None); }; - let namespace_id = name_idx_key - .deserialize(&name_idx_entry) - .map_err(|x| udb::FdbBindingError::CustomError(x.into()))?; - get_inner(namespace_id, &tx).await } }) diff --git a/packages/services/namespace/src/types.rs b/packages/services/namespace/src/types.rs index 05e924ad34..a9709e4efb 100644 --- a/packages/services/namespace/src/types.rs +++ b/packages/services/namespace/src/types.rs @@ -12,9 +12,12 @@ pub struct Namespace { #[derive(Debug, Clone, Serialize, Deserialize, Hash, ToSchema)] #[serde(rename_all = "snake_case")] +#[schema(as = NamespacesRunnerKind)] pub enum RunnerKind { Outbound { url: String, + /// Seconds. + request_lifespan: u32, slots_per_runner: u32, min_runners: u32, max_runners: u32, @@ -28,6 +31,7 @@ impl From for rivet_data::generated::namespace_runner_kind_v1::Data match value { RunnerKind::Outbound { url, + request_lifespan, slots_per_runner, min_runners, max_runners, @@ -35,6 +39,7 @@ impl From for rivet_data::generated::namespace_runner_kind_v1::Data } => rivet_data::generated::namespace_runner_kind_v1::Data::Outbound( rivet_data::generated::namespace_runner_kind_v1::Outbound { url, + request_lifespan, slots_per_runner, min_runners, max_runners, @@ -52,6 +57,7 @@ impl From for RunnerKind rivet_data::generated::namespace_runner_kind_v1::Data::Outbound(o) => { RunnerKind::Outbound { url: o.url, + request_lifespan: o.request_lifespan, slots_per_runner: o.slots_per_runner, min_runners: o.min_runners, max_runners: o.max_runners, diff --git a/packages/services/namespace/src/workflows/namespace.rs b/packages/services/namespace/src/workflows/namespace.rs index 90078b23e6..af5d5b4855 100644 --- a/packages/services/namespace/src/workflows/namespace.rs +++ b/packages/services/namespace/src/workflows/namespace.rs @@ -1,7 +1,9 @@ use futures_util::FutureExt; use gas::prelude::*; +use rivet_cache::CacheKey; use serde::{Deserialize, Serialize}; use udb_util::{SERIALIZABLE, TxnExt}; +use utoipa::ToSchema; use crate::{errors, keys, types::RunnerKind}; @@ -57,8 +59,34 @@ pub async fn namespace(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { // Does nothing yet ctx.repeat(|ctx| { + let namespace_id = input.namespace_id; + async move { - ctx.listen::().await?; + let update = ctx.listen::().await?; + + let res = ctx + .activity(UpdateInput { + namespace_id, + update, + }) + .await?; + + if let Ok(update_res) = &res { + ctx.activity(PurgeCacheInput { namespace_id }).await?; + + if update_res.bump_autoscaler { + ctx.msg(rivet_types::msgs::pegboard::BumpOutboundAutoscaler {}) + .send() + .await?; + } + } + + ctx.msg(UpdateResult { + res: res.map(|_| ()), + }) + .tag("namespace_id", namespace_id) + .send() + .await?; Ok(Loop::<()>::Continue) } @@ -78,7 +106,17 @@ pub struct Failed { } #[signal("namespace_update")] -pub struct Update {} +#[derive(Debug, Clone, Hash, ToSchema)] +#[schema(as = NamespacesUpdate)] +#[serde(rename_all = "snake_case")] +pub enum Update { + UpdateRunnerKind { runner_kind: RunnerKind }, +} + +#[message("namespace_update_result")] +pub struct UpdateResult { + pub res: Result<(), errors::Namespace>, +} #[derive(Debug, Clone, Serialize, Deserialize, Hash)] pub struct ValidateInput { @@ -147,8 +185,7 @@ async fn insert_fdb( ctx: &ActivityCtx, input: &InsertFdbInput, ) -> Result> { - let res = ctx - .udb()? + ctx.udb()? 
.run(|tx, _mc| { let namespace_id = input.namespace_id; let name = input.name.clone(); @@ -168,14 +205,6 @@ async fn insert_fdb( txs.write(&keys::CreateTsKey::new(namespace_id), input.create_ts)?; txs.write(&keys::RunnerKindKey::new(namespace_id), RunnerKind::Custom)?; - // RunnerKind::Outbound { - // url: "http://runner:5051/start".to_string(), - // slots_per_runner: 10, - // min_runners: 1, - // max_runners: 1, - // runners_margin: 0, - // } - // Insert idx txs.write(&name_idx_key, namespace_id)?; @@ -183,7 +212,98 @@ async fn insert_fdb( } }) .custom_instrument(tracing::info_span!("namespace_create_tx")) - .await?; + .await + .map_err(Into::into) +} + +#[derive(Debug, Clone, Serialize, Deserialize, Hash)] +struct UpdateInput { + namespace_id: Id, + update: Update, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Hash)] +struct UpdateOutput { + bump_autoscaler: bool, +} + +#[activity(UpdateActivity)] +async fn update( + ctx: &ActivityCtx, + input: &UpdateInput, +) -> Result> { + ctx + .udb()? + .run(|tx, _mc| { + let namespace_id = input.namespace_id; + let update = input.update.clone(); + + async move { + let txs = tx.subspace(keys::subspace()); + + let bump_autoscaler = match update { + Update::UpdateRunnerKind { runner_kind } => { + let bump_autoscaler = match &runner_kind { + RunnerKind::Outbound { + url, + slots_per_runner, + .. + } => { + // Validate url + if let Err(err) = url::Url::parse(url) { + return Ok(Err(errors::Namespace::InvalidUpdate { + reason: format!("invalid outbound url: {err}"), + })); + } + + // Validate slots per runner + if *slots_per_runner == 0 { + return Ok(Err(errors::Namespace::InvalidUpdate { + reason: "`slots_per_runner` cannot be 0".to_string(), + })); + } + + true + } + RunnerKind::Custom => { + // Clear outbound data + txs.delete_key_subspace(&rivet_types::keys::pegboard::ns::OutboundDesiredSlotsKey::subspace(namespace_id)); + + false + } + }; + + txs.write(&keys::RunnerKindKey::new(namespace_id), runner_kind)?; + + bump_autoscaler + } + }; + + Ok(Ok(UpdateOutput { bump_autoscaler })) + } + }) + .custom_instrument(tracing::info_span!("namespace_create_tx")) + .await + .map_err(Into::into) +} + +#[derive(Debug, Clone, Serialize, Deserialize, Hash)] +struct PurgeCacheInput { + namespace_id: Id, +} - Ok(res) +#[activity(PurgeCache)] +async fn purge_cache(ctx: &ActivityCtx, input: &PurgeCacheInput) -> Result<()> { + let res = ctx + .op(internal::ops::cache::purge_global::Input { + base_key: "namespace.get_global".to_string(), + keys: vec![input.namespace_id.cache_key().into()], + }) + .await; + + if let Err(err) = res { + tracing::error!(?err, "failed to purge global namespace cache"); + } + + Ok(()) } diff --git a/packages/services/pegboard/Cargo.toml b/packages/services/pegboard/Cargo.toml index 945f36bdfa..f9569fa571 100644 --- a/packages/services/pegboard/Cargo.toml +++ b/packages/services/pegboard/Cargo.toml @@ -7,16 +7,16 @@ edition.workspace = true [dependencies] anyhow.workspace = true -gas.workspace = true epoxy.workspace = true +gas.workspace = true lazy_static.workspace = true namespace.workspace = true nix.workspace = true rivet-api-client.workspace = true rivet-api-types.workspace = true rivet-api-util.workspace = true -rivet-error.workspace = true rivet-data.workspace = true +rivet-error.workspace = true rivet-metrics.workspace = true rivet-runner-protocol.workspace = true rivet-types.workspace = true diff --git a/packages/services/pegboard/src/keys/ns.rs b/packages/services/pegboard/src/keys/ns.rs index 236c61bc9c..5ccf65fcb5 100644 --- 
a/packages/services/pegboard/src/keys/ns.rs +++ b/packages/services/pegboard/src/keys/ns.rs @@ -1365,87 +1365,3 @@ impl TuplePack for RunnerNameSubspaceKey { t.pack(w, tuple_depth) } } - -#[derive(Debug)] -pub struct OutboundDesiredSlotsKey { - pub namespace_id: Id, - pub runner_name_selector: String, -} - -impl OutboundDesiredSlotsKey { - pub fn new(namespace_id: Id, runner_name_selector: String) -> Self { - OutboundDesiredSlotsKey { - namespace_id, - runner_name_selector, - } - } - - pub fn subspace() -> OutboundDesiredSlotsSubspaceKey { - OutboundDesiredSlotsSubspaceKey::new() - } -} - -impl FormalKey for OutboundDesiredSlotsKey { - /// Count. - type Value = u32; - - fn deserialize(&self, raw: &[u8]) -> Result { - // NOTE: Atomic ops use little endian - Ok(u32::from_le_bytes(raw.try_into()?)) - } - - fn serialize(&self, value: Self::Value) -> Result> { - // NOTE: Atomic ops use little endian - Ok(value.to_le_bytes().to_vec()) - } -} - -impl TuplePack for OutboundDesiredSlotsKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let t = ( - NAMESPACE, - OUTBOUND, - DESIRED_SLOTS, - self.namespace_id, - &self.runner_name_selector, - ); - t.pack(w, tuple_depth) - } -} - -impl<'de> TupleUnpack<'de> for OutboundDesiredSlotsKey { - fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (_, _, namespace_id, runner_name_selector)) = - <(usize, usize, Id, String)>::unpack(input, tuple_depth)?; - - let v = OutboundDesiredSlotsKey { - namespace_id, - runner_name_selector, - }; - - Ok((input, v)) - } -} - -pub struct OutboundDesiredSlotsSubspaceKey {} - -impl OutboundDesiredSlotsSubspaceKey { - pub fn new() -> Self { - OutboundDesiredSlotsSubspaceKey {} - } -} - -impl TuplePack for OutboundDesiredSlotsSubspaceKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let t = (NAMESPACE, OUTBOUND, DESIRED_SLOTS); - t.pack(w, tuple_depth) - } -} diff --git a/packages/services/pegboard/src/lib.rs b/packages/services/pegboard/src/lib.rs index b5dd33dd0a..8a08a5b9a9 100644 --- a/packages/services/pegboard/src/lib.rs +++ b/packages/services/pegboard/src/lib.rs @@ -2,7 +2,6 @@ use gas::prelude::*; pub mod errors; pub mod keys; -pub mod messages; mod metrics; pub mod ops; pub mod pubsub_subjects; diff --git a/packages/services/pegboard/src/workflows/actor/destroy.rs b/packages/services/pegboard/src/workflows/actor/destroy.rs index 44862d219d..3faf1f171c 100644 --- a/packages/services/pegboard/src/workflows/actor/destroy.rs +++ b/packages/services/pegboard/src/workflows/actor/destroy.rs @@ -237,7 +237,10 @@ pub(crate) async fn clear_slot( if let RunnerKind::Outbound { .. } = ns_runner_kind { txs.atomic_op( - &keys::ns::OutboundDesiredSlotsKey::new(namespace_id, runner_name_selector.to_string()), + &rivet_types::keys::pegboard::ns::OutboundDesiredSlotsKey::new( + namespace_id, + runner_name_selector.to_string(), + ), &(-1i32).to_le_bytes(), MutationType::Add, ); diff --git a/packages/services/pegboard/src/workflows/actor/runtime.rs b/packages/services/pegboard/src/workflows/actor/runtime.rs index b1cb7887e8..3371834f83 100644 --- a/packages/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/services/pegboard/src/workflows/actor/runtime.rs @@ -116,7 +116,7 @@ async fn allocate_actor( // Increment desired slots if namespace has an outbound runner kind if let RunnerKind::Outbound { .. 
} = ns_runner_kind { txs.atomic_op( - &keys::ns::OutboundDesiredSlotsKey::new( + &rivet_types::keys::pegboard::ns::OutboundDesiredSlotsKey::new( namespace_id, input.runner_name_selector.clone(), ), @@ -347,7 +347,7 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<() .await?; } else if let RunnerKind::Outbound { .. } = ns_runner_kind { txs.atomic_op( - &keys::ns::OutboundDesiredSlotsKey::new( + &rivet_types::keys::pegboard::ns::OutboundDesiredSlotsKey::new( namespace_id, runner_name_selector.clone(), ), @@ -391,7 +391,7 @@ pub async fn spawn_actor( "failed to allocate (no availability), waiting for allocation", ); - ctx.msg(crate::messages::BumpOutboundAutoscaler {}) + ctx.msg(rivet_types::msgs::pegboard::BumpOutboundAutoscaler {}) .send() .await?; diff --git a/sdks/schemas/data/namespace.runner_kind.v1.bare b/sdks/schemas/data/namespace.runner_kind.v1.bare index 1cc263b54e..05c8eaa739 100644 --- a/sdks/schemas/data/namespace.runner_kind.v1.bare +++ b/sdks/schemas/data/namespace.runner_kind.v1.bare @@ -1,5 +1,6 @@ type Outbound struct { url: str + request_lifespan: u32 slots_per_runner: u32 min_runners: u32 max_runners: u32
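For anyone exercising the new `PUT /namespaces/{namespace_id}` route wired up in `api-public`/`api-peer` above, a hedged sketch of the request body implied by the `NamespacesUpdate` and `NamespacesRunnerKind` schemas added to `out/openapi.json`. The concrete values are placeholders lifted from the commented-out defaults this diff deletes from `insert_fdb`, the 870-second `request_lifespan` mirrors the removed `OUTBOUND_REQUEST_LIFESPAN` constant (14 min 30 s), and `serde_json` is used purely for illustration:

```rust
use serde_json::{json, Value};

/// Builds a NamespacesUpdateRequest body that switches a namespace to an
/// outbound runner kind. The update enum is externally tagged:
/// "update_runner_kind" wraps "runner_kind", which is either the "outbound"
/// object below or the string "custom".
fn example_update_body() -> Value {
    json!({
        "update_runner_kind": {
            "runner_kind": {
                "outbound": {
                    "url": "http://runner:5051/start", // placeholder endpoint
                    "request_lifespan": 870,           // seconds
                    "slots_per_runner": 10,
                    "min_runners": 1,
                    "max_runners": 1,
                    "runners_margin": 0
                }
            }
        }
    })
}
```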