Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 1 addition & 5 deletions out/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions packages/common/api-builder/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,26 @@ impl ApiCtx {
})
}

pub fn new_from_activity(ctx: &ActivityCtx) -> Result<Self> {
let req_id = Id::new_v1(ctx.config().dc_label());

Ok(Self {
ray_id: ctx.ray_id(),
req_id,
standalone_ctx: StandaloneCtx::new_from_activity(ctx, req_id)?,
})
}

pub fn new_from_operation(ctx: &OperationCtx) -> Result<Self> {
let req_id = Id::new_v1(ctx.config().dc_label());

Ok(Self {
ray_id: ctx.ray_id(),
req_id,
standalone_ctx: StandaloneCtx::new_from_operation(ctx, req_id)?,
})
}

pub fn ray_id(&self) -> Id {
self.ray_id
}
Expand Down
4 changes: 4 additions & 0 deletions packages/common/gasoline/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ impl ActivityCtx {
}

impl ActivityCtx {
pub(crate) fn db(&self) -> &DatabaseHandle {
&self.db
}

pub fn name(&self) -> &str {
self.name
}
Expand Down
4 changes: 4 additions & 0 deletions packages/common/gasoline/core/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ impl OperationCtx {
}

impl OperationCtx {
pub(crate) fn db(&self) -> &DatabaseHandle {
&self.db
}

pub fn name(&self) -> &str {
self.name
}
Expand Down
40 changes: 31 additions & 9 deletions packages/common/gasoline/core/src/ctx/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tracing::Instrument;

use crate::{
builder::{WorkflowRepr, common as builder},
ctx::{MessageCtx, common, message::SubscriptionHandle},
ctx::{ActivityCtx, MessageCtx, OperationCtx, common, message::SubscriptionHandle},
db::{DatabaseHandle, WorkflowData},
error::WorkflowResult,
message::Message,
Expand All @@ -21,7 +21,7 @@ use crate::{
pub struct StandaloneCtx {
ray_id: Id,
req_id: Id,
name: &'static str,
name: String,
ts: i64,

db: DatabaseHandle,
Expand All @@ -39,7 +39,7 @@ impl StandaloneCtx {
config: rivet_config::Config,
pools: rivet_pools::Pools,
cache: rivet_cache::Cache,
name: &'static str,
name: &str,
ray_id: Id,
req_id: Id,
) -> WorkflowResult<Self> {
Expand All @@ -54,7 +54,7 @@ impl StandaloneCtx {
Ok(StandaloneCtx {
ray_id,
req_id,
name,
name: name.to_string(),
ts,
db,
config,
Expand All @@ -63,6 +63,32 @@ impl StandaloneCtx {
msg_ctx,
})
}

#[tracing::instrument(skip_all)]
pub fn new_from_activity(ctx: &ActivityCtx, req_id: Id) -> WorkflowResult<Self> {
StandaloneCtx::new(
ctx.db().clone(),
ctx.config().clone(),
ctx.pools().clone(),
ctx.cache().clone(),
ctx.name(),
ctx.ray_id(),
req_id,
)
}

#[tracing::instrument(skip_all)]
pub fn new_from_operation(ctx: &OperationCtx, req_id: Id) -> WorkflowResult<Self> {
StandaloneCtx::new(
ctx.db().clone(),
ctx.config().clone(),
ctx.pools().clone(),
ctx.cache().clone(),
ctx.name(),
ctx.ray_id(),
req_id,
)
}
}

impl StandaloneCtx {
Expand Down Expand Up @@ -154,13 +180,9 @@ impl StandaloneCtx {

impl StandaloneCtx {
pub fn name(&self) -> &str {
self.name
&self.name
}

// pub fn timeout(&self) -> Duration {
// self.timeout
// }

pub fn ray_id(&self) -> Id {
self.ray_id
}
Expand Down
24 changes: 16 additions & 8 deletions packages/core/pegboard-gateway/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
use std::result::Result::Ok as ResultOk;
use std::{
collections::HashMap,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};

use anyhow::*;
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -108,9 +118,9 @@ impl PegboardGateway {
let actor_id = req
.headers()
.get("x-rivet-actor")
.ok_or_else(|| anyhow!("missing x-rivet-actor header"))?
.context("missing x-rivet-actor header")?
.to_str()
.map_err(|_| anyhow!("invalid x-rivet-actor header"))?
.context("invalid x-rivet-actor header")?
.to_string();

// Extract request parts
Expand All @@ -132,7 +142,7 @@ impl PegboardGateway {
.into_body()
.collect()
.await
.map_err(|e| anyhow!("failed to read body: {}", e))?
.context("failed to read body")?
.to_bytes();

// Build subject to publish to
Expand Down Expand Up @@ -212,11 +222,9 @@ impl PegboardGateway {
// Extract actor ID for the message
let actor_id = match headers
.get("x-rivet-actor")
.ok_or_else(|| anyhow!("missing x-rivet-actor header"))
.and_then(|v| {
v.to_str()
.map_err(|_| anyhow!("invalid x-rivet-actor header"))
}) {
.context("missing x-rivet-actor header")
.and_then(|v| v.to_str().context("invalid x-rivet-actor header"))
{
Result::Ok(v) => v.to_string(),
Err(err) => return Err((client_ws, err)),
};
Expand Down
24 changes: 19 additions & 5 deletions packages/services/epoxy/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use epoxy_protocol::{
versioned,
};
use futures_util::{StreamExt, stream::FuturesUnordered};
use rivet_api_builder::ApiCtx;
use std::future::Future;
use versioned_data_util::OwnedVersionedData;

Expand Down Expand Up @@ -56,7 +57,7 @@ where
.await;
tracing::info!(?quorum_size, len = ?responses.len(), ?quorum_type, "fanout quorum size");

// Choow how many successful responses we need before considering a success
// Choose how many successful responses we need before considering a success
let target_responses = match quorum_type {
// Only require 1 response
utils::QuorumType::Any => 1,
Expand Down Expand Up @@ -93,19 +94,32 @@ where
}

pub async fn send_message(
ctx: &ApiCtx,
config: &protocol::ClusterConfig,
to_replica_id: ReplicaId,
request: protocol::Request,
) -> Result<protocol::Response> {
let replica_url = find_replica_address(config, to_replica_id)?;
send_message_to_address(replica_url, to_replica_id, request).await
let replica_url = find_replica_address(config, request.to_replica_id)?;
send_message_to_address(ctx, replica_url, request).await
}

pub async fn send_message_to_address(
ctx: &ApiCtx,
replica_url: String,
to_replica_id: ReplicaId,
request: protocol::Request,
) -> Result<protocol::Response> {
let from_replica_id = request.from_replica_id;
let to_replica_id = request.to_replica_id;

if from_replica_id == to_replica_id {
tracing::info!(
to_replica = to_replica_id,
"sending message to replica directly"
);

return crate::replica::message_request::message_request(&ctx, from_replica_id, request)
.await;
}

let mut replica_url = url::Url::parse(&replica_url)?;
replica_url.set_path(&format!("/v{PROTOCOL_VERSION}/epoxy/message"));

Expand Down
15 changes: 12 additions & 3 deletions packages/services/epoxy/src/ops/explicit_prepare.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::*;
use epoxy_protocol::protocol::{self, ReplicaId};
use gas::prelude::*;
use rivet_api_builder::ApiCtx;

use crate::{http_client, replica, types, utils};

Expand Down Expand Up @@ -48,8 +49,15 @@ pub async fn epoxy_explicit_prepare(
let quorum_members = utils::get_quorum_members(&config);

// EPaxos Step 26: Send Prepare to all replicas and wait for quorum
let prepare_responses =
send_prepares(&config, replica_id, &quorum_members, &new_ballot, instance).await?;
let prepare_responses = send_prepares(
ctx,
&config,
replica_id,
&quorum_members,
&new_ballot,
instance,
)
.await?;

// Check if we got enough responses for a quorum
let required_quorum = utils::calculate_quorum(quorum_members.len(), utils::QuorumType::Slow);
Expand Down Expand Up @@ -241,6 +249,7 @@ fn compare_ballots(a: &protocol::Ballot, b: &protocol::Ballot) -> std::cmp::Orde
}

async fn send_prepares(
ctx: &OperationCtx,
config: &protocol::ClusterConfig,
from_replica_id: ReplicaId,
replica_ids: &[ReplicaId],
Expand All @@ -257,8 +266,8 @@ async fn send_prepares(
let instance = instance.clone();
async move {
let response = http_client::send_message(
&ApiCtx::new_from_operation(&ctx)?,
&config,
to_replica_id,
protocol::Request {
from_replica_id,
to_replica_id,
Expand Down
7 changes: 5 additions & 2 deletions packages/services/epoxy/src/ops/kv/get_optimistic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::*;
use epoxy_protocol::protocol::{self, ReplicaId};
use gas::prelude::*;
use rivet_api_builder::ApiCtx;
use universaldb::utils::{FormalKey, IsolationLevel::*};

use crate::{http_client, keys, utils};
Expand Down Expand Up @@ -35,7 +36,7 @@ pub struct Output {
///
/// We cannot use quorum reads for the fanout read because of the constraints of Epaxos.
#[operation]
pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Result<Output> {
pub async fn epoxy_get_optimistic(ctx: &OperationCtx, input: &Input) -> Result<Output> {
// Try to read locally
let kv_key = keys::keys::KvValueKey::new(input.key.clone());
let cache_key = keys::keys::KvOptimisticCacheKey::new(input.key.clone());
Expand Down Expand Up @@ -113,7 +114,9 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
};

// Send the message and extract the KV response
let response = http_client::send_message(&config, replica_id, request).await?;
let response =
http_client::send_message(&ApiCtx::new_from_operation(&ctx)?, &config, request)
.await?;

match response.kind {
protocol::ResponseKind::KvGetResponse(kv_response) => Ok(kv_response.value),
Expand Down
Loading
Loading