diff --git a/Cargo.lock b/Cargo.lock index cd762d64fa..21b23c5d82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4122,6 +4122,7 @@ dependencies = [ "rivet-pools", "rivet-util", "serde", + "serde_html_form", "tokio", "tracing", ] diff --git a/out/openapi.json b/out/openapi.json index 637d75cf88..3eeca9e31d 100644 --- a/out/openapi.json +++ b/out/openapi.json @@ -1185,8 +1185,7 @@ "type": "object", "required": [ "datacenter_label", - "name", - "url" + "name" ], "properties": { "datacenter_label": { @@ -1196,9 +1195,6 @@ }, "name": { "type": "string" - }, - "url": { - "type": "string" } }, "additionalProperties": false diff --git a/packages/common/api-builder/src/context.rs b/packages/common/api-builder/src/context.rs index 5bfa83625d..7b22e3a5d4 100644 --- a/packages/common/api-builder/src/context.rs +++ b/packages/common/api-builder/src/context.rs @@ -49,6 +49,26 @@ impl ApiCtx { }) } + pub fn new_from_activity(ctx: &ActivityCtx) -> Result { + let req_id = Id::new_v1(ctx.config().dc_label()); + + Ok(Self { + ray_id: ctx.ray_id(), + req_id, + standalone_ctx: StandaloneCtx::new_from_activity(ctx, req_id)?, + }) + } + + pub fn new_from_operation(ctx: &OperationCtx) -> Result { + let req_id = Id::new_v1(ctx.config().dc_label()); + + Ok(Self { + ray_id: ctx.ray_id(), + req_id, + standalone_ctx: StandaloneCtx::new_from_operation(ctx, req_id)?, + }) + } + pub fn ray_id(&self) -> Id { self.ray_id } diff --git a/packages/common/gasoline/core/src/ctx/activity.rs b/packages/common/gasoline/core/src/ctx/activity.rs index 28650d73e8..ce1de28c55 100644 --- a/packages/common/gasoline/core/src/ctx/activity.rs +++ b/packages/common/gasoline/core/src/ctx/activity.rs @@ -144,6 +144,10 @@ impl ActivityCtx { } impl ActivityCtx { + pub(crate) fn db(&self) -> &DatabaseHandle { + &self.db + } + pub fn name(&self) -> &str { self.name } diff --git a/packages/common/gasoline/core/src/ctx/operation.rs b/packages/common/gasoline/core/src/ctx/operation.rs index a9695fec00..99fbd6071f 100644 --- a/packages/common/gasoline/core/src/ctx/operation.rs +++ b/packages/common/gasoline/core/src/ctx/operation.rs @@ -152,6 +152,10 @@ impl OperationCtx { } impl OperationCtx { + pub(crate) fn db(&self) -> &DatabaseHandle { + &self.db + } + pub fn name(&self) -> &str { self.name } diff --git a/packages/common/gasoline/core/src/ctx/standalone.rs b/packages/common/gasoline/core/src/ctx/standalone.rs index 6b5c476e32..089afc7428 100644 --- a/packages/common/gasoline/core/src/ctx/standalone.rs +++ b/packages/common/gasoline/core/src/ctx/standalone.rs @@ -7,7 +7,7 @@ use tracing::Instrument; use crate::{ builder::{WorkflowRepr, common as builder}, - ctx::{MessageCtx, common, message::SubscriptionHandle}, + ctx::{ActivityCtx, MessageCtx, OperationCtx, common, message::SubscriptionHandle}, db::{DatabaseHandle, WorkflowData}, error::WorkflowResult, message::Message, @@ -21,7 +21,7 @@ use crate::{ pub struct StandaloneCtx { ray_id: Id, req_id: Id, - name: &'static str, + name: String, ts: i64, db: DatabaseHandle, @@ -39,7 +39,7 @@ impl StandaloneCtx { config: rivet_config::Config, pools: rivet_pools::Pools, cache: rivet_cache::Cache, - name: &'static str, + name: &str, ray_id: Id, req_id: Id, ) -> WorkflowResult { @@ -54,7 +54,7 @@ impl StandaloneCtx { Ok(StandaloneCtx { ray_id, req_id, - name, + name: name.to_string(), ts, db, config, @@ -63,6 +63,32 @@ impl StandaloneCtx { msg_ctx, }) } + + #[tracing::instrument(skip_all)] + pub fn new_from_activity(ctx: &ActivityCtx, req_id: Id) -> WorkflowResult { + StandaloneCtx::new( + ctx.db().clone(), + ctx.config().clone(), + ctx.pools().clone(), + ctx.cache().clone(), + ctx.name(), + ctx.ray_id(), + req_id, + ) + } + + #[tracing::instrument(skip_all)] + pub fn new_from_operation(ctx: &OperationCtx, req_id: Id) -> WorkflowResult { + StandaloneCtx::new( + ctx.db().clone(), + ctx.config().clone(), + ctx.pools().clone(), + ctx.cache().clone(), + ctx.name(), + ctx.ray_id(), + req_id, + ) + } } impl StandaloneCtx { @@ -154,13 +180,9 @@ impl StandaloneCtx { impl StandaloneCtx { pub fn name(&self) -> &str { - self.name + &self.name } - // pub fn timeout(&self) -> Duration { - // self.timeout - // } - pub fn ray_id(&self) -> Id { self.ray_id } diff --git a/packages/core/pegboard-gateway/src/lib.rs b/packages/core/pegboard-gateway/src/lib.rs index 67a23eff2a..b1b89dd4ed 100644 --- a/packages/core/pegboard-gateway/src/lib.rs +++ b/packages/core/pegboard-gateway/src/lib.rs @@ -1,3 +1,13 @@ +use std::result::Result::Ok as ResultOk; +use std::{ + collections::HashMap, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, +}; + use anyhow::*; use async_trait::async_trait; use bytes::Bytes; @@ -108,9 +118,9 @@ impl PegboardGateway { let actor_id = req .headers() .get("x-rivet-actor") - .ok_or_else(|| anyhow!("missing x-rivet-actor header"))? + .context("missing x-rivet-actor header")? .to_str() - .map_err(|_| anyhow!("invalid x-rivet-actor header"))? + .context("invalid x-rivet-actor header")? .to_string(); // Extract request parts @@ -132,7 +142,7 @@ impl PegboardGateway { .into_body() .collect() .await - .map_err(|e| anyhow!("failed to read body: {}", e))? + .context("failed to read body")? .to_bytes(); // Build subject to publish to @@ -212,11 +222,9 @@ impl PegboardGateway { // Extract actor ID for the message let actor_id = match headers .get("x-rivet-actor") - .ok_or_else(|| anyhow!("missing x-rivet-actor header")) - .and_then(|v| { - v.to_str() - .map_err(|_| anyhow!("invalid x-rivet-actor header")) - }) { + .context("missing x-rivet-actor header") + .and_then(|v| v.to_str().context("invalid x-rivet-actor header")) + { Result::Ok(v) => v.to_string(), Err(err) => return Err((client_ws, err)), }; diff --git a/packages/services/epoxy/src/http_client.rs b/packages/services/epoxy/src/http_client.rs index 51864305c6..f3b440a79f 100644 --- a/packages/services/epoxy/src/http_client.rs +++ b/packages/services/epoxy/src/http_client.rs @@ -5,6 +5,7 @@ use epoxy_protocol::{ versioned, }; use futures_util::{StreamExt, stream::FuturesUnordered}; +use rivet_api_builder::ApiCtx; use std::future::Future; use versioned_data_util::OwnedVersionedData; @@ -56,7 +57,7 @@ where .await; tracing::info!(?quorum_size, len = ?responses.len(), ?quorum_type, "fanout quorum size"); - // Choow how many successful responses we need before considering a success + // Choose how many successful responses we need before considering a success let target_responses = match quorum_type { // Only require 1 response utils::QuorumType::Any => 1, @@ -93,19 +94,32 @@ where } pub async fn send_message( + ctx: &ApiCtx, config: &protocol::ClusterConfig, - to_replica_id: ReplicaId, request: protocol::Request, ) -> Result { - let replica_url = find_replica_address(config, to_replica_id)?; - send_message_to_address(replica_url, to_replica_id, request).await + let replica_url = find_replica_address(config, request.to_replica_id)?; + send_message_to_address(ctx, replica_url, request).await } pub async fn send_message_to_address( + ctx: &ApiCtx, replica_url: String, - to_replica_id: ReplicaId, request: protocol::Request, ) -> Result { + let from_replica_id = request.from_replica_id; + let to_replica_id = request.to_replica_id; + + if from_replica_id == to_replica_id { + tracing::info!( + to_replica = to_replica_id, + "sending message to replica directly" + ); + + return crate::replica::message_request::message_request(&ctx, from_replica_id, request) + .await; + } + let mut replica_url = url::Url::parse(&replica_url)?; replica_url.set_path(&format!("/v{PROTOCOL_VERSION}/epoxy/message")); diff --git a/packages/services/epoxy/src/ops/explicit_prepare.rs b/packages/services/epoxy/src/ops/explicit_prepare.rs index 14ca28852a..b75070d3f7 100644 --- a/packages/services/epoxy/src/ops/explicit_prepare.rs +++ b/packages/services/epoxy/src/ops/explicit_prepare.rs @@ -1,6 +1,7 @@ use anyhow::*; use epoxy_protocol::protocol::{self, ReplicaId}; use gas::prelude::*; +use rivet_api_builder::ApiCtx; use crate::{http_client, replica, types, utils}; @@ -48,8 +49,15 @@ pub async fn epoxy_explicit_prepare( let quorum_members = utils::get_quorum_members(&config); // EPaxos Step 26: Send Prepare to all replicas and wait for quorum - let prepare_responses = - send_prepares(&config, replica_id, &quorum_members, &new_ballot, instance).await?; + let prepare_responses = send_prepares( + ctx, + &config, + replica_id, + &quorum_members, + &new_ballot, + instance, + ) + .await?; // Check if we got enough responses for a quorum let required_quorum = utils::calculate_quorum(quorum_members.len(), utils::QuorumType::Slow); @@ -241,6 +249,7 @@ fn compare_ballots(a: &protocol::Ballot, b: &protocol::Ballot) -> std::cmp::Orde } async fn send_prepares( + ctx: &OperationCtx, config: &protocol::ClusterConfig, from_replica_id: ReplicaId, replica_ids: &[ReplicaId], @@ -257,8 +266,8 @@ async fn send_prepares( let instance = instance.clone(); async move { let response = http_client::send_message( + &ApiCtx::new_from_operation(&ctx)?, &config, - to_replica_id, protocol::Request { from_replica_id, to_replica_id, diff --git a/packages/services/epoxy/src/ops/kv/get_optimistic.rs b/packages/services/epoxy/src/ops/kv/get_optimistic.rs index 6335cbbb90..726b54656f 100644 --- a/packages/services/epoxy/src/ops/kv/get_optimistic.rs +++ b/packages/services/epoxy/src/ops/kv/get_optimistic.rs @@ -1,6 +1,7 @@ use anyhow::*; use epoxy_protocol::protocol::{self, ReplicaId}; use gas::prelude::*; +use rivet_api_builder::ApiCtx; use universaldb::utils::{FormalKey, IsolationLevel::*}; use crate::{http_client, keys, utils}; @@ -35,7 +36,7 @@ pub struct Output { /// /// We cannot use quorum reads for the fanout read because of the constraints of Epaxos. #[operation] -pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Result { +pub async fn epoxy_get_optimistic(ctx: &OperationCtx, input: &Input) -> Result { // Try to read locally let kv_key = keys::keys::KvValueKey::new(input.key.clone()); let cache_key = keys::keys::KvOptimisticCacheKey::new(input.key.clone()); @@ -113,7 +114,9 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul }; // Send the message and extract the KV response - let response = http_client::send_message(&config, replica_id, request).await?; + let response = + http_client::send_message(&ApiCtx::new_from_operation(&ctx)?, &config, request) + .await?; match response.kind { protocol::ResponseKind::KvGetResponse(kv_response) => Ok(kv_response.value), diff --git a/packages/services/epoxy/src/ops/propose.rs b/packages/services/epoxy/src/ops/propose.rs index 8c42747036..cc8a0fe7bc 100644 --- a/packages/services/epoxy/src/ops/propose.rs +++ b/packages/services/epoxy/src/ops/propose.rs @@ -49,7 +49,8 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result= utils::calculate_quorum(quorum_members.len(), utils::QuorumType::Slow) { @@ -137,12 +145,14 @@ pub async fn commit( // Send commits to all replicas (not just quorum members) let all_replicas = utils::get_all_replicas(config); tokio::spawn({ + let ctx = ctx.clone(); let config = config.clone(); let replica_id = replica_id; let all_replicas = all_replicas.to_vec(); let payload = payload.clone(); + async move { - let _ = send_commits(&config, replica_id, &all_replicas, &payload).await; + let _ = send_commits(&ctx, &config, replica_id, &all_replicas, &payload).await; } }); @@ -154,6 +164,7 @@ pub async fn commit( } async fn send_pre_accepts( + ctx: &OperationCtx, config: &protocol::ClusterConfig, from_replica_id: ReplicaId, replica_ids: &[ReplicaId], @@ -168,8 +179,8 @@ async fn send_pre_accepts( let payload = payload.clone(); async move { let response = http_client::send_message( + &ApiCtx::new_from_operation(&ctx)?, &config, - to_replica_id, protocol::Request { from_replica_id, to_replica_id, @@ -197,6 +208,7 @@ async fn send_pre_accepts( } async fn send_accepts( + ctx: &OperationCtx, config: &protocol::ClusterConfig, from_replica_id: ReplicaId, replica_ids: &[ReplicaId], @@ -211,8 +223,8 @@ async fn send_accepts( let payload = payload.clone(); async move { let response = http_client::send_message( + &ApiCtx::new_from_operation(&ctx)?, &config, - to_replica_id, protocol::Request { from_replica_id, to_replica_id, @@ -241,6 +253,7 @@ async fn send_accepts( } async fn send_commits( + ctx: &OperationCtx, config: &protocol::ClusterConfig, from_replica_id: ReplicaId, replica_ids: &[ReplicaId], @@ -255,8 +268,8 @@ async fn send_commits( let payload = payload.clone(); async move { let response = http_client::send_message( + &ApiCtx::new_from_operation(&ctx)?, &config, - to_replica_id, protocol::Request { from_replica_id, to_replica_id, diff --git a/packages/services/epoxy/src/replica/message_request.rs b/packages/services/epoxy/src/replica/message_request.rs index 44ec707be1..f3ca78426a 100644 --- a/packages/services/epoxy/src/replica/message_request.rs +++ b/packages/services/epoxy/src/replica/message_request.rs @@ -123,7 +123,6 @@ pub async fn message_request( }) .to_workflow::() .tag("replica", replica_id) - .to_workflow::() .send() .await?; diff --git a/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs b/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs index 1c2a4748fd..0adbe0fd6c 100644 --- a/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs +++ b/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs @@ -1,6 +1,7 @@ use anyhow::*; use epoxy_protocol::protocol::{self, ReplicaId}; use gas::prelude::*; +use rivet_api_builder::ApiCtx; use serde::{Deserialize, Serialize}; use crate::types; @@ -160,18 +161,13 @@ pub async fn health_check_new_replicas( kind: protocol::RequestKind::HealthCheckRequest, }; - // Call directly instead of sending request for same replica - if from_replica_id == replica_id { - tracing::info!(?replica_id, "skipping health check to self"); - } else { - crate::http_client::send_message_to_address( - replica.api_peer_url.clone(), - replica_id, - request, - ) - .await - .with_context(|| format!("health check failed for replica {}", replica_id))?; - } + crate::http_client::send_message_to_address( + &ApiCtx::new_from_activity(ctx)?, + replica.api_peer_url.clone(), + request, + ) + .await + .with_context(|| format!("health check failed for replica {}", replica_id))?; tracing::info!(?replica_id, "health check successful"); Ok(()) @@ -243,7 +239,8 @@ pub async fn send_begin_learning( }), }; - crate::http_client::send_message(&config, replica_id, request).await?; + crate::http_client::send_message(&ApiCtx::new_from_activity(&ctx)?, &config, request) + .await?; tracing::info!(?replica_id, "begin learning sent successfully"); Ok(()) diff --git a/packages/services/epoxy/src/workflows/coordinator/replica_status_change.rs b/packages/services/epoxy/src/workflows/coordinator/replica_status_change.rs index 41da43f595..d35f2c6706 100644 --- a/packages/services/epoxy/src/workflows/coordinator/replica_status_change.rs +++ b/packages/services/epoxy/src/workflows/coordinator/replica_status_change.rs @@ -1,6 +1,7 @@ use anyhow::*; use epoxy_protocol::protocol; use gas::prelude::*; +use rivet_api_builder::ApiCtx; use serde::{Deserialize, Serialize}; use super::State; @@ -126,7 +127,7 @@ pub async fn notify_all_replicas( }), }; - crate::http_client::send_message(&config, replica_id, request) + crate::http_client::send_message(&ApiCtx::new_from_activity(&ctx)?, &config, request) .await .with_context(|| format!("failed to update config for replica {}", replica_id))?; diff --git a/packages/services/epoxy/src/workflows/replica/setup.rs b/packages/services/epoxy/src/workflows/replica/setup.rs index 31671b7341..8d9b43b2f6 100644 --- a/packages/services/epoxy/src/workflows/replica/setup.rs +++ b/packages/services/epoxy/src/workflows/replica/setup.rs @@ -2,6 +2,7 @@ use anyhow::*; use epoxy_protocol::protocol; use futures_util::{FutureExt, TryStreamExt}; use gas::prelude::*; +use rivet_api_builder::ApiCtx; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, VecDeque}; use universaldb::prelude::*; @@ -14,10 +15,10 @@ use crate::types; // activities which can cause the learning process to enter an invalid state. pub async fn setup_replica(ctx: &mut WorkflowCtx, _input: &super::Input) -> Result<()> { - // Wait for coordiinator to send begin learning signal + // Wait for cooridinator to send begin learning signal let begin_learning = ctx.listen::().await?; - // TODO: Parallelize replicas + // TODO: Paralellize replicas let total_replicas = begin_learning.config.replicas.len(); let mut replica_index = 0; @@ -182,7 +183,8 @@ pub async fn download_instances_chunk( }; let response = - crate::http_client::send_message(&proto_config, input.from_replica_id, request).await?; + crate::http_client::send_message(&ApiCtx::new_from_activity(&ctx)?, &proto_config, request) + .await?; // Extract instances from response let protocol::ResponseKind::DownloadInstancesResponse(download_response) = response.kind else { @@ -751,7 +753,8 @@ pub async fn notify_active(ctx: &ActivityCtx, input: &NotifyActiveInput) -> Resu ), }; - crate::http_client::send_message(&proto_config, config.coordinator_replica_id, request).await?; + crate::http_client::send_message(&ApiCtx::new_from_activity(&ctx)?, &proto_config, request) + .await?; tracing::info!("notified coordinator of active status"); Ok(())