Skip to content

Commit 788b311

Browse files
authored
ref(relay): De-duplicate envelope forwarding to the objectstore (#5716)
1 parent 6ce5288 commit 788b311

File tree

8 files changed

+66
-70
lines changed

8 files changed

+66
-70
lines changed

relay-server/src/managed/envelope.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ impl<G> TypedEnvelope<G> {
9999
self.0.accept()
100100
}
101101

102+
/// Returns the raw [`ManagedEnvelope`].
103+
pub fn into_inner(self) -> ManagedEnvelope {
104+
self.0
105+
}
106+
102107
/// Creates a new typed envelope.
103108
///
104109
/// Note: this method is private to make sure that only `TryFrom` implementation is used, which

relay-server/src/processing/check_ins/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ impl Forward for CheckInsOutput {
113113
ctx: processing::ForwardContext<'_>,
114114
) -> Result<(), Rejected<()>> {
115115
let envelope = self.serialize_envelope(ctx)?;
116-
let envelope = ManagedEnvelope::from(envelope).into_processed();
116+
let envelope = ManagedEnvelope::from(envelope);
117117

118118
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
119119

relay-server/src/processing/errors/mod.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -341,23 +341,7 @@ impl Forward for ErrorOutput {
341341
ctx: processing::ForwardContext<'_>,
342342
) -> Result<(), Rejected<()>> {
343343
let envelope = self.serialize_envelope(ctx)?;
344-
let envelope = ManagedEnvelope::from(envelope).into_processed();
345-
346-
let has_attachments = envelope
347-
.envelope()
348-
.items()
349-
.any(|item| item.ty() == &ItemType::Attachment);
350-
let use_objectstore = || {
351-
let options = &ctx.global_config.options;
352-
crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep()
353-
};
354-
355-
if has_attachments && use_objectstore() {
356-
s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope });
357-
} else {
358-
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
359-
}
360-
344+
s.send_envelope(ManagedEnvelope::from(envelope));
361345
Ok(())
362346
}
363347
}

relay-server/src/processing/forward.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use relay_dynamic_config::{RetentionConfig, RetentionsConfig};
55
use relay_system::{Addr, FromMessage};
66

77
use crate::Envelope;
8+
#[cfg(feature = "processing")]
9+
use crate::managed::ManagedEnvelope;
810
use crate::managed::{Managed, Rejected};
911
#[cfg(feature = "processing")]
1012
use crate::services::objectstore::Objectstore;
@@ -18,12 +20,21 @@ use crate::services::store::Store;
1820
pub struct StoreHandle<'a> {
1921
store: &'a Addr<Store>,
2022
objectstore: Option<&'a Addr<Objectstore>>,
23+
global_config: &'a GlobalConfig,
2124
}
2225

2326
#[cfg(feature = "processing")]
2427
impl<'a> StoreHandle<'a> {
25-
pub fn new(store: &'a Addr<Store>, objectstore: Option<&'a Addr<Objectstore>>) -> Self {
26-
Self { store, objectstore }
28+
pub fn new(
29+
store: &'a Addr<Store>,
30+
objectstore: Option<&'a Addr<Objectstore>>,
31+
global_config: &'a GlobalConfig,
32+
) -> Self {
33+
Self {
34+
store,
35+
objectstore,
36+
global_config,
37+
}
2738
}
2839

2940
/// Sends a message to the [`Store`] service.
@@ -45,6 +56,36 @@ impl<'a> StoreHandle<'a> {
4556
relay_log::error!("Objectstore service not configured. Dropping message.");
4657
}
4758
}
59+
60+
/// Dispatches an envelopes to either the [`Objectstore`] or [`Store`] service.
61+
pub fn send_envelope(&self, envelope: ManagedEnvelope) {
62+
use crate::services::store::StoreEnvelope;
63+
64+
let Some(objectstore) = self.objectstore else {
65+
self.store.send(StoreEnvelope { envelope });
66+
return;
67+
};
68+
69+
let has_attachments = envelope
70+
.envelope()
71+
.items()
72+
.any(|item| item.ty() == &crate::envelope::ItemType::Attachment);
73+
74+
let use_objectstore = || {
75+
crate::utils::sample(
76+
self.global_config
77+
.options
78+
.objectstore_attachments_sample_rate,
79+
)
80+
.is_keep()
81+
};
82+
83+
if has_attachments && use_objectstore() {
84+
objectstore.send(StoreEnvelope { envelope })
85+
} else {
86+
self.store.send(StoreEnvelope { envelope });
87+
}
88+
}
4889
}
4990

5091
/// A processor output which can be forwarded to a different destination.

relay-server/src/processing/transactions/types/output.rs

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ use relay_protocol::Annotated;
55
use relay_quotas::DataCategory;
66

