Skip to content

Commit aa77b04

Browse files
committed
ref(relay): De-duplicate envelope forwarding to the objectstore
1 parent 89efe07 commit aa77b04

File tree

4 files changed

+47
-61
lines changed

4 files changed

+47
-61
lines changed

relay-server/src/processing/errors/mod.rs

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -345,22 +345,7 @@ impl Forward for ErrorOutput {
345345
) -> Result<(), Rejected<()>> {
346346
let envelope = self.serialize_envelope(ctx)?;
347347
let envelope = ManagedEnvelope::from(envelope).into_processed();
348-
349-
let has_attachments = envelope
350-
.envelope()
351-
.items()
352-
.any(|item| item.ty() == &ItemType::Attachment);
353-
let use_objectstore = || {
354-
let options = &ctx.global_config.options;
355-
crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep()
356-
};
357-
358-
if has_attachments && use_objectstore() {
359-
s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope });
360-
} else {
361-
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
362-
}
363-
348+
processing::utils::store::forward_envelope(envelope, s, ctx.global_config);
364349
Ok(())
365350
}
366351
}

relay-server/src/processing/transactions/types/output.rs

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,11 @@ use relay_protocol::Annotated;
55
use relay_quotas::DataCategory;
66

77
use crate::Envelope;
8-
#[cfg(feature = "processing")]
9-
use crate::managed::ManagedEnvelope;
108
use crate::managed::{Managed, ManagedResult, Rejected};
11-
#[cfg(feature = "processing")]
12-
use crate::processing::StoreHandle;
139
use crate::processing::spans::Indexed;
1410
use crate::processing::transactions::types::{ExpandedTransaction, StandaloneProfile};
11+
#[cfg(feature = "processing")]
12+
use crate::processing::{self, StoreHandle};
1513
use crate::processing::{Forward, ForwardContext};
1614
use crate::services::outcome::{DiscardReason, Outcome};
1715

@@ -72,26 +70,9 @@ impl Forward for TransactionOutput {
7270
s: StoreHandle<'_>,
7371
ctx: ForwardContext<'_>,
7472
) -> Result<(), Rejected<()>> {
75-
use crate::envelope::ItemType;
76-
7773
let envelope = self.serialize_envelope(ctx)?;
78-
let envelope = ManagedEnvelope::from(envelope).into_processed();
79-
80-
let has_attachments = envelope
81-
.envelope()
82-
.items()
83-
.any(|item| item.ty() == &ItemType::Attachment);
84-
let use_objectstore = || {
85-
let options = &ctx.global_config.options;
86-
crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep()
87-
};
88-
89-
if has_attachments && use_objectstore() {
90-
s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope });
91-
} else {
92-
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
93-
}
94-
74+
let envelope = crate::managed::ManagedEnvelope::from(envelope).into_processed();
75+
processing::utils::store::forward_envelope(envelope, s, ctx.global_config);
9576
Ok(())
9677
}
9778
}

relay-server/src/processing/utils/store.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::collections::HashMap;
33

44
use chrono::Utc;
55
use relay_conventions::CLIENT_SAMPLE_RATE;
6+
use relay_dynamic_config::GlobalConfig;
67
use relay_event_schema::protocol::Attributes;
78
use relay_protocol::{Annotated, IntoValue, MetaTree, Value};
89

@@ -11,7 +12,10 @@ use sentry_protos::snuba::v1::{AnyValue, ArrayValue, CategoryCount, Outcomes, an
1112
use serde::Serialize;
1213
use uuid::Uuid;
1314

14-
use crate::managed::Quantities;
15+
use crate::envelope::ItemType;
16+
use crate::managed::{Quantities, TypedEnvelope};
17+
use crate::processing;
18+
use crate::services::processor::Processed;
1519

1620
/// Represents metadata extracted from Relay's annotated model.
1721
///
@@ -240,3 +244,29 @@ pub fn quantities_to_trace_item_outcomes(q: Quantities, scoping: Scoping) -> Out
240244
key_id: scoping.key_id.unwrap_or(0),
241245
}
242246
}
247+
248+
/// Helper which forwards envelopes to be stored.
249+
///
250+
/// This manages the dispatch logic between object store and Kafka. This is a temporary measure
251+
/// until objectstore is available and used in all environments, including Self Hosted.
252+
pub fn forward_envelope(
253+
envelope: TypedEnvelope<Processed>,
254+
s: processing::StoreHandle<'_>,
255+
global_config: &GlobalConfig,
256+
) {
257+
let has_attachments = envelope
258+
.envelope()
259+
.items()
260+
.any(|item| item.ty() == &ItemType::Attachment);
261+
262+
let use_objectstore = || {
263+
crate::utils::sample(global_config.options.objectstore_attachments_sample_rate).is_keep()
264+
};
265+
266+
let message = crate::services::store::StoreEnvelope { envelope };
267+
if has_attachments && use_objectstore() {
268+
s.send_to_objectstore(message);
269+
} else {
270+
s.send_to_store(message);
271+
}
272+
}

relay-server/src/services/processor.rs

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ use relay_threading::AsyncPool;
7878
#[cfg(feature = "processing")]
7979
use {
8080
crate::services::objectstore::Objectstore,
81-
crate::services::store::{Store, StoreEnvelope},
81+
crate::services::store::Store,
8282
crate::utils::Enforcement,
8383
itertools::Itertools,
8484
relay_cardinality::{
@@ -2039,30 +2039,20 @@ impl EnvelopeProcessorService {
20392039
use crate::processing::StoreHandle;
20402040

20412041
let objectstore = self.inner.addrs.objectstore.as_ref();
2042+
let handle = StoreHandle::new(store_forwarder, objectstore);
2043+
20422044
match submit {
20432045
Submit::Envelope(envelope) => {
2044-
let envelope_has_attachments = envelope
2045-
.envelope()
2046-
.items()
2047-
.any(|item| *item.ty() == ItemType::Attachment);
2048-
// Whether Relay will store this attachment in objectstore or use kafka like before.
2049-
let use_objectstore = || {
2050-
let options = &self.inner.global_config.current().options;
2051-
utils::sample(options.objectstore_attachments_sample_rate).is_keep()
2052-
};
2053-
2054-
if let Some(objectstore) = &self.inner.addrs.objectstore
2055-
&& envelope_has_attachments
2056-
&& use_objectstore()
2057-
{
2058-
// the `ObjectstoreService` will upload all attachments, and then forward the envelope to the `StoreService`.
2059-
objectstore.send(StoreEnvelope { envelope })
2060-
} else {
2061-
store_forwarder.send(StoreEnvelope { envelope })
2062-
}
2046+
let global_config = &self.inner.global_config.current();
2047+
// Once check-ins and errors are fully moved to the new pipeline, this is only
2048+
// used for metrics forwarding.
2049+
//
2050+
// Metrics forwarding will n_never_ forward an envelope in processing, making
2051+
// this branch here unused.
2052+
processing::utils::store::forward_envelope(envelope, handle, global_config);
20632053
}
20642054
Submit::Output { output, ctx } => output
2065-
.forward_store(StoreHandle::new(store_forwarder, objectstore), ctx)
2055+
.forward_store(handle, ctx)
20662056
.unwrap_or_else(|err| err.into_inner()),
20672057
}
20682058
return;

0 commit comments

Comments
 (0)