Skip to content

Commit e67a469

Browse files
committed
fix(epoxy): fix sending msgs to self
1 parent 808ae80 commit e67a469

File tree

15 files changed

+151
-57
lines changed

15 files changed

+151
-57
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

out/openapi.json

Lines changed: 1 addition & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/common/api-builder/src/context.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,26 @@ impl ApiCtx {
4949
})
5050
}
5151

52+
pub fn new_from_activity(ctx: &ActivityCtx) -> Result<Self> {
53+
let req_id = Id::new_v1(ctx.config().dc_label());
54+
55+
Ok(Self {
56+
ray_id: ctx.ray_id(),
57+
req_id,
58+
standalone_ctx: StandaloneCtx::new_from_activity(ctx, req_id)?,
59+
})
60+
}
61+
62+
pub fn new_from_operation(ctx: &OperationCtx) -> Result<Self> {
63+
let req_id = Id::new_v1(ctx.config().dc_label());
64+
65+
Ok(Self {
66+
ray_id: ctx.ray_id(),
67+
req_id,
68+
standalone_ctx: StandaloneCtx::new_from_operation(ctx, req_id)?,
69+
})
70+
}
71+
5272
pub fn ray_id(&self) -> Id {
5373
self.ray_id
5474
}

packages/common/gasoline/core/src/ctx/activity.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ impl ActivityCtx {
144144
}
145145

