Skip to content

Commit ee1440a

Browse files
MasterPtatoNathanFlurry
authored andcommitted
fix(api): fix creating actors in other dcs
1 parent eee75ee commit ee1440a

File tree

8 files changed

+55
-70
lines changed

8 files changed

+55
-70
lines changed

out/openapi.json

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/common/api-types/src/actors/create.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ pub struct CreateQuery {
1313
#[serde(deny_unknown_fields)]
1414
#[schema(as = ActorsCreateRequest)]
1515
pub struct CreateRequest {
16-
pub actor_id: Id,
1716
pub name: String,
1817
pub key: Option<String>,
1918
pub input: Option<String>,
Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use anyhow::Result;
2+
use gas::prelude::*;
23
use rivet_api_builder::ApiCtx;
34
use rivet_api_types::actors::create::{CreateQuery, CreateRequest, CreateResponse};
45

@@ -15,27 +16,24 @@ pub async fn create(
1516
.await?
1617
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
1718

19+
let actor_id = Id::new_v1(ctx.config().dc_label());
20+
1821
let res = ctx
1922
.op(pegboard::ops::actor::create::Input {
20-
actor_id: body.actor_id,
23+
actor_id,
2124
namespace_id: namespace.namespace_id,
2225
name: body.name.clone(),
23-
key: body.key.clone(),
24-
runner_name_selector: body.runner_name_selector.clone(),
26+
key: body.key,
27+
runner_name_selector: body.runner_name_selector,
2528
input: body.input.clone(),
2629
crash_policy: body.crash_policy,
27-
// Don't forward request since this request should be already forwarded if it is going
28-
// to be forward.
29-
//
30-
// This should never throw a request needs to be forwarded error. If it does, something
31-
// is broken.
32-
forward_request: false,
30+
// NOTE: This can forward if the user attempts to create an actor with a target dc and this dc
31+
// ends up forwarding to another.
32+
forward_request: true,
3333
// api-peer is always creating in its own datacenter
3434
datacenter_name: None,
3535
})
3636
.await?;
3737

38-
let actor = res.actor;
39-
40-
Ok(CreateResponse { actor })
38+
Ok(CreateResponse { actor: res.actor })
4139
}
Lines changed: 39 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,24 @@
11
use anyhow::Result;
2-
use rivet_api_builder::ApiCtx;
2+
use axum::{
3+
extract::{Extension, Query},
4+
http::HeaderMap,
5+
response::{IntoResponse, Json, Response},
6+
};
7+
use rivet_api_builder::{ApiCtx, ApiError};
8+
use rivet_api_client::request_remote_datacenter;
9+
use rivet_api_types::actors::create::{CreateRequest, CreateResponse};
310
use rivet_types::actors::CrashPolicy;
4-
use rivet_util::Id;
511
use serde::{Deserialize, Serialize};
612
use utoipa::{IntoParams, ToSchema};
713

8-
#[derive(Debug, Deserialize, IntoParams)]
14+
#[derive(Debug, Serialize, Deserialize, IntoParams)]
915
#[serde(deny_unknown_fields)]
1016
#[into_params(parameter_in = Query)]
1117
pub struct CreateQuery {
1218
pub namespace: String,
1319
pub datacenter: Option<String>,
1420
}
1521

16-
#[derive(Deserialize, ToSchema)]
17-
#[serde(deny_unknown_fields)]
18-
#[schema(as = ActorsCreateRequest)]
19-
pub struct CreateRequest {
20-
pub name: String,
21-
pub key: Option<String>,
22-
pub input: Option<String>,
23-
pub runner_name_selector: String,
24-
pub crash_policy: CrashPolicy,
25-
}
26-
27-
#[derive(Serialize, ToSchema)]
28-
#[schema(as = ActorsCreateResponse)]
29-
pub struct CreateResponse {
30-
pub actor: rivet_types::actors::Actor,
31-
}
32-
3322
/// ## Datacenter Round Trips
3423
///
3524
/// **If actor is created in the current datacenter:**
@@ -57,18 +46,23 @@ pub struct CreateResponse {
5746
),
5847
)]
5948
pub async fn create(
49+
Extension(ctx): Extension<ApiCtx>,
50+
headers: HeaderMap,
51+
Query(query): Query<CreateQuery>,
52+
Json(body): Json<CreateRequest>,
53+
) -> Response {
54+
match create_inner(ctx, headers, query, body).await {
55+
Ok(response) => Json(response).into_response(),
56+
Err(err) => ApiError::from(err).into_response(),
57+
}
58+
}
59+
60+
async fn create_inner(
6061
ctx: ApiCtx,
61-
_path: (),
62+
headers: HeaderMap,
6263
query: CreateQuery,
6364
body: CreateRequest,
6465
) -> Result<CreateResponse> {
65-
let namespace = ctx
66-
.op(namespace::ops::resolve_for_name_global::Input {
67-
name: query.namespace.clone(),
68-
})
69-
.await?
70-
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
71-
7266
// Determine which datacenter to create the actor in
7367
let target_dc_label = if let Some(dc_name) = &query.datacenter {
7468
ctx.config()
@@ -79,26 +73,22 @@ pub async fn create(
7973
ctx.config().dc_label()
8074
};
8175

82-
let actor_id = Id::new_v1(target_dc_label);
83-
84-
let key: Option<String> = body.key;
85-
86-
let res = ctx
87-
.op(pegboard::ops::actor::create::Input {
88-
actor_id,
89-
namespace_id: namespace.namespace_id,
90-
name: body.name.clone(),
91-
key,
92-
runner_name_selector: body.runner_name_selector,
93-
input: body.input.clone(),
94-
crash_policy: body.crash_policy,
95-
// Forward requests to the correct api-peer datacenter
96-
forward_request: true,
97-
datacenter_name: query.datacenter.clone(),
98-
})
99-
.await?;
100-
101-
let actor = res.actor;
76+
let query = rivet_api_types::actors::create::CreateQuery {
77+
namespace: query.namespace,
78+
};
10279

103-
Ok(CreateResponse { actor })
80+
if target_dc_label == ctx.config().dc_label() {
81+
rivet_api_peer::actors::create::create(ctx, (), query, body).await
82+
} else {
83+
request_remote_datacenter::<CreateResponse>(
84+
ctx.config(),
85+
target_dc_label,
86+
"/actors",
87+
axum::http::Method::POST,
88+
headers,
89+
Some(&query),
90+
Some(&body),
91+
)
92+
.await
93+
}
10494
}

packages/core/api-public/src/router.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub async fn router(
6868
)
6969
// MARK: Actors
7070
.route("/actors", axum::routing::get(actors::list::list))
71-
.route("/actors", post(actors::create::create))
71+
.route("/actors", axum::routing::post(actors::create::create))
7272
.route(
7373
"/actors",
7474
axum::routing::put(actors::get_or_create::get_or_create),

packages/core/pegboard-gateway/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use rivet_tunnel_protocol::{
2626
ToServerWebSocketOpen,
2727
};
2828
use rivet_util::serde::HashableMap;
29-
use std::time::Duration;
3029
use tokio_tungstenite::tungstenite::Message;
3130

3231
use crate::shared_state::{SharedState, TunnelMessageData};

packages/services/pegboard/src/ops/actor/create.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,6 @@ async fn forward_to_datacenter(
133133
.next()
134134
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
135135

136-
// Generate a new actor ID with the correct datacenter label
137-
let actor_id = Id::new_v1(datacenter_label);
138-
139136
// Make request to remote datacenter
140137
let response = request_remote_datacenter::<rivet_api_types::actors::create::CreateResponse>(
141138
ctx.config(),
@@ -147,7 +144,6 @@ async fn forward_to_datacenter(
147144
namespace: namespace.name.clone(),
148145
}),
149146
Some(&rivet_api_types::actors::create::CreateRequest {
150-
actor_id,
151147
name,
152148
key,
153149
input,

packages/services/pegboard/src/workflows/actor/runtime.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,10 @@ async fn allocate_actor(
106106
.udb()?
107107
.run(|tx| async move {
108108
let ping_threshold_ts = util::timestamp::now() - RUNNER_ELIGIBLE_THRESHOLD_MS;
109-
let tx = tx.with_subspace(keys::subspace());
110109

111110
// Check if runner is an serverless runner
112111
let for_serverless = tx
112+
.with_subspace(namespace::keys::subspace())
113113
.exists(
114114
&namespace::keys::RunnerConfigByVariantKey::new(
115115
namespace_id,
@@ -120,6 +120,8 @@ async fn allocate_actor(
120120
)
121121
.await?;
122122

123+
let tx = tx.with_subspace(keys::subspace());
124+
123125
if for_serverless {
124126
tx.atomic_op(
125127
&rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new(

0 commit comments

Comments
 (0)