diff --git a/relay-server/src/processing/attachments/mod.rs b/relay-server/src/processing/attachments/mod.rs index d6d20a81b0..c62de87811 100644 --- a/relay-server/src/processing/attachments/mod.rs +++ b/relay-server/src/processing/attachments/mod.rs @@ -60,14 +60,11 @@ impl AttachmentProcessor { } impl processing::Processor for AttachmentProcessor { - type UnitOfWork = SerializedAttachments; + type Input = SerializedAttachments; type Output = AttachmentsOutput; type Error = Error; - fn prepare_envelope( - &self, - envelope: &mut ManagedEnvelope, - ) -> Option> { + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { debug_assert!( !envelope.envelope().items().any(Item::creates_event), "AttachmentProcessor should not receive items that create events" @@ -91,7 +88,7 @@ impl processing::Processor for AttachmentProcessor { async fn process( &self, - attachments: Managed, + attachments: Managed, ctx: processing::Context<'_>, ) -> Result, Rejected> { for item in &attachments.attachments { diff --git a/relay-server/src/processing/check_ins/mod.rs b/relay-server/src/processing/check_ins/mod.rs index 9940e1a10d..e7a5057bbd 100644 --- a/relay-server/src/processing/check_ins/mod.rs +++ b/relay-server/src/processing/check_ins/mod.rs @@ -59,14 +59,11 @@ impl CheckInsProcessor { } impl processing::Processor for CheckInsProcessor { - type UnitOfWork = SerializedCheckIns; + type Input = SerializedCheckIns; type Output = CheckInsOutput; type Error = Error; - fn prepare_envelope( - &self, - envelope: &mut ManagedEnvelope, - ) -> Option> { + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let check_ins = envelope @@ -80,7 +77,7 @@ impl processing::Processor for CheckInsProcessor { async fn process( &self, - mut check_ins: Managed, + mut check_ins: Managed, ctx: Context<'_>, ) -> Result, Rejected> { if ctx.is_processing() { diff --git a/relay-server/src/processing/client_reports/mod.rs b/relay-server/src/processing/client_reports/mod.rs index 250bb3748a..e84162bb57 100644 --- a/relay-server/src/processing/client_reports/mod.rs +++ b/relay-server/src/processing/client_reports/mod.rs @@ -64,14 +64,11 @@ impl ClientReportsProcessor { } impl processing::Processor for ClientReportsProcessor { - type UnitOfWork = SerializedClientReports; + type Input = SerializedClientReports; type Output = Nothing; type Error = Error; - fn prepare_envelope( - &self, - envelope: &mut ManagedEnvelope, - ) -> Option> { + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let reports = envelope .envelope_mut() @@ -87,7 +84,7 @@ impl processing::Processor for ClientReportsProcessor { async fn process( &self, - reports: Managed, + reports: Managed, ctx: Context<'_>, ) -> Result, Rejected> { let mut outcomes = process::expand(reports); diff --git a/relay-server/src/processing/errors/mod.rs b/relay-server/src/processing/errors/mod.rs index 9763c0157e..c9abfa7a7f 100644 --- a/relay-server/src/processing/errors/mod.rs +++ b/relay-server/src/processing/errors/mod.rs @@ -72,14 +72,11 @@ impl ErrorsProcessor { } impl processing::Processor for ErrorsProcessor { - type UnitOfWork = SerializedError; + type Input = SerializedError; type Output = ErrorOutput; type Error = Error; - fn prepare_envelope( - &self, - envelope: &mut ManagedEnvelope, - ) -> Option> { + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let has_transaction = envelope .envelope() .items() @@ -107,7 +104,7 @@ impl processing::Processor for ErrorsProcessor { async fn process( &self, - error: Managed, + error: Managed, ctx: Context<'_>, ) -> Result, Rejected> { let mut error = process::expand(error, ctx)?; diff --git a/relay-server/src/processing/logs/mod.rs b/relay-server/src/processing/logs/mod.rs index dadc60d7ac..b8124f9e17 100644 --- a/relay-server/src/processing/logs/mod.rs +++ b/relay-server/src/processing/logs/mod.rs @@ -100,14 +100,11 @@ impl LogsProcessor { } impl processing::Processor for LogsProcessor { - type UnitOfWork = SerializedLogs; + type Input = SerializedLogs; type Output = LogOutput; type Error = Error; - fn prepare_envelope( - &self, - envelope: &mut ManagedEnvelope, - ) -> Option> { + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let logs = envelope @@ -135,7 +132,7 @@ impl processing::Processor for LogsProcessor { async fn process( &self, - logs: Managed, + logs: Managed, ctx: Context<'_>, ) -> Result, Rejected> { validate::container(&logs).reject(&logs)?; diff --git a/relay-server/src/processing/mod.rs b/relay-server/src/processing/mod.rs index 44d2356a7d..295f38eeee 100644 --- a/relay-server/src/processing/mod.rs +++ b/relay-server/src/processing/mod.rs @@ -46,25 +46,24 @@ pub mod utils; /// defines all items in an event based envelope to relate to the envelope. pub trait Processor { /// A unit of work, the processor can process. - type UnitOfWork: Counted; - /// The result after processing a [`Self::UnitOfWork`]. + type Input: Counted; + /// The result after processing a [`Self::Input`]. type Output: Forward; /// The error returned by [`Self::process`]. type Error: std::error::Error + 'static; - /// Extracts a [`Self::UnitOfWork`] from a [`ManagedEnvelope`]. + /// Extracts a [`Self::Input`] from a [`ManagedEnvelope`]. /// /// This is infallible, if a processor wants to report an error, - /// it should return a [`Self::UnitOfWork`] which later, can produce an error when being processed. + /// it should return a [`Self::Input`] which later, can produce an error when being processed. /// /// Returns `None` if nothing in the envelope concerns this processor. - fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) - -> Option>; + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option>; - /// Processes a [`Self::UnitOfWork`]. + /// Processes a [`Self::Input`]. async fn process( &self, - work: Managed, + work: Managed, ctx: Context<'_>, ) -> Result, Rejected>; } diff --git a/relay-server/src/processing/profile_chunks/mod.rs b/relay-server/src/processing/profile_chunks/mod.rs index 61ef0f4667..aa9ce5aa20 100644 --- a/relay-server/src/processing/profile_chunks/mod.rs +++ b/relay-server/src/processing/profile_chunks/mod.rs @@ -72,14 +72,11 @@ impl ProfileChunksProcessor { } impl processing::Processor for ProfileChunksProcessor { - type UnitOfWork = SerializedProfileChunks; + type Input = SerializedProfileChunks; type Output = ProfileChunkOutput; type Error = Error; - fn prepare_envelope( - &self, - envelope: &mut ManagedEnvelope, - ) -> Option> { + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let profile_chunks = envelope .envelope_mut() .take_items_by(|item| matches!(*item.ty(), ItemType::ProfileChunk)) @@ -100,7 +97,7 @@ impl processing::Processor for ProfileChunksProcessor { async fn process( &self, - mut profile_chunks: Managed, + mut profile_chunks: Managed, ctx: Context<'_>, ) -> Result, Rejected> { filter::feature_flag(ctx).reject(&profile_chunks)?; diff --git a/relay-server/src/processing/replays/mod.rs b/relay-server/src/processing/replays/mod.rs index bf1e8b23f4..d017b1d647 100644 --- a/relay-server/src/processing/replays/mod.rs +++ b/relay-server/src/processing/replays/mod.rs @@ -156,14 +156,11 @@ impl ReplaysProcessor { } impl processing::Processor for ReplaysProcessor { - type UnitOfWork = SerializedReplays; + type Input = SerializedReplays; type Output = ReplaysOutput; type Error = Error; - fn prepare_envelope( - &self, - envelope: &mut ManagedEnvelope, - ) -> Option> { + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let events = envelope .envelope_mut() @@ -194,7 +191,7 @@ impl processing::Processor for ReplaysProcessor { async fn process( &self, - replays: Managed, + replays: Managed, ctx: Context<'_>, ) -> Result, Rejected> { let replays = filter::feature_flag(replays, ctx)?; diff --git a/relay-server/src/processing/sessions/mod.rs b/relay-server/src/processing/sessions/mod.rs index 9e3614b657..b40fe0384e 100644 --- a/relay-server/src/processing/sessions/mod.rs +++ b/relay-server/src/processing/sessions/mod.rs @@ -56,14 +56,11 @@ impl SessionsProcessor { } impl processing::Processor for SessionsProcessor { - type UnitOfWork = SerializedSessions; + type Input = SerializedSessions; type Output = SessionsOutput; type Error = Error; - fn prepare_envelope( - &self, - envelope: &mut ManagedEnvelope, - ) -> Option> { + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let updates = envelope @@ -86,7 +83,7 @@ impl processing::Processor for SessionsProcessor { async fn process( &self, - sessions: Managed, + sessions: Managed, ctx: Context<'_>, ) -> Result, Rejected> { let mut sessions = match process::expand(sessions, ctx) { diff --git a/relay-server/src/processing/spans/mod.rs b/relay-server/src/processing/spans/mod.rs index aab89b904a..b27211c9c9 100644 --- a/relay-server/src/processing/spans/mod.rs +++ b/relay-server/src/processing/spans/mod.rs @@ -113,14 +113,11 @@ impl SpansProcessor { } impl processing::Processor for SpansProcessor { - type UnitOfWork = SerializedSpans; + type Input = SerializedSpans; type Output = SpanOutput; type Error = Error; - fn prepare_envelope( - &self, - envelope: &mut ManagedEnvelope, - ) -> Option> { + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let spans = envelope @@ -155,7 +152,7 @@ impl processing::Processor for SpansProcessor { async fn process( &self, - spans: Managed, + spans: Managed, ctx: Context<'_>, ) -> Result, Rejected> { let spans = filter::feature_flag_attachment(spans, ctx); diff --git a/relay-server/src/processing/trace_attachments/mod.rs b/relay-server/src/processing/trace_attachments/mod.rs index 631480a027..11bdf734ee 100644 --- a/relay-server/src/processing/trace_attachments/mod.rs +++ b/relay-server/src/processing/trace_attachments/mod.rs @@ -92,16 +92,13 @@ impl TraceAttachmentsProcessor { } impl Processor for TraceAttachmentsProcessor { - type UnitOfWork = SerializedAttachments; + type Input = SerializedAttachments; type Output = Managed; type Error = Error; - fn prepare_envelope( - &self, - envelope: &mut ManagedEnvelope, - ) -> Option> { + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let items = envelope .envelope_mut() @@ -115,7 +112,7 @@ impl Processor for TraceAttachmentsProcessor { async fn process( &self, - work: Managed, + work: Managed, ctx: Context<'_>, ) -> Result, Rejected> { let work = filter::feature_flag(work, ctx)?; diff --git a/relay-server/src/processing/trace_metrics/mod.rs b/relay-server/src/processing/trace_metrics/mod.rs index 1a1fbbeb6a..d54cdf553d 100644 --- a/relay-server/src/processing/trace_metrics/mod.rs +++ b/relay-server/src/processing/trace_metrics/mod.rs @@ -102,14 +102,11 @@ impl TraceMetricsProcessor { } impl processing::Processor for TraceMetricsProcessor { - type UnitOfWork = SerializedTraceMetrics; + type Input = SerializedTraceMetrics; type Output = TraceMetricOutput; type Error = Error; - fn prepare_envelope( - &self, - envelope: &mut ManagedEnvelope, - ) -> Option> { + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let metrics = envelope @@ -127,7 +124,7 @@ impl processing::Processor for TraceMetricsProcessor { async fn process( &self, - metrics: Managed, + metrics: Managed, ctx: Context<'_>, ) -> Result, Rejected> { validate::container(&metrics)?; diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index 57a386a9ea..a460e6eb80 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -89,14 +89,11 @@ impl TransactionProcessor { } impl Processor for TransactionProcessor { - type UnitOfWork = SerializedTransaction; + type Input = SerializedTransaction; type Output = TransactionOutput; type Error = Error; - fn prepare_envelope( - &self, - envelope: &mut ManagedEnvelope, - ) -> Option> { + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let mut event = envelope @@ -128,32 +125,32 @@ impl Processor for TransactionProcessor { async fn process( &self, - work: Managed, + tx: Managed, mut ctx: Context<'_>, ) -> Result, Rejected> { - let project_id = work.scoping().project_id; + let project_id = tx.scoping().project_id; let mut metrics = Metrics::default(); relay_log::trace!("Expand transaction"); - let mut work = process::expand(work)?; + let mut tx = process::expand(tx)?; relay_log::trace!("Prepare transaction data"); - process::prepare_data(&mut work, &mut ctx, &mut metrics)?; + process::prepare_data(&mut tx, &mut ctx, &mut metrics)?; relay_log::trace!("Normalize transaction"); - let mut work = process::normalize(work, ctx, &self.geoip_lookup)?; + let mut tx = process::normalize(tx, ctx, &self.geoip_lookup)?; relay_log::trace!("Filter transaction"); - let filters_status = process::run_inbound_filters(&work, ctx)?; + let filters_status = process::run_inbound_filters(&tx, ctx)?; let quotas_client = self.quotas_client.as_ref(); relay_log::trace!("Processing profile"); - process::process_profile(&mut work, ctx); + process::process_profile(&mut tx, ctx); relay_log::trace!("Sample transaction"); - let (work, server_sample_rate) = - match process::run_dynamic_sampling(work, ctx, filters_status, quotas_client).await? { + let (tx, server_sample_rate) = + match process::run_dynamic_sampling(tx, ctx, filters_status, quotas_client).await? { SamplingOutput::Keep { payload, sample_rate, @@ -176,27 +173,27 @@ impl Processor for TransactionProcessor { // Need to scrub the transaction before extracting spans. relay_log::trace!("Scrubbing transaction"); #[allow(unused_mut)] - let mut work = process::scrub(work, ctx)?; + let mut tx = process::scrub(tx, ctx)?; - work = process::extract_spans(work, ctx, server_sample_rate); + tx = process::extract_spans(tx, ctx, server_sample_rate); relay_log::trace!("Enforce quotas"); - let work = self.limiter.enforce_quotas(work, ctx).await?; - let work = match work.transpose() { - either::Either::Left(work) => work, + let tx = self.limiter.enforce_quotas(tx, ctx).await?; + let tx = match tx.transpose() { + either::Either::Left(tx) => tx, either::Either::Right(metrics) => return Ok(Output::metrics(metrics)), }; if ctx.is_processing() { - if !work.flags.fully_normalized { + if !tx.flags.fully_normalized { relay_log::error!( tags.project = %project_id, - tags.ty = event_type(&work.event).map(|e| e.to_string()).unwrap_or("none".to_owned()), + tags.ty = event_type(&tx.event).map(|e| e.to_string()).unwrap_or("none".to_owned()), "ingested event without normalizing" ); }; - let (indexed, metrics) = split_indexed_and_total(work, ctx, SamplingDecision::Keep)?; + let (indexed, metrics) = split_indexed_and_total(tx, ctx, SamplingDecision::Keep)?; return Ok(Output { main: Some(TransactionOutput::Indexed(indexed)), @@ -205,7 +202,7 @@ impl Processor for TransactionProcessor { } Ok(Output { - main: Some(TransactionOutput::Full(work)), + main: Some(TransactionOutput::Full(tx)), metrics: None, }) }