146146
impl ActivityCtx {
147+
pub(crate) fn db(&self) -> &DatabaseHandle {
148+
&self.db
149+
}
150+
147151
pub fn name(&self) -> &str {
148152
self.name
149153
}

packages/common/gasoline/core/src/ctx/operation.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ impl OperationCtx {
152152
}
153153

154154
impl OperationCtx {
155+
pub(crate) fn db(&self) -> &DatabaseHandle {
156+
&self.db
157+
}
158+
155159
pub fn name(&self) -> &str {
156160
self.name
157161
}

packages/common/gasoline/core/src/ctx/standalone.rs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tracing::Instrument;
77

88
use crate::{
99
builder::{WorkflowRepr, common as builder},
10-
ctx::{MessageCtx, common, message::SubscriptionHandle},
10+
ctx::{ActivityCtx, MessageCtx, OperationCtx, common, message::SubscriptionHandle},
1111
db::{DatabaseHandle, WorkflowData},
1212
error::WorkflowResult,
1313
message::Message,
@@ -21,7 +21,7 @@ use crate::{
2121
pub struct StandaloneCtx {
2222
ray_id: Id,
2323
req_id: Id,
24-
name: &'static str,
24+
name: String,
2525
ts: i64,
2626

2727
db: DatabaseHandle,
@@ -39,7 +39,7 @@ impl StandaloneCtx {
3939
config: rivet_config::Config,
4040
pools: rivet_pools::Pools,
4141
cache: rivet_cache::Cache,
42-
name: &'static str,
42+
name: &str,
4343
ray_id: Id,
4444
req_id: Id,
4545
) -> WorkflowResult<Self> {
@@ -54,7 +54,7 @@ impl StandaloneCtx {
5454
Ok(StandaloneCtx {
5555
ray_id,
5656
req_id,
57-
name,
57+
name: name.to_string(),
5858
ts,
5959
db,
6060
config,
@@ -63,6 +63,32 @@ impl StandaloneCtx {
6363
msg_ctx,
6464
})
6565
}
66+
67+
#[tracing::instrument(skip_all)]
68+
pub fn new_from_activity(ctx: &ActivityCtx, req_id: Id) -> WorkflowResult<Self> {
69+
StandaloneCtx::new(
70+
ctx.db().clone(),
71+
ctx.config().clone(),
72+
ctx.pools().clone(),
73+
ctx.cache().clone(),
74+
ctx.name(),
75+
ctx.ray_id(),
76+
req_id,
77+
)
78+
}
79+
80+
#[tracing::instrument(skip_all)]
81+
pub fn new_from_operation(ctx: &OperationCtx, req_id: Id) -> WorkflowResult<Self> {
82+
StandaloneCtx::new(
83+
ctx.db().clone(),
84+
ctx.config().clone(),
85+
ctx.pools().clone(),
86+
ctx.cache().clone(),
87+
ctx.name(),
88+
ctx.ray_id(),
89+
req_id,
90+
)
91+
}
6692
}
6793

6894
impl StandaloneCtx {
@@ -154,13 +180,9 @@ impl StandaloneCtx {
154180

155181
impl StandaloneCtx {
156182
pub fn name(&self) -> &str {
157-
self.name
183+
&self.name
158184
}
159185

160-
// pub fn timeout(&self) -> Duration {
161-
// self.timeout
162-
// }
163-
164186
pub fn ray_id(&self) -> Id {
165187
self.ray_id
166188
}

packages/core/pegboard-gateway/src/lib.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
use std::result::Result::Ok as ResultOk;
2+
use std::{
3+
collections::HashMap,
4+
sync::{
5+
Arc,
6+
atomic::{AtomicU64, Ordering},
7+
},
8+
time::Duration,
9+
};
10+
111
use anyhow::*;
212
use async_trait::async_trait;
313
use bytes::Bytes;
@@ -108,9 +118,9 @@ impl PegboardGateway {
108118
let actor_id = req
109119
.headers()
110120
.get("x-rivet-actor")
111-
.ok_or_else(|| anyhow!("missing x-rivet-actor header"))?
121+
.context("missing x-rivet-actor header")?
112122
.to_str()
113-
.map_err(|_| anyhow!("invalid x-rivet-actor header"))?
123+
.context("invalid x-rivet-actor header")?
114124
.to_string();
115125

116126
// Extract request parts
@@ -132,7 +142,7 @@ impl PegboardGateway {
132142
.into_body()
133143
.collect()
134144
.await
135-
.map_err(|e| anyhow!("failed to read body: {}", e))?
145+
.context("failed to read body")?
136146
.to_bytes();
137147

138148
// Build subject to publish to
@@ -212,11 +222,9 @@ impl PegboardGateway {
212222
// Extract actor ID for the message
213223
let actor_id = match headers
214224
.get("x-rivet-actor")
215-
.ok_or_else(|| anyhow!("missing x-rivet-actor header"))
216-
.and_then(|v| {
217-
v.to_str()
218-
.map_err(|_| anyhow!("invalid x-rivet-actor header"))
219-
}) {
225+
.context("missing x-rivet-actor header")
226+
.and_then(|v| v.to_str().context("invalid x-rivet-actor header"))
227+
{
220228
Result::Ok(v) => v.to_string(),
221229
Err(err) => return Err((client_ws, err)),
222230
};

packages/services/epoxy/src/http_client.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use epoxy_protocol::{
55
versioned,
66
};
77
use futures_util::{StreamExt, stream::FuturesUnordered};
8+
use rivet_api_builder::ApiCtx;
89
use std::future::Future;
910
use versioned_data_util::OwnedVersionedData;
1011

@@ -56,7 +57,7 @@ where
5657
.await;
5758
tracing::info!(?quorum_size, len = ?responses.len(), ?quorum_type, "fanout quorum size");
5859

59-
// Choow how many successful responses we need before considering a success
60+
// Choose how many successful responses we need before considering a success
6061
let target_responses = match quorum_type {
6162
// Only require 1 response
6263
utils::QuorumType::Any => 1,
@@ -93,19 +94,32 @@ where
9394
}
9495

9596
pub async fn send_message(
97+
ctx: &ApiCtx,
9698
config: &protocol::ClusterConfig,
97-
to_replica_id: ReplicaId,
9899
request: protocol::Request,
99100
) -> Result<protocol::Response> {
100-
let replica_url = find_replica_address(config, to_replica_id)?;
101-
send_message_to_address(replica_url, to_replica_id, request).await
101+
let replica_url = find_replica_address(config, request.to_replica_id)?;
102+
send_message_to_address(ctx, replica_url, request).await
102103
}
103104

104105
pub async fn send_message_to_address(
106+
ctx: &ApiCtx,
105107
replica_url: String,
106-
to_replica_id: ReplicaId,
107108
request: protocol::Request,
108109
) -> Result<protocol::Response> {
110+
let from_replica_id = request.from_replica_id;
111+
let to_replica_id = request.to_replica_id;
112+
113+
if from_replica_id == to_replica_id {
114+
tracing::info!(
115+
to_replica = to_replica_id,
116+
"sending message to replica directly"
117+
);
118+
119+
return crate::replica::message_request::message_request(&ctx, from_replica_id, request)
120+
.await;
121+
}
122+
109123
let mut replica_url = url::Url::parse(&replica_url)?;
110124
replica_url.set_path(&format!("/v{PROTOCOL_VERSION}/epoxy/message"));
111125

packages/services/epoxy/src/ops/explicit_prepare.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use anyhow::*;
22
use epoxy_protocol::protocol::{self, ReplicaId};
33
use gas::prelude::*;
4+
use rivet_api_builder::ApiCtx;
45

56
use crate::{http_client, replica, types, utils};
67

@@ -48,8 +49,15 @@ pub async fn epoxy_explicit_prepare(
4849
let quorum_members = utils::get_quorum_members(&config);
4950

5051
// EPaxos Step 26: Send Prepare to all replicas and wait for quorum
51-
let prepare_responses =
52-
send_prepares(&config, replica_id, &quorum_members, &new_ballot, instance).await?;
52+
let prepare_responses = send_prepares(
53+
ctx,
54+
&config,
55+
replica_id,
56+
&quorum_members,
57+
&new_ballot,
58+
instance,
59+
)
60+
.await?;
5361

5462
// Check if we got enough responses for a quorum
5563
let required_quorum = utils::calculate_quorum(quorum_members.len(), utils::QuorumType::Slow);
@@ -241,6 +249,7 @@ fn compare_ballots(a: &protocol::Ballot, b: &protocol::Ballot) -> std::cmp::Orde
241249
}
242250

243251
async fn send_prepares(
252+
ctx: &OperationCtx,
244253
config: &protocol::ClusterConfig,
245254
from_replica_id: ReplicaId,
246255
replica_ids: &[ReplicaId],
@@ -257,8 +266,8 @@ async fn send_prepares(
257266
let instance = instance.clone();
258267
async move {
259268
let response = http_client::send_message(
269+
&ApiCtx::new_from_operation(&ctx)?,
260270
&config,
261-
to_replica_id,
262271
protocol::Request {
263272
from_replica_id,
264273
to_replica_id,

packages/services/epoxy/src/ops/kv/get_optimistic.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use anyhow::*;
22
use epoxy_protocol::protocol::{self, ReplicaId};
33
use gas::prelude::*;
4+
use rivet_api_builder::ApiCtx;
45
use universaldb::utils::{FormalKey, IsolationLevel::*};
56

67
use crate::{http_client, keys, utils};
@@ -35,7 +36,7 @@ pub struct Output {
3536
///
3637
/// We cannot use quorum reads for the fanout read because of the constraints of Epaxos.
3738
#[operation]
38-
pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Result<Output> {
39+
pub async fn epoxy_get_optimistic(ctx: &OperationCtx, input: &Input) -> Result<Output> {
3940
// Try to read locally
4041
let kv_key = keys::keys::KvValueKey::new(input.key.clone());
4142
let cache_key = keys::keys::KvOptimisticCacheKey::new(input.key.clone());
@@ -113,7 +114,9 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
113114
};
114115

115116
// Send the message and extract the KV response
116-
let response = http_client::send_message(&config, replica_id, request).await?;
117+
let response =
118+
http_client::send_message(&ApiCtx::new_from_operation(&ctx)?, &config, request)
119+
.await?;
117120

118121
match response.kind {
119122
protocol::ResponseKind::KvGetResponse(kv_response) => Ok(kv_response.value),

0 commit comments

Comments
 (0)