From aa77b040d34d242a3e9e30db3a642e112a99f2b8 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 11 Mar 2026 16:22:28 +0100 Subject: [PATCH 1/4] ref(relay): De-duplicate envelope forwarding to the objectstore --- relay-server/src/processing/errors/mod.rs | 17 +--------- .../processing/transactions/types/output.rs | 27 +++------------- relay-server/src/processing/utils/store.rs | 32 ++++++++++++++++++- relay-server/src/services/processor.rs | 32 +++++++------------ 4 files changed, 47 insertions(+), 61 deletions(-) diff --git a/relay-server/src/processing/errors/mod.rs b/relay-server/src/processing/errors/mod.rs index 9763c0157e..118b356ba4 100644 --- a/relay-server/src/processing/errors/mod.rs +++ b/relay-server/src/processing/errors/mod.rs @@ -345,22 +345,7 @@ impl Forward for ErrorOutput { ) -> Result<(), Rejected<()>> { let envelope = self.serialize_envelope(ctx)?; let envelope = ManagedEnvelope::from(envelope).into_processed(); - - let has_attachments = envelope - .envelope() - .items() - .any(|item| item.ty() == &ItemType::Attachment); - let use_objectstore = || { - let options = &ctx.global_config.options; - crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep() - }; - - if has_attachments && use_objectstore() { - s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope }); - } else { - s.send_to_store(crate::services::store::StoreEnvelope { envelope }); - } - + processing::utils::store::forward_envelope(envelope, s, ctx.global_config); Ok(()) } } diff --git a/relay-server/src/processing/transactions/types/output.rs b/relay-server/src/processing/transactions/types/output.rs index a16b53f458..c579b4773b 100644 --- a/relay-server/src/processing/transactions/types/output.rs +++ b/relay-server/src/processing/transactions/types/output.rs @@ -5,13 +5,11 @@ use relay_protocol::Annotated; use relay_quotas::DataCategory; use crate::Envelope; -#[cfg(feature = "processing")] -use crate::managed::ManagedEnvelope; use crate::managed::{Managed, ManagedResult, Rejected}; -#[cfg(feature = "processing")] -use crate::processing::StoreHandle; use crate::processing::spans::Indexed; use crate::processing::transactions::types::{ExpandedTransaction, StandaloneProfile}; +#[cfg(feature = "processing")] +use crate::processing::{self, StoreHandle}; use crate::processing::{Forward, ForwardContext}; use crate::services::outcome::{DiscardReason, Outcome}; @@ -72,26 +70,9 @@ impl Forward for TransactionOutput { s: StoreHandle<'_>, ctx: ForwardContext<'_>, ) -> Result<(), Rejected<()>> { - use crate::envelope::ItemType; - let envelope = self.serialize_envelope(ctx)?; - let envelope = ManagedEnvelope::from(envelope).into_processed(); - - let has_attachments = envelope - .envelope() - .items() - .any(|item| item.ty() == &ItemType::Attachment); - let use_objectstore = || { - let options = &ctx.global_config.options; - crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep() - }; - - if has_attachments && use_objectstore() { - s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope }); - } else { - s.send_to_store(crate::services::store::StoreEnvelope { envelope }); - } - + let envelope = crate::managed::ManagedEnvelope::from(envelope).into_processed(); + processing::utils::store::forward_envelope(envelope, s, ctx.global_config); Ok(()) } } diff --git a/relay-server/src/processing/utils/store.rs b/relay-server/src/processing/utils/store.rs index 3a73872ab2..0421e2947f 100644 --- a/relay-server/src/processing/utils/store.rs +++ b/relay-server/src/processing/utils/store.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use chrono::Utc; use relay_conventions::CLIENT_SAMPLE_RATE; +use relay_dynamic_config::GlobalConfig; use relay_event_schema::protocol::Attributes; use relay_protocol::{Annotated, IntoValue, MetaTree, Value}; @@ -11,7 +12,10 @@ use sentry_protos::snuba::v1::{AnyValue, ArrayValue, CategoryCount, Outcomes, an use serde::Serialize; use uuid::Uuid; -use crate::managed::Quantities; +use crate::envelope::ItemType; +use crate::managed::{Quantities, TypedEnvelope}; +use crate::processing; +use crate::services::processor::Processed; /// Represents metadata extracted from Relay's annotated model. /// @@ -240,3 +244,29 @@ pub fn quantities_to_trace_item_outcomes(q: Quantities, scoping: Scoping) -> Out key_id: scoping.key_id.unwrap_or(0), } } + +/// Helper which forwards envelopes to be stored. +/// +/// This manages the dispatch logic between object store and Kafka. This is a temporary measure +/// until objectstore is available and used in all environments, including Self Hosted. +pub fn forward_envelope( + envelope: TypedEnvelope, + s: processing::StoreHandle<'_>, + global_config: &GlobalConfig, +) { + let has_attachments = envelope + .envelope() + .items() + .any(|item| item.ty() == &ItemType::Attachment); + + let use_objectstore = || { + crate::utils::sample(global_config.options.objectstore_attachments_sample_rate).is_keep() + }; + + let message = crate::services::store::StoreEnvelope { envelope }; + if has_attachments && use_objectstore() { + s.send_to_objectstore(message); + } else { + s.send_to_store(message); + } +} diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index fac1b8320e..6e41d644f9 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -78,7 +78,7 @@ use relay_threading::AsyncPool; #[cfg(feature = "processing")] use { crate::services::objectstore::Objectstore, - crate::services::store::{Store, StoreEnvelope}, + crate::services::store::Store, crate::utils::Enforcement, itertools::Itertools, relay_cardinality::{ @@ -2039,30 +2039,20 @@ impl EnvelopeProcessorService { use crate::processing::StoreHandle; let objectstore = self.inner.addrs.objectstore.as_ref(); + let handle = StoreHandle::new(store_forwarder, objectstore); + match submit { Submit::Envelope(envelope) => { - let envelope_has_attachments = envelope - .envelope() - .items() - .any(|item| *item.ty() == ItemType::Attachment); - // Whether Relay will store this attachment in objectstore or use kafka like before. - let use_objectstore = || { - let options = &self.inner.global_config.current().options; - utils::sample(options.objectstore_attachments_sample_rate).is_keep() - }; - - if let Some(objectstore) = &self.inner.addrs.objectstore - && envelope_has_attachments - && use_objectstore() - { - // the `ObjectstoreService` will upload all attachments, and then forward the envelope to the `StoreService`. - objectstore.send(StoreEnvelope { envelope }) - } else { - store_forwarder.send(StoreEnvelope { envelope }) - } + let global_config = &self.inner.global_config.current(); + // Once check-ins and errors are fully moved to the new pipeline, this is only + // used for metrics forwarding. + // + // Metrics forwarding will n_never_ forward an envelope in processing, making + // this branch here unused. + processing::utils::store::forward_envelope(envelope, handle, global_config); } Submit::Output { output, ctx } => output - .forward_store(StoreHandle::new(store_forwarder, objectstore), ctx) + .forward_store(handle, ctx) .unwrap_or_else(|err| err.into_inner()), } return; From ac46fb72948f30fafaa72139d2902c2c3170f7c1 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Thu, 12 Mar 2026 10:20:13 +0100 Subject: [PATCH 2/4] move util fn to store handle --- relay-server/src/managed/envelope.rs | 5 ++ relay-server/src/processing/check_ins/mod.rs | 2 +- relay-server/src/processing/errors/mod.rs | 3 +- relay-server/src/processing/forward.rs | 47 +++++++++++++++++-- .../processing/transactions/types/output.rs | 7 ++- relay-server/src/processing/utils/store.rs | 32 +------------ relay-server/src/services/objectstore.rs | 5 +- relay-server/src/services/processor.rs | 18 ++++--- relay-server/src/services/store.rs | 5 +- 9 files changed, 67 insertions(+), 57 deletions(-) diff --git a/relay-server/src/managed/envelope.rs b/relay-server/src/managed/envelope.rs index c3b2efba1a..93e1028a58 100644 --- a/relay-server/src/managed/envelope.rs +++ b/relay-server/src/managed/envelope.rs @@ -99,6 +99,11 @@ impl TypedEnvelope { self.0.accept() } + /// Returns the raw [`ManagedEnvelope`]. + pub fn into_inner(self) -> ManagedEnvelope { + self.0 + } + /// Creates a new typed envelope. /// /// Note: this method is private to make sure that only `TryFrom` implementation is used, which diff --git a/relay-server/src/processing/check_ins/mod.rs b/relay-server/src/processing/check_ins/mod.rs index 9940e1a10d..7230833ed0 100644 --- a/relay-server/src/processing/check_ins/mod.rs +++ b/relay-server/src/processing/check_ins/mod.rs @@ -116,7 +116,7 @@ impl Forward for CheckInsOutput { ctx: processing::ForwardContext<'_>, ) -> Result<(), Rejected<()>> { let envelope = self.serialize_envelope(ctx)?; - let envelope = ManagedEnvelope::from(envelope).into_processed(); + let envelope = ManagedEnvelope::from(envelope); s.send_to_store(crate::services::store::StoreEnvelope { envelope }); diff --git a/relay-server/src/processing/errors/mod.rs b/relay-server/src/processing/errors/mod.rs index 118b356ba4..4fa717f98d 100644 --- a/relay-server/src/processing/errors/mod.rs +++ b/relay-server/src/processing/errors/mod.rs @@ -344,8 +344,7 @@ impl Forward for ErrorOutput { ctx: processing::ForwardContext<'_>, ) -> Result<(), Rejected<()>> { let envelope = self.serialize_envelope(ctx)?; - let envelope = ManagedEnvelope::from(envelope).into_processed(); - processing::utils::store::forward_envelope(envelope, s, ctx.global_config); + s.send_envelope(ManagedEnvelope::from(envelope)); Ok(()) } } diff --git a/relay-server/src/processing/forward.rs b/relay-server/src/processing/forward.rs index d5cb4d2fcb..07bb717656 100644 --- a/relay-server/src/processing/forward.rs +++ b/relay-server/src/processing/forward.rs @@ -5,6 +5,8 @@ use relay_dynamic_config::{RetentionConfig, RetentionsConfig}; use relay_system::{Addr, FromMessage}; use crate::Envelope; +#[cfg(feature = "processing")] +use crate::managed::ManagedEnvelope; use crate::managed::{Managed, Rejected}; #[cfg(feature = "processing")] use crate::services::objectstore::Objectstore; @@ -18,12 +20,21 @@ use crate::services::store::Store; pub struct StoreHandle<'a> { store: &'a Addr, objectstore: Option<&'a Addr>, + global_config: &'a GlobalConfig, } #[cfg(feature = "processing")] impl<'a> StoreHandle<'a> { - pub fn new(store: &'a Addr, objectstore: Option<&'a Addr>) -> Self { - Self { store, objectstore } + pub fn new( + store: &'a Addr, + objectstore: Option<&'a Addr>, + global_config: &'a GlobalConfig, + ) -> Self { + Self { + store, + objectstore, + global_config, + } } /// Sends a message to the [`Store`] service. @@ -45,6 +56,36 @@ impl<'a> StoreHandle<'a> { relay_log::error!("Objectstore service not configured. Dropping message."); } } + + /// Dispatches an envelopes to either the [`Objectstore`] or [`Store`] service. + pub fn send_envelope(&self, envelope: ManagedEnvelope) { + use crate::services::store::StoreEnvelope; + + let Some(objectstore) = self.objectstore else { + self.store.send(StoreEnvelope { envelope }); + return; + }; + + let has_attachments = envelope + .envelope() + .items() + .any(|item| item.ty() == &crate::envelope::ItemType::Attachment); + + let use_objectstore = || { + crate::utils::sample( + self.global_config + .options + .objectstore_attachments_sample_rate, + ) + .is_keep() + }; + + if has_attachments && use_objectstore() { + objectstore.send(StoreEnvelope { envelope }) + } else { + self.store.send(StoreEnvelope { envelope }); + } + } } /// A processor output which can be forwarded to a different destination. @@ -73,7 +114,7 @@ pub struct ForwardContext<'a> { /// The Relay configuration. pub config: &'a Config, /// A view of the currently active global configuration. - #[cfg_attr(not(feature = "processing"), expect(unused))] + #[expect(unused, reason = "not yet used")] pub global_config: &'a GlobalConfig, /// Project configuration associated with the unit of work. pub project_info: &'a ProjectInfo, diff --git a/relay-server/src/processing/transactions/types/output.rs b/relay-server/src/processing/transactions/types/output.rs index c579b4773b..965d40c6ae 100644 --- a/relay-server/src/processing/transactions/types/output.rs +++ b/relay-server/src/processing/transactions/types/output.rs @@ -6,10 +6,10 @@ use relay_quotas::DataCategory; use crate::Envelope; use crate::managed::{Managed, ManagedResult, Rejected}; +#[cfg(feature = "processing")] +use crate::processing::StoreHandle; use crate::processing::spans::Indexed; use crate::processing::transactions::types::{ExpandedTransaction, StandaloneProfile}; -#[cfg(feature = "processing")] -use crate::processing::{self, StoreHandle}; use crate::processing::{Forward, ForwardContext}; use crate::services::outcome::{DiscardReason, Outcome}; @@ -71,8 +71,7 @@ impl Forward for TransactionOutput { ctx: ForwardContext<'_>, ) -> Result<(), Rejected<()>> { let envelope = self.serialize_envelope(ctx)?; - let envelope = crate::managed::ManagedEnvelope::from(envelope).into_processed(); - processing::utils::store::forward_envelope(envelope, s, ctx.global_config); + s.send_envelope(crate::managed::ManagedEnvelope::from(envelope)); Ok(()) } } diff --git a/relay-server/src/processing/utils/store.rs b/relay-server/src/processing/utils/store.rs index 0421e2947f..3a73872ab2 100644 --- a/relay-server/src/processing/utils/store.rs +++ b/relay-server/src/processing/utils/store.rs @@ -3,7 +3,6 @@ use std::collections::HashMap; use chrono::Utc; use relay_conventions::CLIENT_SAMPLE_RATE; -use relay_dynamic_config::GlobalConfig; use relay_event_schema::protocol::Attributes; use relay_protocol::{Annotated, IntoValue, MetaTree, Value}; @@ -12,10 +11,7 @@ use sentry_protos::snuba::v1::{AnyValue, ArrayValue, CategoryCount, Outcomes, an use serde::Serialize; use uuid::Uuid; -use crate::envelope::ItemType; -use crate::managed::{Quantities, TypedEnvelope}; -use crate::processing; -use crate::services::processor::Processed; +use crate::managed::Quantities; /// Represents metadata extracted from Relay's annotated model. /// @@ -244,29 +240,3 @@ pub fn quantities_to_trace_item_outcomes(q: Quantities, scoping: Scoping) -> Out key_id: scoping.key_id.unwrap_or(0), } } - -/// Helper which forwards envelopes to be stored. -/// -/// This manages the dispatch logic between object store and Kafka. This is a temporary measure -/// until objectstore is available and used in all environments, including Self Hosted. -pub fn forward_envelope( - envelope: TypedEnvelope, - s: processing::StoreHandle<'_>, - global_config: &GlobalConfig, -) { - let has_attachments = envelope - .envelope() - .items() - .any(|item| item.ty() == &ItemType::Attachment); - - let use_objectstore = || { - crate::utils::sample(global_config.options.objectstore_attachments_sample_rate).is_keep() - }; - - let message = crate::services::store::StoreEnvelope { envelope }; - if has_attachments && use_objectstore() { - s.send_to_objectstore(message); - } else { - s.send_to_store(message); - } -} diff --git a/relay-server/src/services/objectstore.rs b/relay-server/src/services/objectstore.rs index b2afb6f7d9..5ecdec91bd 100644 --- a/relay-server/src/services/objectstore.rs +++ b/relay-server/src/services/objectstore.rs @@ -17,11 +17,10 @@ use uuid::Uuid; use crate::constants::DEFAULT_ATTACHMENT_RETENTION; use crate::envelope::ItemType; use crate::managed::{ - Counted, Managed, ManagedResult, OutcomeError, Quantities, Rejected, TypedEnvelope, + Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected, }; use crate::processing::utils::store::item_id_to_uuid; use crate::services::outcome::DiscardReason; -use crate::services::processor::Processed; use crate::services::store::{Store, StoreEnvelope, StoreTraceItem}; use crate::services::upload; use crate::statsd::{RelayCounters, RelayTimers}; @@ -251,7 +250,7 @@ impl ObjectstoreServiceInner { /// /// This mutates the attachment items in-place, setting their `stored_key` field to the key /// in objectstore. - async fn handle_envelope(&self, mut envelope: TypedEnvelope) -> () { + async fn handle_envelope(&self, mut envelope: ManagedEnvelope) -> () { let scoping = envelope.scoping(); let session = self .event_attachments diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 6e41d644f9..183493bf7e 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2039,18 +2039,16 @@ impl EnvelopeProcessorService { use crate::processing::StoreHandle; let objectstore = self.inner.addrs.objectstore.as_ref(); - let handle = StoreHandle::new(store_forwarder, objectstore); + let global_config = &self.inner.global_config.current(); + let handle = StoreHandle::new(store_forwarder, objectstore, global_config); match submit { - Submit::Envelope(envelope) => { - let global_config = &self.inner.global_config.current(); - // Once check-ins and errors are fully moved to the new pipeline, this is only - // used for metrics forwarding. - // - // Metrics forwarding will n_never_ forward an envelope in processing, making - // this branch here unused. - processing::utils::store::forward_envelope(envelope, handle, global_config); - } + // Once check-ins and errors are fully moved to the new pipeline, this is only + // used for metrics forwarding. + // + // Metrics forwarding will n_never_ forward an envelope in processing, making + // this branch here unused. + Submit::Envelope(envelope) => handle.send_envelope(envelope.into_inner()), Submit::Output { output, ctx } => output .forward_store(handle, ctx) .unwrap_or_else(|err| err.into_inner()), diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 3075ce67b0..f40b4ea49f 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -34,12 +34,11 @@ use relay_system::{Addr, FromMessage, Interface, NoResponse, Service}; use relay_threading::AsyncPool; use crate::envelope::{AttachmentType, ContentType, Item, ItemType}; -use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, TypedEnvelope}; +use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities}; use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes}; use crate::service::ServiceError; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; -use crate::services::processor::Processed; use crate::statsd::{RelayCounters, RelayGauges, RelayTimers}; use crate::utils::{self, FormDataIter}; @@ -94,7 +93,7 @@ impl Producer { /// Publishes an [`Envelope`](crate::envelope::Envelope) to the Sentry core application through Kafka topics. #[derive(Debug)] pub struct StoreEnvelope { - pub envelope: TypedEnvelope, + pub envelope: ManagedEnvelope, } /// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics. From 220a0d2140dab900ab8291f1a388194dc689a2a0 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Thu, 12 Mar 2026 12:06:41 +0100 Subject: [PATCH 3/4] asd --- relay-server/src/processing/forward.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/relay-server/src/processing/forward.rs b/relay-server/src/processing/forward.rs index 07bb717656..3ae249535b 100644 --- a/relay-server/src/processing/forward.rs +++ b/relay-server/src/processing/forward.rs @@ -114,7 +114,6 @@ pub struct ForwardContext<'a> { /// The Relay configuration. pub config: &'a Config, /// A view of the currently active global configuration. - #[expect(unused, reason = "not yet used")] pub global_config: &'a GlobalConfig, /// Project configuration associated with the unit of work. pub project_info: &'a ProjectInfo, From 980d6ffe80e5dfb45956a891dcd57c6e5b9ea9c7 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Thu, 12 Mar 2026 14:37:29 +0100 Subject: [PATCH 4/4] more cfg stuff --- relay-server/src/processing/forward.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/relay-server/src/processing/forward.rs b/relay-server/src/processing/forward.rs index 3ae249535b..a4ecb1832c 100644 --- a/relay-server/src/processing/forward.rs +++ b/relay-server/src/processing/forward.rs @@ -114,6 +114,7 @@ pub struct ForwardContext<'a> { /// The Relay configuration. pub config: &'a Config, /// A view of the currently active global configuration. + #[cfg_attr(not(feature = "processing"), expect(unused))] pub global_config: &'a GlobalConfig, /// Project configuration associated with the unit of work. pub project_info: &'a ProjectInfo,