77
use crate::Envelope;
8-
#[cfg(feature = "processing")]
9-
use crate::managed::ManagedEnvelope;
108
use crate::managed::{Managed, ManagedResult, Rejected};
119
#[cfg(feature = "processing")]
1210
use crate::processing::StoreHandle;
@@ -72,26 +70,8 @@ impl Forward for TransactionOutput {
7270
s: StoreHandle<'_>,
7371
ctx: ForwardContext<'_>,
7472
) -> Result<(), Rejected<()>> {
75-
use crate::envelope::ItemType;
76-
7773
let envelope = self.serialize_envelope(ctx)?;
78-
let envelope = ManagedEnvelope::from(envelope).into_processed();
79-
80-
let has_attachments = envelope
81-
.envelope()
82-
.items()
83-
.any(|item| item.ty() == &ItemType::Attachment);
84-
let use_objectstore = || {
85-
let options = &ctx.global_config.options;
86-
crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep()
87-
};
88-
89-
if has_attachments && use_objectstore() {
90-
s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope });
91-
} else {
92-
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
93-
}
94-
74+
s.send_envelope(crate::managed::ManagedEnvelope::from(envelope));
9575
Ok(())
9676
}
9777
}

relay-server/src/services/objectstore.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@ use uuid::Uuid;
1717
use crate::constants::DEFAULT_ATTACHMENT_RETENTION;
1818
use crate::envelope::ItemType;
1919
use crate::managed::{
20-
Counted, Managed, ManagedResult, OutcomeError, Quantities, Rejected, TypedEnvelope,
20+
Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected,
2121
};
2222
use crate::processing::utils::store::item_id_to_uuid;
2323
use crate::services::outcome::DiscardReason;
24-
use crate::services::processor::Processed;
2524
use crate::services::store::{Store, StoreAttachment, StoreEnvelope, StoreTraceItem};
2625
use crate::services::upload;
2726
use crate::statsd::{RelayCounters, RelayTimers};
@@ -271,7 +270,7 @@ impl ObjectstoreServiceInner {
271270
///
272271
/// This mutates the attachment items in-place, setting their `stored_key` field to the key
273272
/// in objectstore.
274-
async fn handle_envelope(&self, mut envelope: TypedEnvelope<Processed>) -> () {
273+
async fn handle_envelope(&self, mut envelope: ManagedEnvelope) -> () {
275274
let scoping = envelope.scoping();
276275
let session = self
277276
.event_attachments

relay-server/src/services/processor.rs

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ use relay_threading::AsyncPool;
7979
#[cfg(feature = "processing")]
8080
use {
8181
crate::services::objectstore::Objectstore,
82-
crate::services::store::{Store, StoreEnvelope},
82+
crate::services::store::Store,
8383
crate::utils::Enforcement,
8484
itertools::Itertools,
8585
relay_cardinality::{
@@ -2064,30 +2064,18 @@ impl EnvelopeProcessorService {
20642064
use crate::processing::StoreHandle;
20652065

20662066
let objectstore = self.inner.addrs.objectstore.as_ref();
2067+
let global_config = &self.inner.global_config.current();
2068+
let handle = StoreHandle::new(store_forwarder, objectstore, global_config);
2069+
20672070
match submit {
2068-
Submit::Envelope(envelope) => {
2069-
let envelope_has_attachments = envelope
2070-
.envelope()
2071-
.items()
2072-
.any(|item| *item.ty() == ItemType::Attachment);
2073-
// Whether Relay will store this attachment in objectstore or use kafka like before.
2074-
let use_objectstore = || {
2075-
let options = &self.inner.global_config.current().options;
2076-
utils::sample(options.objectstore_attachments_sample_rate).is_keep()
2077-
};
2078-
2079-
if let Some(objectstore) = &self.inner.addrs.objectstore
2080-
&& envelope_has_attachments
2081-
&& use_objectstore()
2082-
{
2083-
// the `ObjectstoreService` will upload all attachments, and then forward the envelope to the `StoreService`.
2084-
objectstore.send(StoreEnvelope { envelope })
2085-
} else {
2086-
store_forwarder.send(StoreEnvelope { envelope })
2087-
}
2088-
}
2071+
// Once check-ins and errors are fully moved to the new pipeline, this is only
2072+
// used for metrics forwarding.
2073+
//
2074+
// Metrics forwarding will n_never_ forward an envelope in processing, making
2075+
// this branch here unused.
2076+
Submit::Envelope(envelope) => handle.send_envelope(envelope.into_inner()),
20892077
Submit::Output { output, ctx } => output
2090-
.forward_store(StoreHandle::new(store_forwarder, objectstore), ctx)
2078+
.forward_store(handle, ctx)
20912079
.unwrap_or_else(|err| err.into_inner()),
20922080
}
20932081
return;

relay-server/src/services/store.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,11 @@ use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
3434
use relay_threading::AsyncPool;
3535

3636
use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
37-
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, TypedEnvelope};
37+
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities};
3838
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
3939
use crate::service::ServiceError;
4040
use crate::services::global_config::GlobalConfigHandle;
4141
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
42-
use crate::services::processor::Processed;
4342
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
4443
use crate::utils::{self, FormDataIter};
4544

@@ -94,7 +93,7 @@ impl Producer {
9493
/// Publishes an [`Envelope`](crate::envelope::Envelope) to the Sentry core application through Kafka topics.
9594
#[derive(Debug)]
9695
pub struct StoreEnvelope {
97-
pub envelope: TypedEnvelope<Processed>,
96+
pub envelope: ManagedEnvelope,
9897
}
9998

10099
/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.

0 commit comments

Comments
 (0)