diff --git a/Cargo.lock b/Cargo.lock index d5cc7c6c3a..2c96ef3bab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2382,7 +2382,7 @@ version = "25.6.1" dependencies = [ "anyhow", "gasoline", - "rivet-api-client", + "rivet-api-util", "rivet-types", "serde", ] @@ -3264,7 +3264,6 @@ dependencies = [ "lazy_static", "namespace", "nix 0.30.1", - "rivet-api-client", "rivet-api-types", "rivet-api-util", "rivet-data", @@ -4107,26 +4106,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "rivet-api-client" -version = "25.6.1" -dependencies = [ - "anyhow", - "axum 0.8.4", - "futures-util", - "reqwest", - "rivet-api-builder", - "rivet-api-util", - "rivet-config", - "rivet-error", - "rivet-pools", - "rivet-util", - "serde", - "serde_html_form", - "tokio", - "tracing", -] - [[package]] name = "rivet-api-full" version = "0.0.1" @@ -4182,7 +4161,6 @@ dependencies = [ "pegboard", "reqwest", "rivet-api-builder", - "rivet-api-client", "rivet-api-peer", "rivet-api-types", "rivet-api-util", @@ -4219,10 +4197,18 @@ version = "25.6.1" dependencies = [ "anyhow", "axum 0.8.4", + "futures-util", "reqwest", "rivet-api-builder", + "rivet-config", + "rivet-error", + "rivet-pools", + "rivet-util", "serde", + "serde_html_form", "serde_json", + "tokio", + "tracing", ] [[package]] @@ -4426,7 +4412,6 @@ dependencies = [ "pegboard-gateway", "pegboard-tunnel", "regex", - "rivet-api-peer", "rivet-api-public", "rivet-cache", "rivet-config", diff --git a/Cargo.toml b/Cargo.toml index 7ba87917d8..8d99495fa1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["packages/common/api-builder","packages/common/api-client","packages/common/api-types","packages/common/api-util","packages/common/cache/build","packages/common/cache/result","packages/common/clickhouse-inserter","packages/common/clickhouse-user-query","packages/common/config","packages/common/env","packages/common/error/core","packages/common/error/macros","packages/common/gasoline/core","packages/common/gasoline/macros","packages/common/logs","packages/common/metrics","packages/common/pools","packages/common/runtime","packages/common/service-manager","packages/common/telemetry","packages/common/test-deps","packages/common/test-deps-docker","packages/common/types","packages/common/universaldb","packages/common/universalpubsub","packages/common/util/core","packages/common/util/id","packages/common/versioned-data-util","packages/core/actor-kv","packages/core/api-peer","packages/core/api-public","packages/core/bootstrap","packages/core/dump-openapi","packages/core/guard/core","packages/core/guard/server","packages/core/pegboard-gateway","packages/core/pegboard-runner-ws","packages/core/pegboard-serverless","packages/core/pegboard-tunnel","packages/core/workflow-worker","packages/infra/engine","packages/services/epoxy","packages/services/internal","packages/services/namespace","packages/services/pegboard","sdks/rust/api-full","sdks/rust/bare_gen","sdks/rust/data","sdks/rust/epoxy-protocol","sdks/rust/runner-protocol","sdks/rust/tunnel-protocol","sdks/rust/ups-protocol"] +members = ["packages/common/api-builder","packages/common/api-types","packages/common/api-util","packages/common/cache/build","packages/common/cache/result","packages/common/clickhouse-inserter","packages/common/clickhouse-user-query","packages/common/config","packages/common/env","packages/common/error/core","packages/common/error/macros","packages/common/gasoline/core","packages/common/gasoline/macros","packages/common/logs","packages/common/metrics","packages/common/pools","packages/common/runtime","packages/common/service-manager","packages/common/telemetry","packages/common/test-deps","packages/common/test-deps-docker","packages/common/types","packages/common/universaldb","packages/common/universalpubsub","packages/common/util/core","packages/common/util/id","packages/common/versioned-data-util","packages/core/actor-kv","packages/core/api-peer","packages/core/api-public","packages/core/bootstrap","packages/core/dump-openapi","packages/core/guard/core","packages/core/guard/server","packages/core/pegboard-gateway","packages/core/pegboard-runner-ws","packages/core/pegboard-serverless","packages/core/pegboard-tunnel","packages/core/workflow-worker","packages/infra/engine","packages/services/epoxy","packages/services/internal","packages/services/namespace","packages/services/pegboard","sdks/rust/api-full","sdks/rust/bare_gen","sdks/rust/data","sdks/rust/epoxy-protocol","sdks/rust/runner-protocol","sdks/rust/tunnel-protocol","sdks/rust/ups-protocol"] [workspace.package] version = "25.6.1" @@ -249,9 +249,6 @@ features = ["ansi","fmt","json","env-filter"] [workspace.dependencies.rivet-api-builder] path = "packages/common/api-builder" -[workspace.dependencies.rivet-api-client] -path = "packages/common/api-client" - [workspace.dependencies.rivet-api-types] path = "packages/common/api-types" diff --git a/docker/dev-host/docker-compose.yml b/docker/dev-host/docker-compose.yml index 75e32cac41..7331a6f3d6 100644 --- a/docker/dev-host/docker-compose.yml +++ b/docker/dev-host/docker-compose.yml @@ -164,7 +164,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://127.0.0.1:6420 - - RUNNER_HOST=127.0.0.1 stop_grace_period: 4s depends_on: rivet-engine: diff --git a/docker/dev-multidc-multinode/docker-compose.yml b/docker/dev-multidc-multinode/docker-compose.yml index 8893d32457..d543263a7d 100644 --- a/docker/dev-multidc-multinode/docker-compose.yml +++ b/docker/dev-multidc-multinode/docker-compose.yml @@ -277,7 +277,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-dc-a-0:6420 - - RUNNER_HOST=runner-dc-a-0 stop_grace_period: 4s ports: - '5050:5050' @@ -294,7 +293,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-dc-a-0:6420 - - RUNNER_HOST=runner-dc-a-1 stop_grace_period: 4s depends_on: rivet-engine-dc-a-0: @@ -309,7 +307,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-dc-a-0:6420 - - RUNNER_HOST=runner-dc-a-2 stop_grace_period: 4s depends_on: rivet-engine-dc-a-0: @@ -537,7 +534,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-dc-b-0:6420 - - RUNNER_HOST=runner-dc-b-0 stop_grace_period: 4s depends_on: rivet-engine-dc-b-0: @@ -552,7 +548,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-dc-b-0:6420 - - RUNNER_HOST=runner-dc-b-1 stop_grace_period: 4s depends_on: rivet-engine-dc-b-0: @@ -567,7 +562,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-dc-b-0:6420 - - RUNNER_HOST=runner-dc-b-2 stop_grace_period: 4s depends_on: rivet-engine-dc-b-0: @@ -795,7 +789,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-dc-c-0:6420 - - RUNNER_HOST=runner-dc-c-0 stop_grace_period: 4s depends_on: rivet-engine-dc-c-0: @@ -810,7 +803,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-dc-c-0:6420 - - RUNNER_HOST=runner-dc-c-1 stop_grace_period: 4s depends_on: rivet-engine-dc-c-0: @@ -825,7 +817,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-dc-c-0:6420 - - RUNNER_HOST=runner-dc-c-2 stop_grace_period: 4s depends_on: rivet-engine-dc-c-0: diff --git a/docker/dev-multidc/docker-compose.yml b/docker/dev-multidc/docker-compose.yml index ba3d226472..9d49b3d228 100644 --- a/docker/dev-multidc/docker-compose.yml +++ b/docker/dev-multidc/docker-compose.yml @@ -195,7 +195,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-dc-a:6420 - - RUNNER_HOST=runner-dc-a stop_grace_period: 4s ports: - '5050:5050' @@ -343,7 +342,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-dc-b:6420 - - RUNNER_HOST=runner-dc-b stop_grace_period: 4s depends_on: rivet-engine-dc-b: @@ -489,7 +487,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-dc-c:6420 - - RUNNER_HOST=runner-dc-c stop_grace_period: 4s depends_on: rivet-engine-dc-c: diff --git a/docker/dev-multinode/docker-compose.yml b/docker/dev-multinode/docker-compose.yml index faedb297cf..4e7fba5dab 100644 --- a/docker/dev-multinode/docker-compose.yml +++ b/docker/dev-multinode/docker-compose.yml @@ -260,7 +260,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-0:6420 - - RUNNER_HOST=runner-0 stop_grace_period: 4s ports: - '5050:5050' @@ -277,7 +276,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-0:6420 - - RUNNER_HOST=runner-1 stop_grace_period: 4s depends_on: rivet-engine-0: @@ -292,7 +290,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine-0:6420 - - RUNNER_HOST=runner-2 stop_grace_period: 4s depends_on: rivet-engine-0: diff --git a/docker/dev/docker-compose.yml b/docker/dev/docker-compose.yml index d1b9fd8cfb..df2f69a993 100644 --- a/docker/dev/docker-compose.yml +++ b/docker/dev/docker-compose.yml @@ -186,8 +186,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine:6420 - - RUNNER_HOST=runner - - NO_AUTOSTART=1 stop_grace_period: 4s ports: - '5050:5050' diff --git a/docker/prod-file-system/docker-compose.yml b/docker/prod-file-system/docker-compose.yml index 8bf0241c51..567db01d71 100644 --- a/docker/prod-file-system/docker-compose.yml +++ b/docker/prod-file-system/docker-compose.yml @@ -31,7 +31,6 @@ services: restart: unless-stopped environment: - RIVET_ENDPOINT=http://rivet-engine:6420 - - RUNNER_HOST=runner stop_grace_period: 4s ports: - '5050:5050' diff --git a/out/openapi.json b/out/openapi.json index 71b8be245b..536842850a 100644 --- a/out/openapi.json +++ b/out/openapi.json @@ -550,19 +550,19 @@ } } }, - "/namespaces/{namespace_id}/runner-configs": { + "/runner-configs": { "get": { "tags": [ - "namespaces::runner_configs" + "runner_configs" ], - "operationId": "namespaces_runner_configs_list", + "operationId": "runner_configs_list", "parameters": [ { - "name": "namespace_id", - "in": "path", + "name": "namespace", + "in": "query", "required": true, "schema": { - "$ref": "#/components/schemas/RivetId" + "type": "string" } }, { @@ -587,45 +587,18 @@ "in": "query", "required": false, "schema": { - "$ref": "#/components/schemas/NamespacesRunnerConfigVariant" - } - } - ], - "responses": { - "200": { - "description": "", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/NamespacesRunnerConfigsListResponse" - } - } - } - } - } - } - }, - "/namespaces/{namespace_id}/runner-configs/{runner_name}": { - "get": { - "tags": [ - "namespaces::runner_configs" - ], - "operationId": "namespaces_runner_configs_get", - "parameters": [ - { - "name": "namespace_id", - "in": "path", - "required": true, - "schema": { - "$ref": "#/components/schemas/RivetId" + "$ref": "#/components/schemas/RunnerConfigVariant" } }, { "name": "runner_name", - "in": "path", - "required": true, + "in": "query", + "required": false, "schema": { - "type": "string" + "type": "array", + "items": { + "type": "string" + } } } ], @@ -635,30 +608,32 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/NamespacesRunnerConfigsGetResponse" + "$ref": "#/components/schemas/RunnerConfigsListResponse" } } } } } - }, + } + }, + "/runner-configs/{runner_name}": { "put": { "tags": [ - "namespaces::runner_configs" + "runner_configs" ], - "operationId": "namespaces_runner_configs_upsert", + "operationId": "runner_configs_upsert", "parameters": [ { - "name": "namespace_id", + "name": "runner_name", "in": "path", "required": true, "schema": { - "$ref": "#/components/schemas/RivetId" + "type": "string" } }, { - "name": "runner_name", - "in": "path", + "name": "namespace", + "in": "query", "required": true, "schema": { "type": "string" @@ -669,7 +644,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/NamespacesRunnerConfigsUpsertRequest" + "$ref": "#/components/schemas/RunnerConfigsUpsertRequest" } } }, @@ -681,7 +656,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/NamespacesRunnerConfigsUpsertResponse" + "$ref": "#/components/schemas/RunnerConfigsUpsertResponse" } } } @@ -690,21 +665,21 @@ }, "delete": { "tags": [ - "namespaces::runner_configs" + "runner_configs" ], - "operationId": "namespaces_runner_configs_delete", + "operationId": "runner_configs_delete", "parameters": [ { - "name": "namespace_id", + "name": "runner_name", "in": "path", "required": true, "schema": { - "$ref": "#/components/schemas/RivetId" + "type": "string" } }, { - "name": "runner_name", - "in": "path", + "name": "namespace", + "in": "query", "required": true, "schema": { "type": "string" @@ -717,7 +692,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/NamespacesRunnerConfigsDeleteResponse" + "$ref": "#/components/schemas/RunnerConfigsDeleteResponse" } } } @@ -1301,108 +1276,6 @@ }, "additionalProperties": false }, - "NamespacesRunnerConfig": { - "oneOf": [ - { - "type": "object", - "required": [ - "serverless" - ], - "properties": { - "serverless": { - "type": "object", - "required": [ - "url", - "request_lifespan", - "slots_per_runner", - "min_runners", - "max_runners", - "runners_margin" - ], - "properties": { - "max_runners": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "min_runners": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "request_lifespan": { - "type": "integer", - "format": "int32", - "description": "Seconds.", - "minimum": 0 - }, - "runners_margin": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "slots_per_runner": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "url": { - "type": "string" - } - } - } - } - } - ] - }, - "NamespacesRunnerConfigVariant": { - "type": "string", - "enum": [ - "serverless" - ] - }, - "NamespacesRunnerConfigsDeleteResponse": { - "type": "object" - }, - "NamespacesRunnerConfigsGetResponse": { - "type": "object", - "required": [ - "runner_config" - ], - "properties": { - "runner_config": { - "$ref": "#/components/schemas/NamespacesRunnerConfig" - } - } - }, - "NamespacesRunnerConfigsListResponse": { - "type": "object", - "required": [ - "runner_configs", - "pagination" - ], - "properties": { - "pagination": { - "$ref": "#/components/schemas/Pagination" - }, - "runner_configs": { - "type": "object", - "additionalProperties": { - "$ref": "#/components/schemas/NamespacesRunnerConfig" - }, - "propertyNames": { - "type": "string" - } - } - }, - "additionalProperties": false - }, - "NamespacesRunnerConfigsUpsertRequest": { - "$ref": "#/components/schemas/NamespacesRunnerConfig" - }, - "NamespacesRunnerConfigsUpsertResponse": { - "type": "object" - }, "Pagination": { "type": "object", "properties": { @@ -1511,6 +1384,97 @@ }, "additionalProperties": false }, + "RunnerConfig": { + "oneOf": [ + { + "type": "object", + "required": [ + "serverless" + ], + "properties": { + "serverless": { + "type": "object", + "required": [ + "url", + "request_lifespan", + "slots_per_runner", + "min_runners", + "max_runners", + "runners_margin" + ], + "properties": { + "max_runners": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "min_runners": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "request_lifespan": { + "type": "integer", + "format": "int32", + "description": "Seconds.", + "minimum": 0 + }, + "runners_margin": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "slots_per_runner": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "url": { + "type": "string" + } + } + } + } + } + ] + }, + "RunnerConfigVariant": { + "type": "string", + "enum": [ + "serverless" + ] + }, + "RunnerConfigsDeleteResponse": { + "type": "object" + }, + "RunnerConfigsListResponse": { + "type": "object", + "required": [ + "runner_configs", + "pagination" + ], + "properties": { + "pagination": { + "$ref": "#/components/schemas/Pagination" + }, + "runner_configs": { + "type": "object", + "additionalProperties": { + "$ref": "#/components/schemas/RunnerConfig" + }, + "propertyNames": { + "type": "string" + } + } + }, + "additionalProperties": false + }, + "RunnerConfigsUpsertRequest": { + "$ref": "#/components/schemas/RunnerConfig" + }, + "RunnerConfigsUpsertResponse": { + "type": "object" + }, "RunnersGetResponse": { "type": "object", "required": [ diff --git a/packages/common/api-client/Cargo.toml b/packages/common/api-client/Cargo.toml deleted file mode 100644 index 8075f82de2..0000000000 --- a/packages/common/api-client/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -name = "rivet-api-client" -version.workspace = true -authors.workspace = true -edition.workspace = true -license.workspace = true - -[dependencies] -anyhow.workspace = true -axum.workspace = true -futures-util.workspace = true -reqwest.workspace = true -rivet-api-builder.workspace = true -rivet-api-util.workspace = true -rivet-config.workspace = true -rivet-error.workspace = true -rivet-pools.workspace = true -rivet-util.workspace = true -serde_html_form.workspace = true -serde.workspace = true -tokio.workspace = true -tracing.workspace = true diff --git a/packages/common/api-client/src/lib.rs b/packages/common/api-client/src/lib.rs deleted file mode 100644 index deea077a71..0000000000 --- a/packages/common/api-client/src/lib.rs +++ /dev/null @@ -1,162 +0,0 @@ -use anyhow::{Context, Result}; -use axum::response::Response; -use futures_util::StreamExt; -use rivet_api_builder::ApiCtx; -use serde::{Serialize, de::DeserializeOwned}; -use std::future::Future; - -mod errors; - -pub use axum::http::{HeaderMap, Method}; - -/// Generic function to make raw requests to remote datacenters by label (returns axum Response) -pub async fn request_remote_datacenter_raw( - ctx: &ApiCtx, - dc_label: u16, - endpoint: &str, - method: Method, - headers: HeaderMap, - query: Option<&impl Serialize>, - body: Option<&impl Serialize>, -) -> Result { - let dc = ctx - .config() - .dc_for_label(dc_label) - .ok_or_else(|| errors::Datacenter::NotFound.build())?; - - let client = rivet_pools::reqwest::client().await?; - let mut url = dc.api_peer_url.join(endpoint)?; - - // NOTE: We don't use reqwest's `.query` because it doesn't support list query parameters - if let Some(q) = query { - url.set_query(Some(&serde_html_form::to_string(q)?)); - } - - let mut request = client.request(method, url).headers(headers); - - if let Some(b) = body { - request = request.json(b); - } - - let res = request - .send() - .await - .context("failed sending request to remote dc")?; - rivet_api_util::reqwest_to_axum_response(res) - .await - .context("failed parsing response from remote dc") -} - -/// Generic function to make requests to a specific datacenter -pub async fn request_remote_datacenter( - config: &rivet_config::Config, - dc_label: u16, - endpoint: &str, - method: Method, - headers: HeaderMap, - query: Option<&impl Serialize>, - body: Option<&impl Serialize>, -) -> Result -where - T: DeserializeOwned, -{ - let dc = config - .dc_for_label(dc_label) - .ok_or_else(|| errors::Datacenter::NotFound.build())?; - - let client = rivet_pools::reqwest::client().await?; - let mut url = dc.api_peer_url.join(endpoint)?; - - // NOTE: We don't use reqwest's `.query` because it doesn't support list query parameters - if let Some(q) = query { - url.set_query(Some(&serde_html_form::to_string(q)?)); - } - - let mut request = client.request(method, url).headers(headers); - - if let Some(b) = body { - request = request.json(b); - } - - let res = request - .send() - .await - .context("failed sending request to remote dc")?; - rivet_api_util::parse_response::(res) - .await - .context("failed parsing response from remote dc") -} - -/// Generic function to fanout requests to all datacenters and aggregate results -/// Returns aggregated results and errors only if all requests fail -pub async fn fanout_to_datacenters( - ctx: ApiCtx, - headers: HeaderMap, - endpoint: &str, - query: Q, - local_handler: F, - aggregator: A, -) -> Result -where - I: DeserializeOwned + Send + 'static, - Q: Serialize + Clone + Send + 'static, - F: Fn(ApiCtx, Q) -> Fut + Clone + Send + 'static, - Fut: Future> + Send, - A: Fn(I, &mut R), - R: Default + Send + 'static, -{ - let dcs = &ctx.config().topology().datacenters; - - let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| { - let ctx = ctx.clone(); - let headers = headers.clone(); - let query = query.clone(); - let endpoint = endpoint.to_string(); - let local_handler = local_handler.clone(); - - async move { - if dc.datacenter_label == ctx.config().dc_label() { - // Local datacenter - use direct API call - local_handler(ctx, query).await - } else { - // Remote datacenter - HTTP request - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - &endpoint, - Method::GET, - headers, - Some(&query), - Option::<&()>::None, - ) - .await - } - } - })) - .buffer_unordered(16) - .collect::>() - .await; - - // Aggregate results - let result_count = results.len(); - let mut errors = Vec::new(); - let mut aggregated = R::default(); - for res in results { - match res { - Ok(data) => aggregator(data, &mut aggregated), - Err(err) => { - tracing::error!(?err, "failed to request edge dc"); - errors.push(err); - } - } - } - - // Error only if all requests failed - if result_count == errors.len() { - if let Some(res) = errors.into_iter().next() { - return Err(res).context("all datacenter requests failed"); - } - } - - Ok(aggregated) -} diff --git a/packages/common/api-util/Cargo.toml b/packages/common/api-util/Cargo.toml index 42278fc3a7..0fac50f4a8 100644 --- a/packages/common/api-util/Cargo.toml +++ b/packages/common/api-util/Cargo.toml @@ -8,7 +8,15 @@ license.workspace = true [dependencies] anyhow.workspace = true axum.workspace = true +futures-util.workspace = true reqwest.workspace = true rivet-api-builder.workspace = true -serde.workspace = true +rivet-config.workspace = true +rivet-error.workspace = true +rivet-pools.workspace = true +rivet-util.workspace = true +serde_html_form.workspace = true serde_json.workspace = true +serde.workspace = true +tokio.workspace = true +tracing.workspace = true diff --git a/packages/common/api-client/src/errors.rs b/packages/common/api-util/src/errors.rs similarity index 100% rename from packages/common/api-client/src/errors.rs rename to packages/common/api-util/src/errors.rs diff --git a/packages/common/api-util/src/lib.rs b/packages/common/api-util/src/lib.rs index df9efd38ba..e6876d3696 100644 --- a/packages/common/api-util/src/lib.rs +++ b/packages/common/api-util/src/lib.rs @@ -1,7 +1,165 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use axum::{body::Body, response::Response}; -use rivet_api_builder::{ErrorResponse, RawErrorResponse}; -use serde::de::DeserializeOwned; +use futures_util::StreamExt; +use rivet_api_builder::{ApiCtx, ErrorResponse, RawErrorResponse}; +use serde::{Serialize, de::DeserializeOwned}; +use std::future::Future; + +mod errors; + +pub use axum::http::{HeaderMap, Method}; + +/// Generic function to make raw requests to remote datacenters by label (returns axum Response) +pub async fn request_remote_datacenter_raw( + ctx: &ApiCtx, + dc_label: u16, + endpoint: &str, + method: Method, + headers: HeaderMap, + query: Option<&impl Serialize>, + body: Option<&impl Serialize>, +) -> Result { + let dc = ctx + .config() + .dc_for_label(dc_label) + .ok_or_else(|| errors::Datacenter::NotFound.build())?; + + let client = rivet_pools::reqwest::client().await?; + let mut url = dc.api_peer_url.join(endpoint)?; + + // NOTE: We don't use reqwest's `.query` because it doesn't support list query parameters + if let Some(q) = query { + url.set_query(Some(&serde_html_form::to_string(q)?)); + } + + let mut request = client.request(method, url).headers(headers); + + if let Some(b) = body { + request = request.json(b); + } + + let res = request + .send() + .await + .context("failed sending request to remote dc")?; + reqwest_to_axum_response(res) + .await + .context("failed parsing response from remote dc") +} + +/// Generic function to make requests to a specific datacenter +pub async fn request_remote_datacenter( + config: &rivet_config::Config, + dc_label: u16, + endpoint: &str, + method: Method, + headers: HeaderMap, + query: Option<&impl Serialize>, + body: Option<&impl Serialize>, +) -> Result +where + T: DeserializeOwned, +{ + let dc = config + .dc_for_label(dc_label) + .ok_or_else(|| errors::Datacenter::NotFound.build())?; + + let client = rivet_pools::reqwest::client().await?; + let mut url = dc.api_peer_url.join(endpoint)?; + + // NOTE: We don't use reqwest's `.query` because it doesn't support list query parameters + if let Some(q) = query { + url.set_query(Some(&serde_html_form::to_string(q)?)); + } + + let mut request = client.request(method, url).headers(headers); + + if let Some(b) = body { + request = request.json(b); + } + + let res = request + .send() + .await + .context("failed sending request to remote dc")?; + parse_response::(res) + .await + .context("failed parsing response from remote dc") +} + +/// Generic function to fanout requests to all datacenters and aggregate results +/// Returns aggregated results and errors only if all requests fail +pub async fn fanout_to_datacenters( + ctx: ApiCtx, + headers: HeaderMap, + endpoint: &str, + query: Q, + local_handler: F, + aggregator: A, +) -> Result +where + I: DeserializeOwned + Send + 'static, + Q: Serialize + Clone + Send + 'static, + F: Fn(ApiCtx, Q) -> Fut + Clone + Send + 'static, + Fut: Future> + Send, + A: Fn(I, &mut R), + R: Default + Send + 'static, +{ + let dcs = &ctx.config().topology().datacenters; + + let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| { + let ctx = ctx.clone(); + let headers = headers.clone(); + let query = query.clone(); + let endpoint = endpoint.to_string(); + let local_handler = local_handler.clone(); + + async move { + if dc.datacenter_label == ctx.config().dc_label() { + // Local datacenter - use direct API call + local_handler(ctx, query).await + } else { + // Remote datacenter - HTTP request + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &endpoint, + Method::GET, + headers, + Some(&query), + Option::<&()>::None, + ) + .await + } + } + })) + .buffer_unordered(16) + .collect::>() + .await; + + // Aggregate results + let result_count = results.len(); + let mut errors = Vec::new(); + let mut aggregated = R::default(); + for res in results { + match res { + Ok(data) => aggregator(data, &mut aggregated), + Err(err) => { + tracing::error!(?err, "failed to request edge dc"); + errors.push(err); + } + } + } + + // Error only if all requests failed + if result_count == errors.len() { + if let Some(res) = errors.into_iter().next() { + return Err(res).context("all datacenter requests failed"); + } + } + + Ok(aggregated) +} pub async fn reqwest_to_axum_response(reqwest_response: reqwest::Response) -> Result { let status = reqwest_response.status(); diff --git a/packages/common/config/src/config/api_public.rs b/packages/common/config/src/config/api_public.rs index b039982956..53cb280a37 100644 --- a/packages/common/config/src/config/api_public.rs +++ b/packages/common/config/src/config/api_public.rs @@ -1,17 +1,10 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use std::net::IpAddr; /// Configuration for the public API service. #[derive(Debug, Serialize, Deserialize, Clone, Default, JsonSchema)] #[serde(deny_unknown_fields)] pub struct ApiPublic { - /// The ip on which the API service listens. - pub host: Option, - /// The host on which the API service is accessible to Guard. - pub lan_host: Option, - /// The port on which the API service listens. - pub port: Option, /// Flag to enable verbose error reporting. pub verbose_errors: Option, /// Flag to respect the X-Forwarded-For header for client IP addresses. @@ -22,20 +15,6 @@ pub struct ApiPublic { } impl ApiPublic { - pub fn lan_host(&self) -> &str { - self.lan_host - .as_deref() - .unwrap_or(crate::defaults::hosts::API_PUBLIC_LAN) - } - - pub fn host(&self) -> IpAddr { - self.host.unwrap_or(crate::defaults::hosts::API_PUBLIC) - } - - pub fn port(&self) -> u16 { - self.port.unwrap_or(crate::defaults::ports::API_PUBLIC) - } - pub fn verbose_errors(&self) -> bool { self.verbose_errors.unwrap_or(true) } diff --git a/packages/common/test-deps/src/datacenter.rs b/packages/common/test-deps/src/datacenter.rs index 7394f6d0da..aa1596453c 100644 --- a/packages/common/test-deps/src/datacenter.rs +++ b/packages/common/test-deps/src/datacenter.rs @@ -60,15 +60,11 @@ pub async fn setup_single_datacenter( dc = dc.datacenter_label, "containers started, waiting for services to be ready" ); - // Pick ports for other services - // TODO: Race condition with picking before binding - let api_public_port = portpicker::pick_unused_port().context("api_public_port")?; let pegboard_port = portpicker::pick_unused_port().context("pegboard_port")?; tracing::info!( dc = dc.datacenter_label, - api_public_port, api_peer_port, pegboard_port, guard_port, @@ -79,10 +75,7 @@ pub async fn setup_single_datacenter( let mut root = rivet_config::config::Root::default(); root.database = Some(db_config); root.pubsub = Some(pubsub_config); - root.api_public = Some(rivet_config::config::ApiPublic { - port: Some(api_public_port), - ..Default::default() - }); + root.api_public = Some(Default::default()); root.api_peer = Some(rivet_config::config::ApiPeer { port: Some(api_peer_port), ..Default::default() @@ -116,7 +109,6 @@ pub async fn setup_single_datacenter( pools, config, container_names, - api_public_port, api_peer_port, guard_port, pegboard_port, diff --git a/packages/common/test-deps/src/lib.rs b/packages/common/test-deps/src/lib.rs index 386ff8bc4b..adb671549b 100644 --- a/packages/common/test-deps/src/lib.rs +++ b/packages/common/test-deps/src/lib.rs @@ -12,7 +12,6 @@ pub struct TestDeps { pub pools: rivet_pools::Pools, pub config: rivet_config::Config, container_names: Vec, - api_public_port: u16, api_peer_port: u16, pegboard_port: u16, guard_port: u16, @@ -84,10 +83,6 @@ impl TestDeps { } } - pub fn api_public_port(&self) -> u16 { - self.api_public_port - } - pub fn api_peer_port(&self) -> u16 { self.api_peer_port } diff --git a/packages/core/api-peer/src/lib.rs b/packages/core/api-peer/src/lib.rs index c951c31208..26df5c474a 100644 --- a/packages/core/api-peer/src/lib.rs +++ b/packages/core/api-peer/src/lib.rs @@ -6,6 +6,7 @@ pub mod actors; pub mod internal; pub mod namespaces; pub mod router; +pub mod runner_configs; pub mod runners; pub use router::router as create_router; diff --git a/packages/core/api-peer/src/namespaces/mod.rs b/packages/core/api-peer/src/namespaces.rs similarity index 99% rename from packages/core/api-peer/src/namespaces/mod.rs rename to packages/core/api-peer/src/namespaces.rs index c0c8dcffd3..931db762fb 100644 --- a/packages/core/api-peer/src/namespaces/mod.rs +++ b/packages/core/api-peer/src/namespaces.rs @@ -6,8 +6,6 @@ use rivet_util::Id; use serde::{Deserialize, Serialize}; use utoipa::{IntoParams, ToSchema}; -pub mod runner_configs; - #[derive(Debug, Serialize, Deserialize, IntoParams)] #[serde(deny_unknown_fields)] #[into_params(parameter_in = Query)] diff --git a/packages/core/api-peer/src/namespaces/runner_configs.rs b/packages/core/api-peer/src/namespaces/runner_configs.rs deleted file mode 100644 index 39dec5a601..0000000000 --- a/packages/core/api-peer/src/namespaces/runner_configs.rs +++ /dev/null @@ -1,192 +0,0 @@ -use std::collections::HashMap; - -use anyhow::Result; -use rivet_api_builder::ApiCtx; -use rivet_api_types::pagination::Pagination; -use rivet_util::Id; -use serde::{Deserialize, Serialize}; -use utoipa::{IntoParams, ToSchema}; - -#[derive(Debug, Serialize, Deserialize, IntoParams)] -#[serde(deny_unknown_fields)] -#[into_params(parameter_in = Query)] -pub struct GetQuery {} - -#[derive(Deserialize, Serialize, ToSchema)] -#[schema(as = NamespacesRunnerConfigsGetResponse)] -pub struct GetResponse { - pub runner_config: namespace::types::RunnerConfig, -} - -#[derive(Deserialize)] -#[serde(deny_unknown_fields)] -pub struct GetPath { - pub namespace_id: Id, - pub runner_name: String, -} - -pub async fn get(ctx: ApiCtx, path: GetPath, _query: GetQuery) -> Result { - let runner_config = ctx - .op(namespace::ops::runner_config::get_local::Input { - runners: vec![(path.namespace_id, path.runner_name)], - }) - .await? - .into_iter() - .next() - .ok_or_else(|| namespace::errors::RunnerConfig::NotFound.build())?; - - Ok(GetResponse { - runner_config: runner_config.config, - }) -} - -#[derive(Debug, Serialize, Deserialize, Clone, IntoParams)] -#[serde(deny_unknown_fields)] -#[into_params(parameter_in = Query)] -pub struct ListQuery { - pub limit: Option, - pub cursor: Option, - pub variant: Option, -} - -#[derive(Deserialize)] -#[serde(deny_unknown_fields)] -pub struct ListPath { - pub namespace_id: Id, -} - -#[derive(Deserialize, Serialize, ToSchema)] -#[serde(deny_unknown_fields)] -#[schema(as = NamespacesRunnerConfigsListResponse)] -pub struct ListResponse { - pub runner_configs: HashMap, - pub pagination: Pagination, -} - -pub async fn list(ctx: ApiCtx, path: ListPath, query: ListQuery) -> Result { - ctx.op(namespace::ops::get_local::Input { - namespace_ids: vec![path.namespace_id], - }) - .await? - .first() - .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; - - // Parse variant from cursor if needed - let (variant, after_name) = if let Some(cursor) = query.cursor { - if let Some((variant, after_name)) = cursor.split_once(":") { - if query.variant.is_some() { - (query.variant, Some(after_name.to_string())) - } else { - ( - namespace::keys::RunnerConfigVariant::parse(variant), - Some(after_name.to_string()), - ) - } - } else { - (query.variant, None) - } - } else { - (query.variant, None) - }; - - let runner_configs_res = ctx - .op(namespace::ops::runner_config::list::Input { - namespace_id: path.namespace_id, - variant, - after_name, - limit: query.limit.unwrap_or(100), - }) - .await?; - - let cursor = runner_configs_res - .last() - .map(|(name, config)| format!("{}:{}", config.variant(), name)); - - Ok(ListResponse { - // TODO: Implement ComposeSchema for FakeMap so we don't have to reallocate - runner_configs: runner_configs_res.into_iter().collect(), - pagination: Pagination { cursor }, - }) -} - -#[derive(Debug, Serialize, Deserialize, IntoParams)] -#[serde(deny_unknown_fields)] -#[into_params(parameter_in = Query)] -pub struct UpsertQuery {} - -#[derive(Deserialize)] -#[serde(deny_unknown_fields)] -pub struct UpsertPath { - pub namespace_id: Id, - pub runner_name: String, -} - -#[derive(Deserialize, Serialize, ToSchema)] -#[serde(deny_unknown_fields)] -#[schema(as = NamespacesRunnerConfigsUpsertRequest)] -pub struct UpsertRequest(namespace::types::RunnerConfig); - -#[derive(Deserialize, Serialize, ToSchema)] -#[schema(as = NamespacesRunnerConfigsUpsertResponse)] -pub struct UpsertResponse {} - -pub async fn upsert( - ctx: ApiCtx, - path: UpsertPath, - _query: UpsertQuery, - body: UpsertRequest, -) -> Result { - ctx.op(namespace::ops::get_local::Input { - namespace_ids: vec![path.namespace_id], - }) - .await? - .first() - .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; - - ctx.op(namespace::ops::runner_config::upsert::Input { - namespace_id: path.namespace_id, - name: path.runner_name, - config: body.0, - }) - .await?; - - Ok(UpsertResponse {}) -} - -#[derive(Debug, Serialize, Deserialize, IntoParams)] -#[serde(deny_unknown_fields)] -#[into_params(parameter_in = Query)] -pub struct DeleteQuery {} - -#[derive(Deserialize)] -#[serde(deny_unknown_fields)] -pub struct DeletePath { - pub namespace_id: Id, - pub runner_name: String, -} - -#[derive(Deserialize, Serialize, ToSchema)] -#[serde(deny_unknown_fields)] -#[schema(as = NamespacesRunnerConfigsDeleteRequest)] -pub struct DeleteRequest(namespace::types::RunnerConfig); - -#[derive(Deserialize, Serialize, ToSchema)] -#[schema(as = NamespacesRunnerConfigsDeleteResponse)] -pub struct DeleteResponse {} - -pub async fn delete(ctx: ApiCtx, path: DeletePath, _query: DeleteQuery) -> Result { - ctx.op(namespace::ops::get_local::Input { - namespace_ids: vec![path.namespace_id], - }) - .await? - .first() - .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; - - ctx.op(namespace::ops::runner_config::delete::Input { - namespace_id: path.namespace_id, - name: path.runner_name, - }) - .await?; - - Ok(DeleteResponse {}) -} diff --git a/packages/core/api-peer/src/router.rs b/packages/core/api-peer/src/router.rs index fc8dd3dfce..f74e5feec6 100644 --- a/packages/core/api-peer/src/router.rs +++ b/packages/core/api-peer/src/router.rs @@ -1,6 +1,6 @@ use rivet_api_builder::{create_router, prelude::*}; -use crate::{actors, internal, namespaces, runners}; +use crate::{actors, internal, namespaces, runner_configs, runners}; pub async fn router( name: &'static str, @@ -19,21 +19,11 @@ pub async fn router( get(namespaces::resolve_for_name), ) // MARK: Runner configs + .route("/runner-configs", get(runner_configs::list)) + .route("/runner-configs/{runner_name}", put(runner_configs::upsert)) .route( - "/namespaces/{namespace_id}/runner-configs", - get(namespaces::runner_configs::list), - ) - .route( - "/namespaces/{namespace_id}/runner-configs/{runner_name}", - put(namespaces::runner_configs::upsert), - ) - .route( - "/namespaces/{namespace_id}/runner-configs/{runner_name}", - get(namespaces::runner_configs::get), - ) - .route( - "/namespaces/{namespace_id}/runner-configs/{runner_name}", - delete(namespaces::runner_configs::delete), + "/runner-configs/{runner_name}", + delete(runner_configs::delete), ) // MARK: Actors .route("/actors", get(actors::list::list)) diff --git a/packages/core/api-peer/src/runner_configs.rs b/packages/core/api-peer/src/runner_configs.rs new file mode 100644 index 0000000000..d4e1dd5a0c --- /dev/null +++ b/packages/core/api-peer/src/runner_configs.rs @@ -0,0 +1,185 @@ +use std::collections::HashMap; + +use anyhow::Result; +use rivet_api_builder::ApiCtx; +use rivet_api_types::pagination::Pagination; +use rivet_util::Id; +use serde::{Deserialize, Serialize}; +use utoipa::{IntoParams, ToSchema}; + +#[derive(Debug, Serialize, Deserialize, Clone, IntoParams)] +#[serde(deny_unknown_fields)] +#[into_params(parameter_in = Query)] +pub struct ListQuery { + pub namespace: String, + pub limit: Option, + pub cursor: Option, + pub variant: Option, + #[serde(default)] + pub runner_name: Vec, +} + +#[derive(Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ListPath { + pub namespace_id: Id, +} + +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(deny_unknown_fields)] +#[schema(as = RunnerConfigsListResponse)] +pub struct ListResponse { + pub runner_configs: HashMap, + pub pagination: Pagination, +} + +pub async fn list(ctx: ApiCtx, path: ListPath, query: ListQuery) -> Result { + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + if !query.runner_name.is_empty() { + let runner_configs = ctx + .op(namespace::ops::runner_config::get_local::Input { + runners: query + .runner_name + .iter() + .map(|name| (namespace.namespace_id, name.clone())) + .collect(), + }) + .await?; + + Ok(ListResponse { + // TODO: Implement ComposeSchema for FakeMap so we don't have to reallocate + runner_configs: runner_configs + .into_iter() + .map(|c| (c.name, c.config)) + .collect(), + pagination: Pagination { cursor: None }, + }) + } else { + // Parse variant from cursor if needed + let (variant, after_name) = if let Some(cursor) = query.cursor { + if let Some((variant, after_name)) = cursor.split_once(":") { + if query.variant.is_some() { + (query.variant, Some(after_name.to_string())) + } else { + ( + namespace::keys::RunnerConfigVariant::parse(variant), + Some(after_name.to_string()), + ) + } + } else { + (query.variant, None) + } + } else { + (query.variant, None) + }; + + let runner_configs = ctx + .op(namespace::ops::runner_config::list::Input { + namespace_id: namespace.namespace_id, + variant, + after_name, + limit: query.limit.unwrap_or(100), + }) + .await?; + + let cursor = runner_configs + .last() + .map(|(name, config)| format!("{}:{}", config.variant(), name)); + + Ok(ListResponse { + // TODO: Implement ComposeSchema for FakeMap so we don't have to reallocate + runner_configs: runner_configs.into_iter().collect(), + pagination: Pagination { cursor }, + }) + } +} + +#[derive(Debug, Serialize, Deserialize, IntoParams)] +#[serde(deny_unknown_fields)] +#[into_params(parameter_in = Query)] +pub struct UpsertQuery { + pub namespace: String, +} + +#[derive(Deserialize)] +#[serde(deny_unknown_fields)] +pub struct UpsertPath { + pub runner_name: String, +} + +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(deny_unknown_fields)] +#[schema(as = RunnerConfigsUpsertRequest)] +pub struct UpsertRequest(namespace::types::RunnerConfig); + +#[derive(Deserialize, Serialize, ToSchema)] +#[schema(as = RunnerConfigsUpsertResponse)] +pub struct UpsertResponse {} + +pub async fn upsert( + ctx: ApiCtx, + path: UpsertPath, + query: UpsertQuery, + body: UpsertRequest, +) -> Result { + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + ctx.op(namespace::ops::runner_config::upsert::Input { + namespace_id: namespace.namespace_id, + name: path.runner_name, + config: body.0, + }) + .await?; + + Ok(UpsertResponse {}) +} + +#[derive(Debug, Serialize, Deserialize, IntoParams)] +#[serde(deny_unknown_fields)] +#[into_params(parameter_in = Query)] +pub struct DeleteQuery { + pub namespace: String, +} + +#[derive(Deserialize)] +#[serde(deny_unknown_fields)] +pub struct DeletePath { + pub runner_name: String, +} + +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(deny_unknown_fields)] +#[schema(as = RunnerConfigsDeleteRequest)] +pub struct DeleteRequest(namespace::types::RunnerConfig); + +#[derive(Deserialize, Serialize, ToSchema)] +#[schema(as = RunnerConfigsDeleteResponse)] +pub struct DeleteResponse {} + +pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result { + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + ctx.op(namespace::ops::runner_config::delete::Input { + namespace_id: namespace.namespace_id, + name: path.runner_name, + }) + .await?; + + Ok(DeleteResponse {}) +} diff --git a/packages/core/api-public/Cargo.toml b/packages/core/api-public/Cargo.toml index 3e8f58216b..450e9d74c0 100644 --- a/packages/core/api-public/Cargo.toml +++ b/packages/core/api-public/Cargo.toml @@ -16,7 +16,6 @@ namespace.workspace = true pegboard.workspace = true reqwest.workspace = true rivet-api-builder.workspace = true -rivet-api-client.workspace = true rivet-api-peer.workspace = true rivet-api-types.workspace = true rivet-api-util.workspace = true diff --git a/packages/core/api-public/src/actors/create.rs b/packages/core/api-public/src/actors/create.rs index 929b5526de..5ac97bc45d 100644 --- a/packages/core/api-public/src/actors/create.rs +++ b/packages/core/api-public/src/actors/create.rs @@ -5,8 +5,8 @@ use axum::{ response::{IntoResponse, Json, Response}, }; use rivet_api_builder::{ApiCtx, ApiError}; -use rivet_api_client::request_remote_datacenter; use rivet_api_types::actors::create::{CreateRequest, CreateResponse}; +use rivet_api_util::request_remote_datacenter; use rivet_types::actors::CrashPolicy; use serde::{Deserialize, Serialize}; use utoipa::IntoParams; diff --git a/packages/core/api-public/src/actors/delete.rs b/packages/core/api-public/src/actors/delete.rs index 453c649a0a..12417a9165 100644 --- a/packages/core/api-public/src/actors/delete.rs +++ b/packages/core/api-public/src/actors/delete.rs @@ -5,7 +5,7 @@ use axum::{ response::{IntoResponse, Json, Response}, }; use rivet_api_builder::{ApiCtx, ApiError}; -use rivet_api_client::request_remote_datacenter_raw; +use rivet_api_util::request_remote_datacenter_raw; use rivet_util::Id; use serde::{Deserialize, Serialize}; use utoipa::{IntoParams, ToSchema}; diff --git a/packages/core/api-public/src/actors/list.rs b/packages/core/api-public/src/actors/list.rs index 7e3ed0f148..93d7aca504 100644 --- a/packages/core/api-public/src/actors/list.rs +++ b/packages/core/api-public/src/actors/list.rs @@ -5,8 +5,8 @@ use axum::{ response::{IntoResponse, Json, Response}, }; use rivet_api_builder::{ApiCtx, ApiError}; -use rivet_api_client::fanout_to_datacenters; use rivet_api_types::pagination::Pagination; +use rivet_api_util::fanout_to_datacenters; use serde::{Deserialize, Serialize}; use utoipa::{IntoParams, ToSchema}; diff --git a/packages/core/api-public/src/actors/list_names.rs b/packages/core/api-public/src/actors/list_names.rs index aadbd9b6ec..4480ceafa1 100644 --- a/packages/core/api-public/src/actors/list_names.rs +++ b/packages/core/api-public/src/actors/list_names.rs @@ -5,8 +5,8 @@ use axum::{ response::{IntoResponse, Json, Response}, }; use rivet_api_builder::{ApiCtx, ApiError}; -use rivet_api_client::fanout_to_datacenters; use rivet_api_types::{actors::list_names::*, pagination::Pagination}; +use rivet_api_util::fanout_to_datacenters; use rivet_types::actors::ActorName; /// ## Datacenter Round Trips diff --git a/packages/core/api-public/src/actors/utils.rs b/packages/core/api-public/src/actors/utils.rs index 06d9053ac4..4576a12e74 100644 --- a/packages/core/api-public/src/actors/utils.rs +++ b/packages/core/api-public/src/actors/utils.rs @@ -1,7 +1,7 @@ use anyhow::Result; use axum::http::{HeaderMap, Method}; use rivet_api_builder::ApiCtx; -use rivet_api_client::request_remote_datacenter; +use rivet_api_util::request_remote_datacenter; use rivet_error::RivetError; use rivet_types::actors::Actor; use rivet_util::Id; diff --git a/packages/core/api-public/src/lib.rs b/packages/core/api-public/src/lib.rs index be85290e04..3d8a55bfb6 100644 --- a/packages/core/api-public/src/lib.rs +++ b/packages/core/api-public/src/lib.rs @@ -1,28 +1,10 @@ -use std::net::SocketAddr; - -use anyhow::*; - pub mod actors; pub mod datacenters; mod errors; pub mod namespaces; pub mod router; +pub mod runner_configs; pub mod runners; pub mod ui; pub use router::router as create_router; - -pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> { - let host = config.api_public().host(); - let port = config.api_public().port(); - let addr = SocketAddr::from((host, port)); - - let router = router::router("api-public", config, pools).await?; - - let listener = tokio::net::TcpListener::bind(addr).await?; - tracing::info!(?host, ?port, "api-public server listening"); - - axum::serve(listener, router).await?; - - Ok(()) -} diff --git a/packages/core/api-public/src/namespaces/mod.rs b/packages/core/api-public/src/namespaces.rs similarity index 96% rename from packages/core/api-public/src/namespaces/mod.rs rename to packages/core/api-public/src/namespaces.rs index 7c2ef444bc..c3818993b6 100644 --- a/packages/core/api-public/src/namespaces/mod.rs +++ b/packages/core/api-public/src/namespaces.rs @@ -7,10 +7,8 @@ use axum::{ use rivet_api_builder::{ApiCtx, ApiError}; use rivet_util::Id; -use rivet_api_client::{request_remote_datacenter, request_remote_datacenter_raw}; use rivet_api_peer::namespaces::*; - -pub mod runner_configs; +use rivet_api_util::{request_remote_datacenter, request_remote_datacenter_raw}; #[utoipa::path( get, diff --git a/packages/core/api-public/src/router.rs b/packages/core/api-public/src/router.rs index d165f55f0f..2b7d5e2e52 100644 --- a/packages/core/api-public/src/router.rs +++ b/packages/core/api-public/src/router.rs @@ -5,7 +5,7 @@ use rivet_api_builder::{ }; use utoipa::OpenApi; -use crate::{actors, datacenters, namespaces, runners, ui}; +use crate::{actors, datacenters, namespaces, runner_configs, runners, ui}; #[derive(OpenApi)] #[openapi(paths( @@ -23,10 +23,9 @@ use crate::{actors, datacenters, namespaces, runners, ui}; namespaces::list, namespaces::get, namespaces::create, - namespaces::runner_configs::list, - namespaces::runner_configs::get, - namespaces::runner_configs::upsert, - namespaces::runner_configs::delete, + runner_configs::list, + runner_configs::upsert, + runner_configs::delete, datacenters::list, ))] #[openapi(components(schemas(namespace::keys::RunnerConfigVariant)))] @@ -51,21 +50,14 @@ pub async fn router( "/namespaces/{namespace_id}", axum::routing::get(namespaces::get), ) + .route("/runner-configs", axum::routing::get(runner_configs::list)) .route( - "/namespaces/{namespace_id}/runner-configs", - axum::routing::get(namespaces::runner_configs::list), + "/runner-configs/{runner_name}", + axum::routing::put(runner_configs::upsert), ) .route( - "/namespaces/{namespace_id}/runner-configs/{runner_name}", - axum::routing::get(namespaces::runner_configs::get), - ) - .route( - "/namespaces/{namespace_id}/runner-configs/{runner_name}", - axum::routing::put(namespaces::runner_configs::upsert), - ) - .route( - "/namespaces/{namespace_id}/runner-configs/{runner_name}", - axum::routing::delete(namespaces::runner_configs::delete), + "/runner-configs/{runner_name}", + axum::routing::delete(runner_configs::delete), ) // MARK: Actors .route("/actors", axum::routing::get(actors::list::list)) diff --git a/packages/core/api-public/src/namespaces/runner_configs.rs b/packages/core/api-public/src/runner_configs.rs similarity index 56% rename from packages/core/api-public/src/namespaces/runner_configs.rs rename to packages/core/api-public/src/runner_configs.rs index 250bf7ee3e..1f51b6d9d4 100644 --- a/packages/core/api-public/src/namespaces/runner_configs.rs +++ b/packages/core/api-public/src/runner_configs.rs @@ -7,66 +7,14 @@ use axum::{ use rivet_api_builder::{ApiCtx, ApiError}; use rivet_util::Id; -use rivet_api_client::request_remote_datacenter; -use rivet_api_peer::namespaces::runner_configs::*; +use rivet_api_peer::runner_configs::*; +use rivet_api_util::request_remote_datacenter; #[utoipa::path( get, - operation_id = "namespaces_runner_configs_get", - path = "/namespaces/{namespace_id}/runner-configs/{runner_name}", + operation_id = "runner_configs_list", + path = "/runner-configs", params( - ("namespace_id" = Id, Path), - ("runner_name" = String, Path), - GetQuery, - ), - responses( - (status = 200, body = GetResponse), - ), -)] -pub async fn get( - Extension(ctx): Extension, - headers: HeaderMap, - Path(path): Path, - Query(query): Query, -) -> Response { - match get_inner(ctx, headers, path, query).await { - Ok(response) => Json(response).into_response(), - Err(err) => ApiError::from(err).into_response(), - } -} - -async fn get_inner( - ctx: ApiCtx, - headers: HeaderMap, - path: GetPath, - query: GetQuery, -) -> Result { - if ctx.config().is_leader() { - rivet_api_peer::namespaces::runner_configs::get(ctx, path, query).await - } else { - let leader_dc = ctx.config().leader_dc()?; - request_remote_datacenter::( - ctx.config(), - leader_dc.datacenter_label, - &format!( - "/namespaces/{}/runner-configs/{}", - path.namespace_id, path.runner_name - ), - axum::http::Method::GET, - headers, - Some(&query), - Option::<&()>::None, - ) - .await - } -} - -#[utoipa::path( - get, - operation_id = "namespaces_runner_configs_list", - path = "/namespaces/{namespace_id}/runner-configs", - params( - ("namespace_id" = Id, Path), ListQuery, ), responses( @@ -92,13 +40,13 @@ async fn list_inner( query: ListQuery, ) -> Result { if ctx.config().is_leader() { - rivet_api_peer::namespaces::runner_configs::list(ctx, path, query).await + rivet_api_peer::runner_configs::list(ctx, path, query).await } else { let leader_dc = ctx.config().leader_dc()?; request_remote_datacenter::( ctx.config(), leader_dc.datacenter_label, - &format!("/namespaces/{}/runner-configs", path.namespace_id), + "/runner-configs", axum::http::Method::GET, headers, Some(&query), @@ -110,10 +58,9 @@ async fn list_inner( #[utoipa::path( put, - operation_id = "namespaces_runner_configs_upsert", - path = "/namespaces/{namespace_id}/runner-configs/{runner_name}", + operation_id = "runner_configs_upsert", + path = "/runner-configs/{runner_name}", params( - ("namespace_id" = Id, Path), ("runner_name" = String, Path), UpsertQuery, ), @@ -143,19 +90,16 @@ async fn upsert_inner( body: UpsertRequest, ) -> Result { if ctx.config().is_leader() { - rivet_api_peer::namespaces::runner_configs::upsert(ctx, path, query, body).await + rivet_api_peer::runner_configs::upsert(ctx, path, query, body).await } else { let leader_dc = ctx.config().leader_dc()?; request_remote_datacenter::( ctx.config(), leader_dc.datacenter_label, - &format!( - "/namespaces/{}/runner-configs/{}", - path.namespace_id, path.runner_name - ), + &format!("/runner-configs/{}", path.runner_name), axum::http::Method::PUT, headers, - Option::<&()>::None, + Some(&query), Some(&body), ) .await @@ -164,10 +108,9 @@ async fn upsert_inner( #[utoipa::path( delete, - operation_id = "namespaces_runner_configs_delete", - path = "/namespaces/{namespace_id}/runner-configs/{runner_name}", + operation_id = "runner_configs_delete", + path = "/runner-configs/{runner_name}", params( - ("namespace_id" = Id, Path), ("runner_name" = String, Path), DeleteQuery, ), @@ -194,16 +137,13 @@ async fn delete_inner( query: DeleteQuery, ) -> Result { if ctx.config().is_leader() { - rivet_api_peer::namespaces::runner_configs::delete(ctx, path, query).await + rivet_api_peer::runner_configs::delete(ctx, path, query).await } else { let leader_dc = ctx.config().leader_dc()?; request_remote_datacenter::( ctx.config(), leader_dc.datacenter_label, - &format!( - "/namespaces/{}/runner-configs/{}", - path.namespace_id, path.runner_name - ), + &format!("/runner-configs/{}", path.runner_name), axum::http::Method::DELETE, headers, Some(&query), diff --git a/packages/core/api-public/src/runners.rs b/packages/core/api-public/src/runners.rs index 169cdc527a..3b5a9f85ca 100644 --- a/packages/core/api-public/src/runners.rs +++ b/packages/core/api-public/src/runners.rs @@ -5,11 +5,11 @@ use axum::{ response::{IntoResponse, Json, Response}, }; use rivet_api_builder::{ApiCtx, ApiError}; -use rivet_api_client::{fanout_to_datacenters, request_remote_datacenter_raw}; use rivet_api_types::{ pagination::Pagination, runners::{get::*, list::*}, }; +use rivet_api_util::{fanout_to_datacenters, request_remote_datacenter_raw}; use rivet_util::Id; use serde::{Deserialize, Serialize}; use utoipa::{IntoParams, ToSchema}; diff --git a/packages/core/guard/server/Cargo.toml b/packages/core/guard/server/Cargo.toml index 0b9944e328..730de20dc5 100644 --- a/packages/core/guard/server/Cargo.toml +++ b/packages/core/guard/server/Cargo.toml @@ -27,7 +27,6 @@ pegboard-gateway.workspace = true pegboard-tunnel.workspace = true pegboard.workspace = true regex.workspace = true -rivet-api-peer.workspace = true rivet-api-public.workspace = true rivet-cache.workspace = true rivet-config.workspace = true diff --git a/packages/core/guard/server/src/routing/api_peer.rs b/packages/core/guard/server/src/routing/api_peer.rs deleted file mode 100644 index df6c9d2398..0000000000 --- a/packages/core/guard/server/src/routing/api_peer.rs +++ /dev/null @@ -1,83 +0,0 @@ -use std::sync::Arc; - -use anyhow::*; -use async_trait::async_trait; -use bytes::Bytes; -use gas::prelude::*; -use http_body_util::{BodyExt, Full}; -use hyper::body::Incoming as BodyIncoming; -use hyper::{Request, Response}; -use hyper_tungstenite::HyperWebsocket; -use rivet_guard_core::proxy_service::{ResponseBody, RoutingOutput}; -use rivet_guard_core::{CustomServeTrait, request_context::RequestContext}; -use tower::Service; - -struct ApiPeerService { - router: axum::Router, -} - -#[async_trait] -impl CustomServeTrait for ApiPeerService { - async fn handle_request( - &self, - req: Request>, - _request_context: &mut RequestContext, - ) -> Result> { - // Clone the router to get a mutable service - let mut service = self.router.clone(); - - // Call the service - let response = service - .call(req) - .await - .map_err(|e| anyhow::anyhow!("Failed to call api-peer service: {}", e))?; - - // Collect the body and convert to ResponseBody - let (parts, body) = response.into_parts(); - let collected = body - .collect() - .await - .map_err(|e| anyhow::anyhow!("Failed to collect response body: {}", e))?; - let bytes = collected.to_bytes(); - let response_body = ResponseBody::Full(Full::new(bytes)); - let response = Response::from_parts(parts, response_body); - - Ok(response) - } - - async fn handle_websocket( - &self, - client_ws: HyperWebsocket, - _headers: &hyper::HeaderMap, - _path: &str, - _request_context: &mut RequestContext, - ) -> std::result::Result<(), (HyperWebsocket, anyhow::Error)> { - Err(( - client_ws, - anyhow::anyhow!("api-peer does not support WebSocket connections"), - )) - } -} - -/// Route requests to the api-peer service -#[tracing::instrument(skip_all)] -pub async fn route_request( - ctx: &StandaloneCtx, - target: &str, - _host: &str, - _path: &str, -) -> Result> { - // Check target - if target != "api-peer" { - return Ok(None); - } - - // Create the router once - let router = - rivet_api_peer::create_router("api-peer", ctx.config().clone(), ctx.pools().clone()) - .await?; - - let service = Arc::new(ApiPeerService { router }); - - return Ok(Some(RoutingOutput::CustomServe(service))); -} diff --git a/packages/core/guard/server/src/routing/mod.rs b/packages/core/guard/server/src/routing/mod.rs index 524d63075d..d0a2b4c00f 100644 --- a/packages/core/guard/server/src/routing/mod.rs +++ b/packages/core/guard/server/src/routing/mod.rs @@ -7,7 +7,6 @@ use rivet_guard_core::RoutingFn; use crate::{errors, shared_state::SharedState}; -mod api_peer; mod api_public; pub mod pegboard_gateway; mod pegboard_tunnel; @@ -66,12 +65,6 @@ pub fn create_routing_function(ctx: StandaloneCtx, shared_state: SharedState) -> { return Ok(routing_output); } - - if let Some(routing_output) = - api_peer::route_request(&ctx, target, host, path).await? - { - return Ok(routing_output); - } } else { // No x-rivet-target header, try routing to api-public by default if let Some(routing_output) = diff --git a/packages/infra/engine/Cargo.toml b/packages/infra/engine/Cargo.toml index b654a96003..354f052b90 100644 --- a/packages/infra/engine/Cargo.toml +++ b/packages/infra/engine/Cargo.toml @@ -23,7 +23,6 @@ pegboard-serverless.workspace = true pegboard-runner-ws.workspace = true reqwest.workspace = true rivet-api-peer.workspace = true -rivet-api-public.workspace = true rivet-bootstrap.workspace = true rivet-cache.workspace = true rivet-config.workspace = true diff --git a/packages/infra/engine/src/run_config.rs b/packages/infra/engine/src/run_config.rs index 7cfb989853..e46cabd423 100644 --- a/packages/infra/engine/src/run_config.rs +++ b/packages/infra/engine/src/run_config.rs @@ -3,9 +3,6 @@ use rivet_service_manager::{RunConfigData, Service, ServiceKind}; pub fn config(_rivet_config: rivet_config::Config) -> Result { let services = vec![ - Service::new("api_public", ServiceKind::ApiPublic, |config, pools| { - Box::pin(rivet_api_public::start(config, pools)) - }), Service::new("api_peer", ServiceKind::ApiPeer, |config, pools| { Box::pin(rivet_api_peer::start(config, pools)) }), diff --git a/packages/infra/engine/tests/common/ctx.rs b/packages/infra/engine/tests/common/ctx.rs index 13d96e951a..b29378bddb 100644 --- a/packages/infra/engine/tests/common/ctx.rs +++ b/packages/infra/engine/tests/common/ctx.rs @@ -78,9 +78,6 @@ impl TestCtx { let pools = pools.clone(); async move { let services = vec![ - Service::new("api-public", ServiceKind::ApiPublic, |config, pools| { - Box::pin(rivet_api_public::start(config, pools)) - }), Service::new("api-peer", ServiceKind::ApiPeer, |config, pools| { Box::pin(rivet_api_peer::start(config, pools)) }), @@ -109,9 +106,8 @@ impl TestCtx { // Wait for ports to open tracing::info!(dc_label, "waiting for services to be ready"); tokio::join!( - wait_for_port("api-public", test_deps.api_public_port()), - wait_for_port("guard", test_deps.guard_port()), wait_for_port("api-peer", test_deps.api_peer_port()), + wait_for_port("guard", test_deps.guard_port()), wait_for_port("pegboard", test_deps.pegboard_port()), ); diff --git a/packages/services/internal/Cargo.toml b/packages/services/internal/Cargo.toml index fd65a9323d..8c85c065b7 100644 --- a/packages/services/internal/Cargo.toml +++ b/packages/services/internal/Cargo.toml @@ -8,6 +8,6 @@ edition.workspace = true [dependencies] anyhow.workspace = true gas.workspace = true -rivet-api-client.workspace = true +rivet-api-util.workspace = true rivet-types.workspace = true serde.workspace = true diff --git a/packages/services/internal/src/ops/bump_serverless_autoscaler_global.rs b/packages/services/internal/src/ops/bump_serverless_autoscaler_global.rs index 2c76300b27..bc2e698b22 100644 --- a/packages/services/internal/src/ops/bump_serverless_autoscaler_global.rs +++ b/packages/services/internal/src/ops/bump_serverless_autoscaler_global.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use futures_util::StreamExt; use gas::prelude::*; -use rivet_api_client::{HeaderMap, Method, request_remote_datacenter}; +use rivet_api_util::{HeaderMap, Method, request_remote_datacenter}; #[derive(Clone, Debug, Default)] pub struct Input {} diff --git a/packages/services/internal/src/ops/cache/purge_global.rs b/packages/services/internal/src/ops/cache/purge_global.rs index b86edac77a..ecd4c74684 100644 --- a/packages/services/internal/src/ops/cache/purge_global.rs +++ b/packages/services/internal/src/ops/cache/purge_global.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use futures_util::StreamExt; use gas::prelude::*; -use rivet_api_client::{HeaderMap, Method, request_remote_datacenter}; +use rivet_api_util::{HeaderMap, Method, request_remote_datacenter}; use rivet_cache::RawCacheKey; use serde::Serialize; diff --git a/packages/services/namespace/src/keys.rs b/packages/services/namespace/src/keys.rs index 800b943793..caa49d1005 100644 --- a/packages/services/namespace/src/keys.rs +++ b/packages/services/namespace/src/keys.rs @@ -194,7 +194,6 @@ impl<'de> TupleUnpack<'de> for ByNameKey { #[derive(Clone, Copy, Debug, Serialize, Deserialize, strum::FromRepr, ToSchema)] #[serde(rename_all = "snake_case")] -#[schema(as = NamespacesRunnerConfigVariant)] pub enum RunnerConfigVariant { Serverless = 0, } diff --git a/packages/services/namespace/src/types.rs b/packages/services/namespace/src/types.rs index 2dc477ce11..b2034accd4 100644 --- a/packages/services/namespace/src/types.rs +++ b/packages/services/namespace/src/types.rs @@ -13,7 +13,6 @@ pub struct Namespace { #[derive(Debug, Clone, Serialize, Deserialize, Hash, ToSchema)] #[serde(rename_all = "snake_case")] -#[schema(as = NamespacesRunnerConfig)] pub enum RunnerConfig { Serverless { url: String, diff --git a/packages/services/pegboard/Cargo.toml b/packages/services/pegboard/Cargo.toml index cc08e962cf..c75e5ff91b 100644 --- a/packages/services/pegboard/Cargo.toml +++ b/packages/services/pegboard/Cargo.toml @@ -12,7 +12,6 @@ gas.workspace = true lazy_static.workspace = true namespace.workspace = true nix.workspace = true -rivet-api-client.workspace = true rivet-api-types.workspace = true rivet-api-util.workspace = true rivet-data.workspace = true diff --git a/packages/services/pegboard/src/ops/actor/create.rs b/packages/services/pegboard/src/ops/actor/create.rs index 962cf1299a..b201e3111e 100644 --- a/packages/services/pegboard/src/ops/actor/create.rs +++ b/packages/services/pegboard/src/ops/actor/create.rs @@ -1,6 +1,6 @@ use anyhow::Result; use gas::prelude::*; -use rivet_api_client::{Method, request_remote_datacenter}; +use rivet_api_util::{Method, request_remote_datacenter}; use rivet_types::actors::{Actor, CrashPolicy}; #[derive(Debug)] diff --git a/packages/services/pegboard/src/ops/actor/get_for_key.rs b/packages/services/pegboard/src/ops/actor/get_for_key.rs index f850f88c69..7d8b3f8499 100644 --- a/packages/services/pegboard/src/ops/actor/get_for_key.rs +++ b/packages/services/pegboard/src/ops/actor/get_for_key.rs @@ -1,6 +1,6 @@ use anyhow::Result; use gas::prelude::*; -use rivet_api_client::{Method, request_remote_datacenter}; +use rivet_api_util::{Method, request_remote_datacenter}; use rivet_types::actors::Actor; #[derive(Debug)]