diff --git a/relay-server/src/managed/envelope.rs b/relay-server/src/managed/envelope.rs index c3b2efba1a..93e1028a58 100644 --- a/relay-server/src/managed/envelope.rs +++ b/relay-server/src/managed/envelope.rs @@ -99,6 +99,11 @@ impl TypedEnvelope { self.0.accept() } + /// Returns the raw [`ManagedEnvelope`]. + pub fn into_inner(self) -> ManagedEnvelope { + self.0 + } + /// Creates a new typed envelope. /// /// Note: this method is private to make sure that only `TryFrom` implementation is used, which diff --git a/relay-server/src/processing/check_ins/mod.rs b/relay-server/src/processing/check_ins/mod.rs index 9940e1a10d..7230833ed0 100644 --- a/relay-server/src/processing/check_ins/mod.rs +++ b/relay-server/src/processing/check_ins/mod.rs @@ -116,7 +116,7 @@ impl Forward for CheckInsOutput { ctx: processing::ForwardContext<'_>, ) -> Result<(), Rejected<()>> { let envelope = self.serialize_envelope(ctx)?; - let envelope = ManagedEnvelope::from(envelope).into_processed(); + let envelope = ManagedEnvelope::from(envelope); s.send_to_store(crate::services::store::StoreEnvelope { envelope }); diff --git a/relay-server/src/processing/errors/mod.rs b/relay-server/src/processing/errors/mod.rs index 9763c0157e..4fa717f98d 100644 --- a/relay-server/src/processing/errors/mod.rs +++ b/relay-server/src/processing/errors/mod.rs @@ -344,23 +344,7 @@ impl Forward for ErrorOutput { ctx: processing::ForwardContext<'_>, ) -> Result<(), Rejected<()>> { let envelope = self.serialize_envelope(ctx)?; - let envelope = ManagedEnvelope::from(envelope).into_processed(); - - let has_attachments = envelope - .envelope() - .items() - .any(|item| item.ty() == &ItemType::Attachment); - let use_objectstore = || { - let options = &ctx.global_config.options; - crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep() - }; - - if has_attachments && use_objectstore() { - s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope }); - } else { - s.send_to_store(crate::services::store::StoreEnvelope { envelope }); - } - + s.send_envelope(ManagedEnvelope::from(envelope)); Ok(()) } } diff --git a/relay-server/src/processing/forward.rs b/relay-server/src/processing/forward.rs index d5cb4d2fcb..a4ecb1832c 100644 --- a/relay-server/src/processing/forward.rs +++ b/relay-server/src/processing/forward.rs @@ -5,6 +5,8 @@ use relay_dynamic_config::{RetentionConfig, RetentionsConfig}; use relay_system::{Addr, FromMessage}; use crate::Envelope; +#[cfg(feature = "processing")] +use crate::managed::ManagedEnvelope; use crate::managed::{Managed, Rejected}; #[cfg(feature = "processing")] use crate::services::objectstore::Objectstore; @@ -18,12 +20,21 @@ use crate::services::store::Store; pub struct StoreHandle<'a> { store: &'a Addr, objectstore: Option<&'a Addr>, + global_config: &'a GlobalConfig, } #[cfg(feature = "processing")] impl<'a> StoreHandle<'a> { - pub fn new(store: &'a Addr, objectstore: Option<&'a Addr>) -> Self { - Self { store, objectstore } + pub fn new( + store: &'a Addr, + objectstore: Option<&'a Addr>, + global_config: &'a GlobalConfig, + ) -> Self { + Self { + store, + objectstore, + global_config, + } } /// Sends a message to the [`Store`] service. @@ -45,6 +56,36 @@ impl<'a> StoreHandle<'a> { relay_log::error!("Objectstore service not configured. Dropping message."); } } + + /// Dispatches an envelopes to either the [`Objectstore`] or [`Store`] service. + pub fn send_envelope(&self, envelope: ManagedEnvelope) { + use crate::services::store::StoreEnvelope; + + let Some(objectstore) = self.objectstore else { + self.store.send(StoreEnvelope { envelope }); + return; + }; + + let has_attachments = envelope + .envelope() + .items() + .any(|item| item.ty() == &crate::envelope::ItemType::Attachment); + + let use_objectstore = || { + crate::utils::sample( + self.global_config + .options + .objectstore_attachments_sample_rate, + ) + .is_keep() + }; + + if has_attachments && use_objectstore() { + objectstore.send(StoreEnvelope { envelope }) + } else { + self.store.send(StoreEnvelope { envelope }); + } + } } /// A processor output which can be forwarded to a different destination. diff --git a/relay-server/src/processing/transactions/types/output.rs b/relay-server/src/processing/transactions/types/output.rs index a16b53f458..965d40c6ae 100644 --- a/relay-server/src/processing/transactions/types/output.rs +++ b/relay-server/src/processing/transactions/types/output.rs @@ -5,8 +5,6 @@ use relay_protocol::Annotated; use relay_quotas::DataCategory; use crate::Envelope; -#[cfg(feature = "processing")] -use crate::managed::ManagedEnvelope; use crate::managed::{Managed, ManagedResult, Rejected}; #[cfg(feature = "processing")] use crate::processing::StoreHandle; @@ -72,26 +70,8 @@ impl Forward for TransactionOutput { s: StoreHandle<'_>, ctx: ForwardContext<'_>, ) -> Result<(), Rejected<()>> { - use crate::envelope::ItemType; - let envelope = self.serialize_envelope(ctx)?; - let envelope = ManagedEnvelope::from(envelope).into_processed(); - - let has_attachments = envelope - .envelope() - .items() - .any(|item| item.ty() == &ItemType::Attachment); - let use_objectstore = || { - let options = &ctx.global_config.options; - crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep() - }; - - if has_attachments && use_objectstore() { - s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope }); - } else { - s.send_to_store(crate::services::store::StoreEnvelope { envelope }); - } - + s.send_envelope(crate::managed::ManagedEnvelope::from(envelope)); Ok(()) } } diff --git a/relay-server/src/services/objectstore.rs b/relay-server/src/services/objectstore.rs index 6e16e1eaf5..527bc6d9c9 100644 --- a/relay-server/src/services/objectstore.rs +++ b/relay-server/src/services/objectstore.rs @@ -17,11 +17,10 @@ use uuid::Uuid; use crate::constants::DEFAULT_ATTACHMENT_RETENTION; use crate::envelope::ItemType; use crate::managed::{ - Counted, Managed, ManagedResult, OutcomeError, Quantities, Rejected, TypedEnvelope, + Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected, }; use crate::processing::utils::store::item_id_to_uuid; use crate::services::outcome::DiscardReason; -use crate::services::processor::Processed; use crate::services::store::{Store, StoreAttachment, StoreEnvelope, StoreTraceItem}; use crate::services::upload; use crate::statsd::{RelayCounters, RelayTimers}; @@ -271,7 +270,7 @@ impl ObjectstoreServiceInner { /// /// This mutates the attachment items in-place, setting their `stored_key` field to the key /// in objectstore. - async fn handle_envelope(&self, mut envelope: TypedEnvelope) -> () { + async fn handle_envelope(&self, mut envelope: ManagedEnvelope) -> () { let scoping = envelope.scoping(); let session = self .event_attachments diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index dd33662538..77661aedf9 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -79,7 +79,7 @@ use relay_threading::AsyncPool; #[cfg(feature = "processing")] use { crate::services::objectstore::Objectstore, - crate::services::store::{Store, StoreEnvelope}, + crate::services::store::Store, crate::utils::Enforcement, itertools::Itertools, relay_cardinality::{ @@ -2064,30 +2064,18 @@ impl EnvelopeProcessorService { use crate::processing::StoreHandle; let objectstore = self.inner.addrs.objectstore.as_ref(); + let global_config = &self.inner.global_config.current(); + let handle = StoreHandle::new(store_forwarder, objectstore, global_config); + match submit { - Submit::Envelope(envelope) => { - let envelope_has_attachments = envelope - .envelope() - .items() - .any(|item| *item.ty() == ItemType::Attachment); - // Whether Relay will store this attachment in objectstore or use kafka like before. - let use_objectstore = || { - let options = &self.inner.global_config.current().options; - utils::sample(options.objectstore_attachments_sample_rate).is_keep() - }; - - if let Some(objectstore) = &self.inner.addrs.objectstore - && envelope_has_attachments - && use_objectstore() - { - // the `ObjectstoreService` will upload all attachments, and then forward the envelope to the `StoreService`. - objectstore.send(StoreEnvelope { envelope }) - } else { - store_forwarder.send(StoreEnvelope { envelope }) - } - } + // Once check-ins and errors are fully moved to the new pipeline, this is only + // used for metrics forwarding. + // + // Metrics forwarding will n_never_ forward an envelope in processing, making + // this branch here unused. + Submit::Envelope(envelope) => handle.send_envelope(envelope.into_inner()), Submit::Output { output, ctx } => output - .forward_store(StoreHandle::new(store_forwarder, objectstore), ctx) + .forward_store(handle, ctx) .unwrap_or_else(|err| err.into_inner()), } return; diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 0e94ad5ab9..eef99030fc 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -34,12 +34,11 @@ use relay_system::{Addr, FromMessage, Interface, NoResponse, Service}; use relay_threading::AsyncPool; use crate::envelope::{AttachmentType, ContentType, Item, ItemType}; -use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, TypedEnvelope}; +use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities}; use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes}; use crate::service::ServiceError; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; -use crate::services::processor::Processed; use crate::statsd::{RelayCounters, RelayGauges, RelayTimers}; use crate::utils::{self, FormDataIter}; @@ -94,7 +93,7 @@ impl Producer { /// Publishes an [`Envelope`](crate::envelope::Envelope) to the Sentry core application through Kafka topics. #[derive(Debug)] pub struct StoreEnvelope { - pub envelope: TypedEnvelope, + pub envelope: ManagedEnvelope, } /// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.