Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion out/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion packages/common/api-types/src/actors/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub struct CreateQuery {
#[serde(deny_unknown_fields)]
#[schema(as = ActorsCreateRequest)]
pub struct CreateRequest {
pub actor_id: Id,
pub name: String,
pub key: Option<String>,
pub input: Option<String>,
Expand Down
22 changes: 10 additions & 12 deletions packages/core/api-peer/src/actors/create.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Result;
use gas::prelude::*;
use rivet_api_builder::ApiCtx;
use rivet_api_types::actors::create::{CreateQuery, CreateRequest, CreateResponse};

Expand All @@ -15,27 +16,24 @@ pub async fn create(
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

let actor_id = Id::new_v1(ctx.config().dc_label());

let res = ctx
.op(pegboard::ops::actor::create::Input {
actor_id: body.actor_id,
actor_id,
namespace_id: namespace.namespace_id,
name: body.name.clone(),
key: body.key.clone(),
runner_name_selector: body.runner_name_selector.clone(),
key: body.key,
runner_name_selector: body.runner_name_selector,
input: body.input.clone(),
crash_policy: body.crash_policy,
// Don't forward request since this request should be already forwarded if it is going
// to be forward.
//
// This should never throw a request needs to be forwarded error. If it does, something
// is broken.
forward_request: false,
// NOTE: This can forward if the user attempts to create an actor with a target dc and this dc
// ends up forwarding to another.
forward_request: true,
// api-peer is always creating in its own datacenter
datacenter_name: None,
})
.await?;

let actor = res.actor;

Ok(CreateResponse { actor })
Ok(CreateResponse { actor: res.actor })
}
88 changes: 39 additions & 49 deletions packages/core/api-public/src/actors/create.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,24 @@
use anyhow::Result;
use rivet_api_builder::ApiCtx;
use axum::{
extract::{Extension, Query},
http::HeaderMap,
response::{IntoResponse, Json, Response},
};
use rivet_api_builder::{ApiCtx, ApiError};
use rivet_api_client::request_remote_datacenter;
use rivet_api_types::actors::create::{CreateRequest, CreateResponse};
use rivet_types::actors::CrashPolicy;
use rivet_util::Id;
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};

#[derive(Debug, Deserialize, IntoParams)]
#[derive(Debug, Serialize, Deserialize, IntoParams)]
#[serde(deny_unknown_fields)]
#[into_params(parameter_in = Query)]
pub struct CreateQuery {
pub namespace: String,
pub datacenter: Option<String>,
}

#[derive(Deserialize, ToSchema)]
#[serde(deny_unknown_fields)]
#[schema(as = ActorsCreateRequest)]
pub struct CreateRequest {
pub name: String,
pub key: Option<String>,
pub input: Option<String>,
pub runner_name_selector: String,
pub crash_policy: CrashPolicy,
}

#[derive(Serialize, ToSchema)]
#[schema(as = ActorsCreateResponse)]
pub struct CreateResponse {
pub actor: rivet_types::actors::Actor,
}

/// ## Datacenter Round Trips
///
/// **If actor is created in the current datacenter:**
Expand Down Expand Up @@ -57,18 +46,23 @@ pub struct CreateResponse {
),
)]
pub async fn create(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Query(query): Query<CreateQuery>,
Json(body): Json<CreateRequest>,
) -> Response {
match create_inner(ctx, headers, query, body).await {
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
}
}

async fn create_inner(
ctx: ApiCtx,
_path: (),
headers: HeaderMap,
query: CreateQuery,
body: CreateRequest,
) -> Result<CreateResponse> {
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace.clone(),
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

// Determine which datacenter to create the actor in
let target_dc_label = if let Some(dc_name) = &query.datacenter {
ctx.config()
Expand All @@ -79,26 +73,22 @@ pub async fn create(
ctx.config().dc_label()
};

let actor_id = Id::new_v1(target_dc_label);

let key: Option<String> = body.key;

let res = ctx
.op(pegboard::ops::actor::create::Input {
actor_id,
namespace_id: namespace.namespace_id,
name: body.name.clone(),
key,
runner_name_selector: body.runner_name_selector,
input: body.input.clone(),
crash_policy: body.crash_policy,
// Forward requests to the correct api-peer datacenter
forward_request: true,
datacenter_name: query.datacenter.clone(),
})
.await?;

let actor = res.actor;
let query = rivet_api_types::actors::create::CreateQuery {
namespace: query.namespace,
};

Ok(CreateResponse { actor })
if target_dc_label == ctx.config().dc_label() {
rivet_api_peer::actors::create::create(ctx, (), query, body).await
} else {
request_remote_datacenter::<CreateResponse>(
ctx.config(),
target_dc_label,
"/actors",
axum::http::Method::POST,
headers,
Some(&query),
Some(&body),
)
.await
}
}
2 changes: 1 addition & 1 deletion packages/core/api-public/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub async fn router(
)
// MARK: Actors
.route("/actors", axum::routing::get(actors::list::list))
.route("/actors", post(actors::create::create))
.route("/actors", axum::routing::post(actors::create::create))
.route(
"/actors",
axum::routing::put(actors::get_or_create::get_or_create),
Expand Down
1 change: 0 additions & 1 deletion packages/core/pegboard-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use rivet_tunnel_protocol::{
ToServerWebSocketOpen,
};
use rivet_util::serde::HashableMap;
use std::time::Duration;
use tokio_tungstenite::tungstenite::Message;

use crate::shared_state::{SharedState, TunnelMessageData};
Expand Down
4 changes: 0 additions & 4 deletions packages/services/pegboard/src/ops/actor/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ async fn forward_to_datacenter(
.next()
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

// Generate a new actor ID with the correct datacenter label
let actor_id = Id::new_v1(datacenter_label);

// Make request to remote datacenter
let response = request_remote_datacenter::<rivet_api_types::actors::create::CreateResponse>(
ctx.config(),
Expand All @@ -147,7 +144,6 @@ async fn forward_to_datacenter(
namespace: namespace.name.clone(),
}),
Some(&rivet_api_types::actors::create::CreateRequest {
actor_id,
name,
key,
input,
Expand Down
4 changes: 3 additions & 1 deletion packages/services/pegboard/src/workflows/actor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ async fn allocate_actor(
.udb()?
.run(|tx| async move {
let ping_threshold_ts = util::timestamp::now() - RUNNER_ELIGIBLE_THRESHOLD_MS;
let tx = tx.with_subspace(keys::subspace());

// Check if runner is an serverless runner
let for_serverless = tx
.with_subspace(namespace::keys::subspace())
.exists(
&namespace::keys::RunnerConfigByVariantKey::new(
namespace_id,
Expand All @@ -120,6 +120,8 @@ async fn allocate_actor(
)
.await?;

let tx = tx.with_subspace(keys::subspace());

if for_serverless {
tx.atomic_op(
&rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new(
Expand Down
Loading