diff --git a/out/openapi.json b/out/openapi.json index 3eeca9e31d..fd8259a654 100644 --- a/out/openapi.json +++ b/out/openapi.json @@ -1011,7 +1011,8 @@ "actor": { "$ref": "#/components/schemas/Actor" } - } + }, + "additionalProperties": false }, "ActorsDeleteResponse": { "type": "object" diff --git a/packages/common/api-types/src/actors/create.rs b/packages/common/api-types/src/actors/create.rs index 082ed027eb..004058dc7f 100644 --- a/packages/common/api-types/src/actors/create.rs +++ b/packages/common/api-types/src/actors/create.rs @@ -13,7 +13,6 @@ pub struct CreateQuery { #[serde(deny_unknown_fields)] #[schema(as = ActorsCreateRequest)] pub struct CreateRequest { - pub actor_id: Id, pub name: String, pub key: Option, pub input: Option, diff --git a/packages/core/api-peer/src/actors/create.rs b/packages/core/api-peer/src/actors/create.rs index 1980262919..ebf91f98eb 100644 --- a/packages/core/api-peer/src/actors/create.rs +++ b/packages/core/api-peer/src/actors/create.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use gas::prelude::*; use rivet_api_builder::ApiCtx; use rivet_api_types::actors::create::{CreateQuery, CreateRequest, CreateResponse}; @@ -15,27 +16,24 @@ pub async fn create( .await? .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + let actor_id = Id::new_v1(ctx.config().dc_label()); + let res = ctx .op(pegboard::ops::actor::create::Input { - actor_id: body.actor_id, + actor_id, namespace_id: namespace.namespace_id, name: body.name.clone(), - key: body.key.clone(), - runner_name_selector: body.runner_name_selector.clone(), + key: body.key, + runner_name_selector: body.runner_name_selector, input: body.input.clone(), crash_policy: body.crash_policy, - // Don't forward request since this request should be already forwarded if it is going - // to be forward. - // - // This should never throw a request needs to be forwarded error. If it does, something - // is broken. - forward_request: false, + // NOTE: This can forward if the user attempts to create an actor with a target dc and this dc + // ends up forwarding to another. + forward_request: true, // api-peer is always creating in its own datacenter datacenter_name: None, }) .await?; - let actor = res.actor; - - Ok(CreateResponse { actor }) + Ok(CreateResponse { actor: res.actor }) } diff --git a/packages/core/api-public/src/actors/create.rs b/packages/core/api-public/src/actors/create.rs index 2d037a05ee..f661b5b12c 100644 --- a/packages/core/api-public/src/actors/create.rs +++ b/packages/core/api-public/src/actors/create.rs @@ -1,11 +1,17 @@ use anyhow::Result; -use rivet_api_builder::ApiCtx; +use axum::{ + extract::{Extension, Query}, + http::HeaderMap, + response::{IntoResponse, Json, Response}, +}; +use rivet_api_builder::{ApiCtx, ApiError}; +use rivet_api_client::request_remote_datacenter; +use rivet_api_types::actors::create::{CreateRequest, CreateResponse}; use rivet_types::actors::CrashPolicy; -use rivet_util::Id; use serde::{Deserialize, Serialize}; use utoipa::{IntoParams, ToSchema}; -#[derive(Debug, Deserialize, IntoParams)] +#[derive(Debug, Serialize, Deserialize, IntoParams)] #[serde(deny_unknown_fields)] #[into_params(parameter_in = Query)] pub struct CreateQuery { @@ -13,23 +19,6 @@ pub struct CreateQuery { pub datacenter: Option, } -#[derive(Deserialize, ToSchema)] -#[serde(deny_unknown_fields)] -#[schema(as = ActorsCreateRequest)] -pub struct CreateRequest { - pub name: String, - pub key: Option, - pub input: Option, - pub runner_name_selector: String, - pub crash_policy: CrashPolicy, -} - -#[derive(Serialize, ToSchema)] -#[schema(as = ActorsCreateResponse)] -pub struct CreateResponse { - pub actor: rivet_types::actors::Actor, -} - /// ## Datacenter Round Trips /// /// **If actor is created in the current datacenter:** @@ -57,18 +46,23 @@ pub struct CreateResponse { ), )] pub async fn create( + Extension(ctx): Extension, + headers: HeaderMap, + Query(query): Query, + Json(body): Json, +) -> Response { + match create_inner(ctx, headers, query, body).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +async fn create_inner( ctx: ApiCtx, - _path: (), + headers: HeaderMap, query: CreateQuery, body: CreateRequest, ) -> Result { - let namespace = ctx - .op(namespace::ops::resolve_for_name_global::Input { - name: query.namespace.clone(), - }) - .await? - .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; - // Determine which datacenter to create the actor in let target_dc_label = if let Some(dc_name) = &query.datacenter { ctx.config() @@ -79,26 +73,22 @@ pub async fn create( ctx.config().dc_label() }; - let actor_id = Id::new_v1(target_dc_label); - - let key: Option = body.key; - - let res = ctx - .op(pegboard::ops::actor::create::Input { - actor_id, - namespace_id: namespace.namespace_id, - name: body.name.clone(), - key, - runner_name_selector: body.runner_name_selector, - input: body.input.clone(), - crash_policy: body.crash_policy, - // Forward requests to the correct api-peer datacenter - forward_request: true, - datacenter_name: query.datacenter.clone(), - }) - .await?; - - let actor = res.actor; + let query = rivet_api_types::actors::create::CreateQuery { + namespace: query.namespace, + }; - Ok(CreateResponse { actor }) + if target_dc_label == ctx.config().dc_label() { + rivet_api_peer::actors::create::create(ctx, (), query, body).await + } else { + request_remote_datacenter::( + ctx.config(), + target_dc_label, + "/actors", + axum::http::Method::POST, + headers, + Some(&query), + Some(&body), + ) + .await + } } diff --git a/packages/core/api-public/src/router.rs b/packages/core/api-public/src/router.rs index 9c06fd15b8..a33b8b3243 100644 --- a/packages/core/api-public/src/router.rs +++ b/packages/core/api-public/src/router.rs @@ -68,7 +68,7 @@ pub async fn router( ) // MARK: Actors .route("/actors", axum::routing::get(actors::list::list)) - .route("/actors", post(actors::create::create)) + .route("/actors", axum::routing::post(actors::create::create)) .route( "/actors", axum::routing::put(actors::get_or_create::get_or_create), diff --git a/packages/core/pegboard-gateway/src/lib.rs b/packages/core/pegboard-gateway/src/lib.rs index b1b89dd4ed..8e45374927 100644 --- a/packages/core/pegboard-gateway/src/lib.rs +++ b/packages/core/pegboard-gateway/src/lib.rs @@ -26,7 +26,6 @@ use rivet_tunnel_protocol::{ ToServerWebSocketOpen, }; use rivet_util::serde::HashableMap; -use std::time::Duration; use tokio_tungstenite::tungstenite::Message; use crate::shared_state::{SharedState, TunnelMessageData}; diff --git a/packages/services/pegboard/src/ops/actor/create.rs b/packages/services/pegboard/src/ops/actor/create.rs index 66a7a1b42c..962cf1299a 100644 --- a/packages/services/pegboard/src/ops/actor/create.rs +++ b/packages/services/pegboard/src/ops/actor/create.rs @@ -133,9 +133,6 @@ async fn forward_to_datacenter( .next() .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; - // Generate a new actor ID with the correct datacenter label - let actor_id = Id::new_v1(datacenter_label); - // Make request to remote datacenter let response = request_remote_datacenter::( ctx.config(), @@ -147,7 +144,6 @@ async fn forward_to_datacenter( namespace: namespace.name.clone(), }), Some(&rivet_api_types::actors::create::CreateRequest { - actor_id, name, key, input, diff --git a/packages/services/pegboard/src/workflows/actor/runtime.rs b/packages/services/pegboard/src/workflows/actor/runtime.rs index d01755d520..ffd1c25fd4 100644 --- a/packages/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/services/pegboard/src/workflows/actor/runtime.rs @@ -106,10 +106,10 @@ async fn allocate_actor( .udb()? .run(|tx| async move { let ping_threshold_ts = util::timestamp::now() - RUNNER_ELIGIBLE_THRESHOLD_MS; - let tx = tx.with_subspace(keys::subspace()); // Check if runner is an serverless runner let for_serverless = tx + .with_subspace(namespace::keys::subspace()) .exists( &namespace::keys::RunnerConfigByVariantKey::new( namespace_id, @@ -120,6 +120,8 @@ async fn allocate_actor( ) .await?; + let tx = tx.with_subspace(keys::subspace()); + if for_serverless { tx.atomic_op( &rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new(