diff --git a/packages/services/pegboard/src/workflows/actor/actor_keys.rs b/packages/services/pegboard/src/workflows/actor/actor_keys.rs index d1536726be..ad01e160c5 100644 --- a/packages/services/pegboard/src/workflows/actor/actor_keys.rs +++ b/packages/services/pegboard/src/workflows/actor/actor_keys.rs @@ -6,7 +6,7 @@ use futures_util::TryStreamExt; use gas::prelude::*; use rivet_key_data::converted::ActorByKeyKeyData; use udb_util::prelude::*; -use universaldb::{self as udb, FdbBindingError, options::StreamingMode}; +use universaldb::{self as udb, options::StreamingMode, FdbBindingError}; use crate::keys; @@ -52,7 +52,23 @@ pub async fn reserve_key( .await?; match proposal_result { - ProposalResult::Committed => Ok(ReserveKeyOutput::Success), + ProposalResult::Committed => { + let output = ctx + .activity(ReserveActorKeyInput { + namespace_id, + name: name.clone(), + key: key.clone(), + actor_id, + create_ts: ctx.create_ts(), + }) + .await?; + match output { + ReserveActorKeyOutput::Success => Ok(ReserveKeyOutput::Success), + ReserveActorKeyOutput::ExistingActor { existing_actor_id } => { + Ok(ReserveKeyOutput::KeyExists { existing_actor_id }) + } + } + } ProposalResult::ConsensusFailed => { bail!("consensus failed") } diff --git a/scripts/tests/actor_get_or_create.ts b/scripts/tests/actor_get_or_create.ts index 2b3f34ade4..bfb4260e8f 100755 --- a/scripts/tests/actor_get_or_create.ts +++ b/scripts/tests/actor_get_or_create.ts @@ -7,7 +7,8 @@ import { generateRandomKey, } from "./utils"; -const COUNT = 5; +const COUNT = 50; +const PARALLEL = true; async function main() { try { @@ -21,28 +22,47 @@ async function main() { const sharedKey = generateRandomKey(); console.log(`Using shared key: ${sharedKey}`); - // Create parallel calls to get or create actor with the same key + // Create calls to get or create actor with the same key console.log( - `Creating ${COUNT} parallel get-or-create calls with shared key...`, + `Creating ${COUNT} ${PARALLEL ? "parallel" : "serial"} get-or-create calls with shared key...`, ); let completedCount = 0; - const promises = Array.from({ length: COUNT }, (_, index) => - getOrCreateActorById( - namespaceName, - actorName, - sharedKey, - runnerNameSelector, - ).then((response) => { + + let results; + if (PARALLEL) { + const promises = Array.from({ length: COUNT }, (_, index) => + getOrCreateActorById( + namespaceName, + actorName, + sharedKey, + runnerNameSelector, + ).then((response) => { + completedCount++; + console.log( + `Call ${index + 1}/${COUNT} completed ${JSON.stringify(response)} (${completedCount} total)`, + ); + return { index, response }; + }), + ); + results = await Promise.all(promises); + } else { + results = []; + for (let index = 0; index < COUNT; index++) { + const response = await getOrCreateActorById( + namespaceName, + actorName, + sharedKey, + runnerNameSelector, + ); completedCount++; console.log( `Call ${index + 1}/${COUNT} completed ${JSON.stringify(response)} (${completedCount} total)`, ); - return { index, response }; - }), - ); - - const results = await Promise.all(promises); - console.log(`✓ Completed all ${COUNT} parallel calls`); + results.push({ index, response }); + } + } + + console.log(`✓ Completed all ${COUNT} ${PARALLEL ? "parallel" : "serial"} calls`); // Extract all actor IDs and verify they're all the same const actorIds = results.map((result) => result.response.actor_id);