From 91ba01ac25d162852e4a91bb94b7bef9f3c5f016 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Tue, 4 Nov 2025 14:46:28 +0100 Subject: [PATCH 01/15] Initial changes --- .../src/processing/client_reports/mod.rs | 141 ++++++ .../src/processing/client_reports/process.rs | 217 +++++++++ relay-server/src/processing/common.rs | 2 + relay-server/src/processing/mod.rs | 1 + relay-server/src/services/processor.rs | 40 +- relay-server/src/services/processor/report.rs | 415 +++++------------- 6 files changed, 478 insertions(+), 338 deletions(-) create mode 100644 relay-server/src/processing/client_reports/mod.rs create mode 100644 relay-server/src/processing/client_reports/process.rs diff --git a/relay-server/src/processing/client_reports/mod.rs b/relay-server/src/processing/client_reports/mod.rs new file mode 100644 index 00000000000..5f3d5a204aa --- /dev/null +++ b/relay-server/src/processing/client_reports/mod.rs @@ -0,0 +1,141 @@ +use std::sync::Arc; + +use relay_quotas::RateLimits; + +use crate::envelope::{EnvelopeHeaders, Item, ItemType}; +use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected}; +use crate::processing::{ + self, Context, CountRateLimited, Forward, ForwardContext, Output, QuotaRateLimiter, +}; +use crate::services::outcome::Outcome; + +mod process; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// The client-reports are rate limited. + #[error("rate limited")] + RateLimited(RateLimits), +} + +impl OutcomeError for Error { + type Error = Self; + + fn consume(self) -> (Option, Self::Error) { + let outcome = match &self { + Self::RateLimited(limits) => { + let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone()); + Some(Outcome::RateLimited(reason_code)) + } + }; + (outcome, self) + } +} + +impl From for Error { + fn from(value: RateLimits) -> Self { + Self::RateLimited(value) + } +} + +/// A processor for Client-Reports. 
+pub struct ClientReportsProcessor { + limiter: Arc, +} + +impl ClientReportsProcessor { + /// Creates a new [`Self`]. + pub fn new(limiter: Arc) -> Self { + Self { limiter } + } +} + +impl processing::Processor for ClientReportsProcessor { + type UnitOfWork = SerializedClientReport; + type Output = ClientReportOutput; + type Error = Error; + + fn prepare_envelope( + &self, + envelope: &mut ManagedEnvelope, + ) -> Option> { + let headers = envelope.envelope().headers().clone(); + + let client_reports = envelope + .envelope_mut() + .take_items_by(|item| matches!(*item.ty(), ItemType::ClientReport)) + .into_vec(); + + let work = SerializedClientReport { + headers, + client_reports, + }; + Some(Managed::from_envelope(envelope, work)) + } + + async fn process( + &self, + mut client_reports: Managed, + ctx: Context<'_>, + ) -> Result, Rejected> { + // FIXME: Decide if we want to make the TrackedOutcomes the output of this processor. + let outcomes = + process::process_client_reports(&mut client_reports, ctx.config, ctx.project_info); + + // FIXME: Are there even quotas on a client_report (the old code did check quotas but seems strange) + self.limiter + .enforce_quotas(&mut client_reports, ctx) + .await?; + + // FIXME: Looking at the 'old' processing code seems like we might still need to emit some + // metrics here + Ok(Output::just(ClientReportOutput(client_reports))) + } +} + +// FIXME: The correct output might actually be the TrackedOutcomes that we generate +/// Output produced by the [`ClientReportsProcessor`]. 
+#[derive(Debug)] +pub struct ClientReportOutput(Managed); + +impl Forward for ClientReportOutput { + fn serialize_envelope( + self, + ctx: ForwardContext<'_>, + ) -> Result>, Rejected<()>> { + // FIXME: Understand what should happen here + todo!() + } + + #[cfg(feature = "processing")] + fn forward_store( + self, + s: &relay_system::Addr, + ctx: ForwardContext<'_>, + ) -> Result<(), Rejected<()>> { + // FIXME: Understand what should happen here + todo!() + } +} + +/// Client-Reports in their serialized state, as transported in an envelope. +#[derive(Debug)] +pub struct SerializedClientReport { + /// Original envelope headers. + headers: EnvelopeHeaders, + + /// A list of client-reports waiting to be processed. + /// + /// All items contained here must be client-reports. + client_reports: Vec, +} + +impl Counted for SerializedClientReport { + fn quantities(&self) -> Quantities { + smallvec::smallvec![] + } +} + +impl CountRateLimited for Managed { + type Error = Error; +} diff --git a/relay-server/src/processing/client_reports/process.rs b/relay-server/src/processing/client_reports/process.rs new file mode 100644 index 00000000000..76e791cab50 --- /dev/null +++ b/relay-server/src/processing/client_reports/process.rs @@ -0,0 +1,217 @@ +use std::collections::BTreeMap; +use std::error::Error; + +use chrono::{Duration as SignedDuration, Utc}; +use relay_common::time::UnixTimestamp; +use relay_config::Config; +use relay_event_normalization::ClockDriftProcessor; +use relay_event_schema::protocol::ClientReport; +use relay_filter::FilterStatKey; +use relay_quotas::ReasonCode; +use relay_sampling::evaluation::MatchedRuleIds; + +use crate::constants::DEFAULT_EVENT_RETENTION; +use crate::managed::Managed; +use crate::processing::client_reports::SerializedClientReport; +use crate::services::outcome::{Outcome, RuleCategories, TrackOutcome}; +use crate::services::processor::MINIMUM_CLOCK_DRIFT; +use crate::services::projects::project::ProjectInfo; + +use 
crate::processing::client_reports; + +/// Fields of client reports that map to specific [`Outcome`]s without content. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum ClientReportField { + /// The event has been filtered by an inbound data filter. + Filtered, + + /// The event has been filtered by a sampling rule. + FilteredSampling, + + /// The event has been rate limited. + RateLimited, + + /// The event has already been discarded on the client side. + ClientDiscard, +} + +/// Validates and extracts client reports. +/// +/// At the moment client reports are primarily used to transfer outcomes from +/// client SDKs. The outcomes are removed here and sent directly to the outcomes +/// system. +pub fn process_client_reports( + client_reports: &mut Managed, + config: &Config, + project_info: &ProjectInfo, +) -> Vec { + // if client outcomes are disabled we leave the client reports unprocessed + // and pass them on. + if !config.emit_outcomes().any() || !config.emit_client_outcomes() { + // if a processing relay has client outcomes disabled we drop them. + if config.processing_enabled() { + // FIXME: Understand how to best drop them here silently + todo!("Drop all the items silently"); + } + return vec![]; + } + + let mut timestamp = None; + let mut output_events = BTreeMap::new(); + let received = client_reports.received_at(); + + let clock_drift_processor = + ClockDriftProcessor::new(client_reports.headers.sent_at(), received) + .at_least(MINIMUM_CLOCK_DRIFT); + + // we're going through all client reports but we're effectively just merging + // them into the first one. 
+ client_reports.retain( + |client_reports| &mut client_reports.client_reports, + |item, _| { + match ClientReport::parse(&item.payload()) { + Ok(ClientReport { + timestamp: report_timestamp, + discarded_events, + rate_limited_events, + filtered_events, + filtered_sampling_events, + }) => { + // Glue all discarded events together and give them the appropriate outcome type + let input_events = discarded_events + .into_iter() + .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event)) + .chain( + filtered_events.into_iter().map(|discarded_event| { + (ClientReportField::Filtered, discarded_event) + }), + ) + .chain(filtered_sampling_events.into_iter().map(|discarded_event| { + (ClientReportField::FilteredSampling, discarded_event) + })) + .chain(rate_limited_events.into_iter().map(|discarded_event| { + (ClientReportField::RateLimited, discarded_event) + })); + + for (outcome_type, discarded_event) in input_events { + if discarded_event.reason.len() > 200 { + relay_log::trace!("ignored client outcome with an overlong reason"); + continue; + } + *output_events + .entry(( + outcome_type, + discarded_event.reason, + discarded_event.category, + )) + .or_insert(0) += discarded_event.quantity; + } + if let Some(ts) = report_timestamp { + timestamp.get_or_insert(ts); + } + } + Err(err) => { + relay_log::trace!(error = &err as &dyn Error, "invalid client report received") + } + } + // FIXME: Understand what the equivalent of just dropping them here silently would be. 
+ Ok::<_, client_reports::Error>(()) + }, + ); + + if output_events.is_empty() { + return vec![]; + } + + let timestamp = + timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64)); + + if clock_drift_processor.is_drifted() { + relay_log::trace!("applying clock drift correction to client report"); + clock_drift_processor.process_timestamp(timestamp); + } + + let retention_days = project_info + .config() + .event_retention + .unwrap_or(DEFAULT_EVENT_RETENTION); + let max_age = SignedDuration::days(retention_days.into()); + // also if we unable to parse the timestamp, we assume it's way too old here. + let in_past = timestamp + .as_datetime() + .map(|ts| (received - ts) > max_age) + .unwrap_or(true); + if in_past { + relay_log::trace!( + "skipping client outcomes older than {} days", + max_age.num_days() + ); + return vec![]; + } + + let max_future = SignedDuration::seconds(config.max_secs_in_future()); + // also if we unable to parse the timestamp, we assume it's way far in the future here. + let in_future = timestamp + .as_datetime() + .map(|ts| (ts - received) > max_future) + .unwrap_or(true); + if in_future { + relay_log::trace!( + "skipping client outcomes more than {}s in the future", + max_future.num_seconds() + ); + return vec![]; + } + + // FIXME: This can be done more elegantly + let mut outcome_collection: Vec = vec![]; + for ((outcome_type, reason, category), quantity) in output_events.into_iter() { + let outcome = match outcome_from_parts(outcome_type, &reason) { + Ok(outcome) => outcome, + Err(_) => { + relay_log::trace!(?outcome_type, reason, "invalid outcome combination"); + continue; + } + }; + + outcome_collection.push(TrackOutcome { + // If we get to this point, the unwrap should not be used anymore, since we know by + // now that the timestamp can be parsed, but just incase we fallback to UTC current + // `DateTime`. 
+ timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now), + scoping: client_reports.scoping(), + outcome, + event_id: None, + remote_addr: None, // omitting the client address allows for better aggregation + category, + quantity, + }); + } + + outcome_collection +} + +/// Parse an outcome from an outcome ID and a reason string. +/// +/// Currently only used to reconstruct outcomes encoded in client reports. +fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result { + match field { + ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") { + Some(rule_ids) => MatchedRuleIds::parse(rule_ids) + .map(RuleCategories::from) + .map(Outcome::FilteredSampling) + .map_err(|_| ()), + None => Err(()), + }, + ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())), + ClientReportField::Filtered => Ok(Outcome::Filtered( + FilterStatKey::try_from(reason).map_err(|_| ())?, + )), + ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason { + "" => None, + other => Some(ReasonCode::new(other)), + })), + } +} + +// FIXME: Move the test over and adapt them. 
diff --git a/relay-server/src/processing/common.rs b/relay-server/src/processing/common.rs index 868346a39b0..1e62ee2751d 100644 --- a/relay-server/src/processing/common.rs +++ b/relay-server/src/processing/common.rs @@ -2,6 +2,7 @@ use crate::Envelope; use crate::managed::{Managed, Rejected}; use crate::processing::ForwardContext; use crate::processing::check_ins::CheckInsProcessor; +use crate::processing::client_reports::ClientReportsProcessor; use crate::processing::logs::LogsProcessor; use crate::processing::sessions::SessionsProcessor; use crate::processing::spans::SpansProcessor; @@ -57,4 +58,5 @@ outputs!( TraceMetrics => TraceMetricsProcessor, Spans => SpansProcessor, Sessions => SessionsProcessor, + ClientReports => ClientReportsProcessor, ); diff --git a/relay-server/src/processing/mod.rs b/relay-server/src/processing/mod.rs index ae05848688d..0db763179eb 100644 --- a/relay-server/src/processing/mod.rs +++ b/relay-server/src/processing/mod.rs @@ -23,6 +23,7 @@ pub use self::forward::*; pub use self::limits::*; pub mod check_ins; +pub mod client_reports; pub mod logs; pub mod sessions; pub mod spans; diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 3656eb16a1f..41258cfcedf 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -48,6 +48,7 @@ use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket}; use crate::metrics_extraction::transactions::types::ExtractMetricsError; use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor}; use crate::processing::check_ins::CheckInsProcessor; +use crate::processing::client_reports::ClientReportsProcessor; use crate::processing::logs::LogsProcessor; use crate::processing::sessions::SessionsProcessor; use crate::processing::spans::SpansProcessor; @@ -1145,6 +1146,7 @@ struct Processing { spans: SpansProcessor, check_ins: CheckInsProcessor, sessions: SessionsProcessor, + 
client_report: ClientReportsProcessor, } impl EnvelopeProcessorService { @@ -1231,7 +1233,8 @@ impl EnvelopeProcessorService { trace_metrics: TraceMetricsProcessor::new(Arc::clone("a_limiter)), spans: SpansProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()), check_ins: CheckInsProcessor::new(Arc::clone("a_limiter)), - sessions: SessionsProcessor::new(quota_limiter), + sessions: SessionsProcessor::new(Arc::clone("a_limiter)), + client_report: ClientReportsProcessor::new(quota_limiter), }, geoip_lookup, config, @@ -1823,32 +1826,6 @@ impl EnvelopeProcessorService { Ok(Some(extracted_metrics)) } - /// Processes user and client reports. - async fn process_client_reports( - &self, - managed_envelope: &mut TypedEnvelope, - ctx: processing::Context<'_>, - ) -> Result, ProcessingError> { - let mut extracted_metrics = ProcessingExtractedMetrics::new(); - - self.enforce_quotas( - managed_envelope, - Annotated::empty(), - &mut extracted_metrics, - ctx, - ) - .await?; - - report::process_client_reports( - managed_envelope, - ctx.config, - ctx.project_info, - self.inner.addrs.outcome_aggregator.clone(), - ); - - Ok(Some(extracted_metrics)) - } - /// Processes replays. async fn process_replays( &self, @@ -2046,7 +2023,14 @@ impl EnvelopeProcessorService { .await } ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx), - ProcessingGroup::ClientReport => run!(process_client_reports, ctx), + ProcessingGroup::ClientReport => { + self.process_with_processor( + &self.inner.processing.client_report, + managed_envelope, + ctx, + ) + .await + } ProcessingGroup::Replay => { run!(process_replays, ctx) } diff --git a/relay-server/src/services/processor/report.rs b/relay-server/src/services/processor/report.rs index ad5b7f6dc7d..0d368fc9ba0 100644 --- a/relay-server/src/services/processor/report.rs +++ b/relay-server/src/services/processor/report.rs @@ -1,195 +1,12 @@ //! Contains code related to validation and normalization of the user and client reports. 
-use std::collections::BTreeMap; use std::error::Error; -use chrono::{Duration as SignedDuration, Utc}; -use relay_common::time::UnixTimestamp; -use relay_config::Config; -use relay_event_normalization::ClockDriftProcessor; -use relay_event_schema::protocol::{ClientReport, UserReport}; -use relay_filter::FilterStatKey; -use relay_quotas::ReasonCode; -use relay_sampling::evaluation::MatchedRuleIds; -use relay_system::Addr; - -use crate::constants::DEFAULT_EVENT_RETENTION; +use relay_event_schema::protocol::UserReport; + use crate::envelope::{ContentType, ItemType}; use crate::managed::{ItemAction, TypedEnvelope}; -use crate::services::outcome::{DiscardReason, Outcome, RuleCategories, TrackOutcome}; -use crate::services::processor::{ClientReportGroup, MINIMUM_CLOCK_DRIFT}; -use crate::services::projects::project::ProjectInfo; - -/// Fields of client reports that map to specific [`Outcome`]s without content. -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub enum ClientReportField { - /// The event has been filtered by an inbound data filter. - Filtered, - - /// The event has been filtered by a sampling rule. - FilteredSampling, - - /// The event has been rate limited. - RateLimited, - - /// The event has already been discarded on the client side. - ClientDiscard, -} - -/// Validates and extracts client reports. -/// -/// At the moment client reports are primarily used to transfer outcomes from -/// client SDKs. The outcomes are removed here and sent directly to the outcomes -/// system. -pub fn process_client_reports( - managed_envelope: &mut TypedEnvelope, - config: &Config, - project_info: &ProjectInfo, - outcome_aggregator: Addr, -) { - // if client outcomes are disabled we leave the client reports unprocessed - // and pass them on. - if !config.emit_outcomes().any() || !config.emit_client_outcomes() { - // if a processing relay has client outcomes disabled we drop them. 
- if config.processing_enabled() { - managed_envelope.retain_items(|item| match item.ty() { - ItemType::ClientReport => ItemAction::DropSilently, - _ => ItemAction::Keep, - }); - } - return; - } - - let mut timestamp = None; - let mut output_events = BTreeMap::new(); - let received = managed_envelope.received_at(); - - let clock_drift_processor = - ClockDriftProcessor::new(managed_envelope.envelope().sent_at(), received) - .at_least(MINIMUM_CLOCK_DRIFT); - - // we're going through all client reports but we're effectively just merging - // them into the first one. - managed_envelope.retain_items(|item| { - if item.ty() != &ItemType::ClientReport { - return ItemAction::Keep; - }; - match ClientReport::parse(&item.payload()) { - Ok(ClientReport { - timestamp: report_timestamp, - discarded_events, - rate_limited_events, - filtered_events, - filtered_sampling_events, - }) => { - // Glue all discarded events together and give them the appropriate outcome type - let input_events = - discarded_events - .into_iter() - .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event)) - .chain( - filtered_events.into_iter().map(|discarded_event| { - (ClientReportField::Filtered, discarded_event) - }), - ) - .chain(filtered_sampling_events.into_iter().map(|discarded_event| { - (ClientReportField::FilteredSampling, discarded_event) - })) - .chain(rate_limited_events.into_iter().map(|discarded_event| { - (ClientReportField::RateLimited, discarded_event) - })); - - for (outcome_type, discarded_event) in input_events { - if discarded_event.reason.len() > 200 { - relay_log::trace!("ignored client outcome with an overlong reason"); - continue; - } - *output_events - .entry(( - outcome_type, - discarded_event.reason, - discarded_event.category, - )) - .or_insert(0) += discarded_event.quantity; - } - if let Some(ts) = report_timestamp { - timestamp.get_or_insert(ts); - } - } - Err(err) => { - relay_log::trace!(error = &err as &dyn Error, "invalid client report received") - } 
- } - ItemAction::DropSilently - }); - - if output_events.is_empty() { - return; - } - - let timestamp = - timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64)); - - if clock_drift_processor.is_drifted() { - relay_log::trace!("applying clock drift correction to client report"); - clock_drift_processor.process_timestamp(timestamp); - } - - let retention_days = project_info - .config() - .event_retention - .unwrap_or(DEFAULT_EVENT_RETENTION); - let max_age = SignedDuration::days(retention_days.into()); - // also if we unable to parse the timestamp, we assume it's way too old here. - let in_past = timestamp - .as_datetime() - .map(|ts| (received - ts) > max_age) - .unwrap_or(true); - if in_past { - relay_log::trace!( - "skipping client outcomes older than {} days", - max_age.num_days() - ); - return; - } - - let max_future = SignedDuration::seconds(config.max_secs_in_future()); - // also if we unable to parse the timestamp, we assume it's way far in the future here. - let in_future = timestamp - .as_datetime() - .map(|ts| (ts - received) > max_future) - .unwrap_or(true); - if in_future { - relay_log::trace!( - "skipping client outcomes more than {}s in the future", - max_future.num_seconds() - ); - return; - } - - for ((outcome_type, reason, category), quantity) in output_events.into_iter() { - let outcome = match outcome_from_parts(outcome_type, &reason) { - Ok(outcome) => outcome, - Err(_) => { - relay_log::trace!(?outcome_type, reason, "invalid outcome combination"); - continue; - } - }; - - outcome_aggregator.send(TrackOutcome { - // If we get to this point, the unwrap should not be used anymore, since we know by - // now that the timestamp can be parsed, but just incase we fallback to UTC current - // `DateTime`. 
- timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now), - scoping: managed_envelope.scoping(), - outcome, - event_id: None, - remote_addr: None, // omitting the client address allows for better aggregation - category, - quantity, - }); - } -} +use crate::services::outcome::{DiscardReason, Outcome}; /// Validates and normalizes all user report items in the envelope. /// @@ -241,41 +58,18 @@ fn trim_whitespaces(data: &[u8]) -> &[u8] { &data[from..to + 1] } -/// Parse an outcome from an outcome ID and a reason string. -/// -/// Currently only used to reconstruct outcomes encoded in client reports. -fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result { - match field { - ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") { - Some(rule_ids) => MatchedRuleIds::parse(rule_ids) - .map(RuleCategories::from) - .map(Outcome::FilteredSampling) - .map_err(|_| ()), - None => Err(()), - }, - ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())), - ClientReportField::Filtered => Ok(Outcome::Filtered( - FilterStatKey::try_from(reason).map_err(|_| ())?, - )), - ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason { - "" => None, - other => Some(ReasonCode::new(other)), - })), - } -} - #[cfg(test)] mod tests { use relay_cogs::Token; use relay_config::Config; use relay_event_schema::protocol::EventId; use relay_sampling::evaluation::ReservoirCounters; + use relay_system::Addr; use crate::envelope::{Envelope, Item}; use crate::extractors::RequestMeta; use crate::managed::ManagedEnvelope; use crate::processing; - use crate::services::outcome::RuleCategory; use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit}; use crate::testutils::create_test_processor; @@ -566,106 +360,107 @@ mod tests { assert_eq!(new_envelope.items().next().unwrap().ty(), &ItemType::Event); } - #[test] - fn test_from_outcome_type_sampled() { - 
assert!(outcome_from_parts(ClientReportField::FilteredSampling, "adsf").is_err()); - - assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:").is_err()); - - assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:foo").is_err()); - - assert!(matches!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:"), - Err(()) - )); - - assert!(matches!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:;"), - Err(()) - )); - - assert!(matches!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:ab;12"), - Err(()) - )); - - assert_eq!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123,456"), - Ok(Outcome::FilteredSampling(RuleCategories( - [RuleCategory::Other].into() - ))) - ); - - assert_eq!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"), - Ok(Outcome::FilteredSampling(RuleCategories( - [RuleCategory::Other].into() - ))) - ); - - assert_eq!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"), - Ok(Outcome::FilteredSampling(RuleCategories( - [RuleCategory::Other].into() - ))) - ); - - assert_eq!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:1001"), - Ok(Outcome::FilteredSampling(RuleCategories( - [RuleCategory::BoostEnvironments].into() - ))) - ); - - assert_eq!( - outcome_from_parts( - ClientReportField::FilteredSampling, - "Sampled:1001,1456,1567,3333,4444" - ), - Ok(Outcome::FilteredSampling(RuleCategories( - [ - RuleCategory::BoostEnvironments, - RuleCategory::BoostLowVolumeTransactions, - RuleCategory::BoostLatestReleases, - RuleCategory::Custom - ] - .into() - ))) - ); - } - - #[test] - fn test_from_outcome_type_filtered() { - assert!(matches!( - outcome_from_parts(ClientReportField::Filtered, "error-message"), - Ok(Outcome::Filtered(FilterStatKey::ErrorMessage)) - )); - - assert!(matches!( - outcome_from_parts(ClientReportField::Filtered, "hydration-error"), - 
Ok(Outcome::Filtered(FilterStatKey::GenericFilter(_))) - )); - } - - #[test] - fn test_from_outcome_type_client_discard() { - assert_eq!( - outcome_from_parts(ClientReportField::ClientDiscard, "foo_reason").unwrap(), - Outcome::ClientDiscard("foo_reason".into()) - ); - } - - #[test] - fn test_from_outcome_type_rate_limited() { - assert!(matches!( - outcome_from_parts(ClientReportField::RateLimited, ""), - Ok(Outcome::RateLimited(None)) - )); - assert_eq!( - outcome_from_parts(ClientReportField::RateLimited, "foo_reason").unwrap(), - Outcome::RateLimited(Some(ReasonCode::new("foo_reason"))) - ); - } + // FIXME: Move these and all the other test over. + // #[test] + // fn test_from_outcome_type_sampled() { + // assert!(outcome_from_parts(ClientReportField::FilteredSampling, "adsf").is_err()); + + // assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:").is_err()); + + // assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:foo").is_err()); + + // assert!(matches!( + // outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:"), + // Err(()) + // )); + + // assert!(matches!( + // outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:;"), + // Err(()) + // )); + + // assert!(matches!( + // outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:ab;12"), + // Err(()) + // )); + + // assert_eq!( + // outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123,456"), + // Ok(Outcome::FilteredSampling(RuleCategories( + // [RuleCategory::Other].into() + // ))) + // ); + + // assert_eq!( + // outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"), + // Ok(Outcome::FilteredSampling(RuleCategories( + // [RuleCategory::Other].into() + // ))) + // ); + + // assert_eq!( + // outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"), + // Ok(Outcome::FilteredSampling(RuleCategories( + // [RuleCategory::Other].into() + // ))) + // ); + + // assert_eq!( + // 
outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:1001"), + // Ok(Outcome::FilteredSampling(RuleCategories( + // [RuleCategory::BoostEnvironments].into() + // ))) + // ); + + // assert_eq!( + // outcome_from_parts( + // ClientReportField::FilteredSampling, + // "Sampled:1001,1456,1567,3333,4444" + // ), + // Ok(Outcome::FilteredSampling(RuleCategories( + // [ + // RuleCategory::BoostEnvironments, + // RuleCategory::BoostLowVolumeTransactions, + // RuleCategory::BoostLatestReleases, + // RuleCategory::Custom + // ] + // .into() + // ))) + // ); + // } + + // #[test] + // fn test_from_outcome_type_filtered() { + // assert!(matches!( + // outcome_from_parts(ClientReportField::Filtered, "error-message"), + // Ok(Outcome::Filtered(FilterStatKey::ErrorMessage)) + // )); + + // assert!(matches!( + // outcome_from_parts(ClientReportField::Filtered, "hydration-error"), + // Ok(Outcome::Filtered(FilterStatKey::GenericFilter(_))) + // )); + // } + + // #[test] + // fn test_from_outcome_type_client_discard() { + // assert_eq!( + // outcome_from_parts(ClientReportField::ClientDiscard, "foo_reason").unwrap(), + // Outcome::ClientDiscard("foo_reason".into()) + // ); + // } + + // #[test] + // fn test_from_outcome_type_rate_limited() { + // assert!(matches!( + // outcome_from_parts(ClientReportField::RateLimited, ""), + // Ok(Outcome::RateLimited(None)) + // )); + // assert_eq!( + // outcome_from_parts(ClientReportField::RateLimited, "foo_reason").unwrap(), + // Outcome::RateLimited(Some(ReasonCode::new("foo_reason"))) + // ); + // } #[test] fn test_trim_whitespaces() { From 5c1fb4bd429de57e874a1608f6fcad550d6a8672 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Wed, 5 Nov 2025 14:02:33 +0100 Subject: [PATCH 02/15] move tests --- .../src/processing/client_reports/process.rs | 109 +++++++++++++++++- relay-server/src/services/processor/report.rs | 103 +---------------- 2 files changed, 109 insertions(+), 103 deletions(-) diff --git 
a/relay-server/src/processing/client_reports/process.rs b/relay-server/src/processing/client_reports/process.rs index 76e791cab50..beac0e49e3c 100644 --- a/relay-server/src/processing/client_reports/process.rs +++ b/relay-server/src/processing/client_reports/process.rs @@ -214,4 +214,111 @@ fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result Date: Fri, 7 Nov 2025 09:41:09 +0100 Subject: [PATCH 03/15] change processor logic to never forward reports, but rather always process or drop them --- .../src/processing/client_reports/mod.rs | 61 +++++------- .../src/processing/client_reports/process.rs | 98 ++++++++----------- relay-server/src/processing/common.rs | 2 - relay-server/src/services/processor.rs | 3 +- relay-server/src/services/processor/report.rs | 66 +------------ 5 files changed, 67 insertions(+), 163 deletions(-) diff --git a/relay-server/src/processing/client_reports/mod.rs b/relay-server/src/processing/client_reports/mod.rs index 5f3d5a204aa..2e45c3b30f5 100644 --- a/relay-server/src/processing/client_reports/mod.rs +++ b/relay-server/src/processing/client_reports/mod.rs @@ -1,13 +1,12 @@ use std::sync::Arc; use relay_quotas::RateLimits; +use relay_system::Addr; use crate::envelope::{EnvelopeHeaders, Item, ItemType}; use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected}; -use crate::processing::{ - self, Context, CountRateLimited, Forward, ForwardContext, Output, QuotaRateLimiter, -}; -use crate::services::outcome::Outcome; +use crate::processing::{self, Context, CountRateLimited, Nothing, Output, QuotaRateLimiter}; +use crate::services::outcome::{Outcome, TrackOutcome}; mod process; @@ -41,18 +40,22 @@ impl From for Error { /// A processor for Client-Reports. pub struct ClientReportsProcessor { limiter: Arc, + aggregator: Addr, } impl ClientReportsProcessor { /// Creates a new [`Self`]. 
- pub fn new(limiter: Arc) -> Self { - Self { limiter } + pub fn new(limiter: Arc, aggregator: Addr) -> Self { + Self { + limiter, + aggregator, + } } } impl processing::Processor for ClientReportsProcessor { type UnitOfWork = SerializedClientReport; - type Output = ClientReportOutput; + type Output = Nothing; type Error = Error; fn prepare_envelope( @@ -78,43 +81,22 @@ impl processing::Processor for ClientReportsProcessor { mut client_reports: Managed, ctx: Context<'_>, ) -> Result, Rejected> { - // FIXME: Decide if we want to make the TrackedOutcomes the output of this processor. - let outcomes = - process::process_client_reports(&mut client_reports, ctx.config, ctx.project_info); + process::process_client_reports( + &mut client_reports, + ctx.config, + ctx.project_info, + &self.aggregator, + ); - // FIXME: Are there even quotas on a client_report (the old code did check quotas but seems strange) self.limiter .enforce_quotas(&mut client_reports, ctx) .await?; - // FIXME: Looking at the 'old' processing code seems like we might still need to emit some - // metrics here - Ok(Output::just(ClientReportOutput(client_reports))) - } -} - -// FIXME: The correct output might actually be the TrackedOutcomes that we generate -/// Output produced by the [`ClientReportsProcessor`]. 
-#[derive(Debug)] -pub struct ClientReportOutput(Managed); - -impl Forward for ClientReportOutput { - fn serialize_envelope( - self, - ctx: ForwardContext<'_>, - ) -> Result>, Rejected<()>> { - // FIXME: Understand what should happen here - todo!() - } - - #[cfg(feature = "processing")] - fn forward_store( - self, - s: &relay_system::Addr, - ctx: ForwardContext<'_>, - ) -> Result<(), Rejected<()>> { - // FIXME: Understand what should happen here - todo!() + // FIXME: Not sure if we want to emit some metrics here still + Ok(Output { + main: None, + metrics: None, + }) } } @@ -132,6 +114,7 @@ pub struct SerializedClientReport { impl Counted for SerializedClientReport { fn quantities(&self) -> Quantities { + // TODO: Check the envelope rete_limiter smallvec::smallvec![] } } diff --git a/relay-server/src/processing/client_reports/process.rs b/relay-server/src/processing/client_reports/process.rs index beac0e49e3c..b445ef7c4df 100644 --- a/relay-server/src/processing/client_reports/process.rs +++ b/relay-server/src/processing/client_reports/process.rs @@ -9,6 +9,7 @@ use relay_event_schema::protocol::ClientReport; use relay_filter::FilterStatKey; use relay_quotas::ReasonCode; use relay_sampling::evaluation::MatchedRuleIds; +use relay_system::Addr; use crate::constants::DEFAULT_EVENT_RETENTION; use crate::managed::Managed; @@ -17,8 +18,6 @@ use crate::services::outcome::{Outcome, RuleCategories, TrackOutcome}; use crate::services::processor::MINIMUM_CLOCK_DRIFT; use crate::services::projects::project::ProjectInfo; -use crate::processing::client_reports; - /// Fields of client reports that map to specific [`Outcome`]s without content. #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum ClientReportField { @@ -44,16 +43,13 @@ pub fn process_client_reports( client_reports: &mut Managed, config: &Config, project_info: &ProjectInfo, -) -> Vec { - // if client outcomes are disabled we leave the client reports unprocessed - // and pass them on. 
- if !config.emit_outcomes().any() || !config.emit_client_outcomes() { - // if a processing relay has client outcomes disabled we drop them. - if config.processing_enabled() { - // FIXME: Understand how to best drop them here silently - todo!("Drop all the items silently"); - } - return vec![]; + outcome_aggregator: &Addr, +) { + if (!config.emit_outcomes().any() || !config.emit_client_outcomes()) + && config.processing_enabled() + { + // if a processing relay has client outcomes disabled we drop them without processing. + return; } let mut timestamp = None; @@ -64,21 +60,18 @@ pub fn process_client_reports( ClockDriftProcessor::new(client_reports.headers.sent_at(), received) .at_least(MINIMUM_CLOCK_DRIFT); - // we're going through all client reports but we're effectively just merging - // them into the first one. - client_reports.retain( - |client_reports| &mut client_reports.client_reports, - |item, _| { - match ClientReport::parse(&item.payload()) { - Ok(ClientReport { - timestamp: report_timestamp, - discarded_events, - rate_limited_events, - filtered_events, - filtered_sampling_events, - }) => { - // Glue all discarded events together and give them the appropriate outcome type - let input_events = discarded_events + client_reports.client_reports.iter().for_each(|item| { + match ClientReport::parse(&item.payload()) { + Ok(ClientReport { + timestamp: report_timestamp, + discarded_events, + rate_limited_events, + filtered_events, + filtered_sampling_events, + }) => { + // Glue all discarded events together and give them the appropriate outcome type + let input_events = + discarded_events .into_iter() .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event)) .chain( @@ -93,34 +86,31 @@ pub fn process_client_reports( (ClientReportField::RateLimited, discarded_event) })); - for (outcome_type, discarded_event) in input_events { - if discarded_event.reason.len() > 200 { - relay_log::trace!("ignored client outcome with an overlong reason"); - 
continue; - } - *output_events - .entry(( - outcome_type, - discarded_event.reason, - discarded_event.category, - )) - .or_insert(0) += discarded_event.quantity; - } - if let Some(ts) = report_timestamp { - timestamp.get_or_insert(ts); + for (outcome_type, discarded_event) in input_events { + if discarded_event.reason.len() > 200 { + relay_log::trace!("ignored client outcome with an overlong reason"); + continue; } + *output_events + .entry(( + outcome_type, + discarded_event.reason, + discarded_event.category, + )) + .or_insert(0) += discarded_event.quantity; } - Err(err) => { - relay_log::trace!(error = &err as &dyn Error, "invalid client report received") + if let Some(ts) = report_timestamp { + timestamp.get_or_insert(ts); } } - // FIXME: Understand what the equivalent of just dropping them here silently would be. - Ok::<_, client_reports::Error>(()) - }, - ); + Err(err) => { + relay_log::trace!(error = &err as &dyn Error, "invalid client report received") + } + } + }); if output_events.is_empty() { - return vec![]; + return; } let timestamp = @@ -146,7 +136,7 @@ pub fn process_client_reports( "skipping client outcomes older than {} days", max_age.num_days() ); - return vec![]; + return; } let max_future = SignedDuration::seconds(config.max_secs_in_future()); @@ -160,11 +150,9 @@ pub fn process_client_reports( "skipping client outcomes more than {}s in the future", max_future.num_seconds() ); - return vec![]; + return; } - // FIXME: This can be done more elegantly - let mut outcome_collection: Vec = vec![]; for ((outcome_type, reason, category), quantity) in output_events.into_iter() { let outcome = match outcome_from_parts(outcome_type, &reason) { Ok(outcome) => outcome, @@ -174,7 +162,7 @@ pub fn process_client_reports( } }; - outcome_collection.push(TrackOutcome { + outcome_aggregator.send(TrackOutcome { // If we get to this point, the unwrap should not be used anymore, since we know by // now that the timestamp can be parsed, but just incase we fallback to 
UTC current // `DateTime`. @@ -187,8 +175,6 @@ pub fn process_client_reports( quantity, }); } - - outcome_collection } /// Parse an outcome from an outcome ID and a reason string. diff --git a/relay-server/src/processing/common.rs b/relay-server/src/processing/common.rs index 1e62ee2751d..868346a39b0 100644 --- a/relay-server/src/processing/common.rs +++ b/relay-server/src/processing/common.rs @@ -2,7 +2,6 @@ use crate::Envelope; use crate::managed::{Managed, Rejected}; use crate::processing::ForwardContext; use crate::processing::check_ins::CheckInsProcessor; -use crate::processing::client_reports::ClientReportsProcessor; use crate::processing::logs::LogsProcessor; use crate::processing::sessions::SessionsProcessor; use crate::processing::spans::SpansProcessor; @@ -58,5 +57,4 @@ outputs!( TraceMetrics => TraceMetricsProcessor, Spans => SpansProcessor, Sessions => SessionsProcessor, - ClientReports => ClientReportsProcessor, ); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 41258cfcedf..400731d130b 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1205,6 +1205,7 @@ impl EnvelopeProcessorService { #[cfg(feature = "processing")] let rate_limiter = rate_limiter.map(Arc::new); + let outcome_aggregator = addrs.outcome_aggregator.clone(); let inner = InnerProcessor { pool, global_config, @@ -1234,7 +1235,7 @@ impl EnvelopeProcessorService { spans: SpansProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()), check_ins: CheckInsProcessor::new(Arc::clone("a_limiter)), sessions: SessionsProcessor::new(Arc::clone("a_limiter)), - client_report: ClientReportsProcessor::new(quota_limiter), + client_report: ClientReportsProcessor::new(quota_limiter, outcome_aggregator), }, geoip_lookup, config, diff --git a/relay-server/src/services/processor/report.rs b/relay-server/src/services/processor/report.rs index 691e2ef86ed..6459e4de409 100644 --- 
a/relay-server/src/services/processor/report.rs +++ b/relay-server/src/services/processor/report.rs @@ -69,7 +69,7 @@ mod tests { use crate::envelope::{Envelope, Item}; use crate::extractors::RequestMeta; use crate::managed::ManagedEnvelope; - use crate::processing; + use crate::processing::{self}; use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit}; use crate::testutils::create_test_processor; @@ -135,70 +135,6 @@ mod tests { assert!(envelope.is_none()); } - #[tokio::test] - async fn test_client_report_forwarding() { - relay_test::setup(); - let outcome_aggregator = Addr::dummy(); - - let config = Config::from_json_value(serde_json::json!({ - "outcomes": { - "emit_outcomes": false, - // a relay need to emit outcomes at all to not process. - "emit_client_outcomes": true - } - })) - .unwrap(); - - let processor = create_test_processor(Default::default()).await; - - let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" - .parse() - .unwrap(); - - let request_meta = RequestMeta::new(dsn); - let mut envelope = Envelope::from_request(None, request_meta); - - envelope.add_item({ - let mut item = Item::new(ItemType::ClientReport); - item.set_payload( - ContentType::Json, - r#" - { - "discarded_events": [ - ["queue_full", "error", 42] - ] - } - "#, - ); - item - }); - - let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default()); - assert_eq!(envelopes.len(), 1); - let (group, envelope) = envelopes.pop().unwrap(); - let envelope = ManagedEnvelope::new(envelope, outcome_aggregator); - - let message = ProcessEnvelopeGrouped { - group, - envelope, - ctx: processing::Context { - config: &config, - ..processing::Context::for_test() - }, - reservoir_counters: &ReservoirCounters::default(), - }; - - let Ok(Some(Submit::Envelope(new_envelope))) = - processor.process(&mut Token::noop(), message).await - else { - panic!(); - }; - let item = new_envelope.envelope().items().next().unwrap(); - assert_eq!(item.ty(), 
&ItemType::ClientReport); - - new_envelope.accept(); // do not try to capture or emit outcomes - } - #[tokio::test] #[cfg(feature = "processing")] async fn test_client_report_removal_in_processing() { From 673958ef2bb8b1a9fe95667b2d2372f5ae7a45e7 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 7 Nov 2025 10:36:57 +0100 Subject: [PATCH 04/15] clean up code after checking rate limiting and metric logic --- relay-server/src/processing/client_reports/mod.rs | 7 +------ relay-server/src/processing/mod.rs | 8 ++++++++ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/relay-server/src/processing/client_reports/mod.rs b/relay-server/src/processing/client_reports/mod.rs index 2e45c3b30f5..2f8c1fd4438 100644 --- a/relay-server/src/processing/client_reports/mod.rs +++ b/relay-server/src/processing/client_reports/mod.rs @@ -92,11 +92,7 @@ impl processing::Processor for ClientReportsProcessor { .enforce_quotas(&mut client_reports, ctx) .await?; - // FIXME: Not sure if we want to emit some metrics here still - Ok(Output { - main: None, - metrics: None, - }) + Ok(Output::empty()) } } @@ -114,7 +110,6 @@ pub struct SerializedClientReport { impl Counted for SerializedClientReport { fn quantities(&self) -> Quantities { - // TODO: Check the envelope rete_limiter smallvec::smallvec![] } } diff --git a/relay-server/src/processing/mod.rs b/relay-server/src/processing/mod.rs index 0db763179eb..e4dcb53a6aa 100644 --- a/relay-server/src/processing/mod.rs +++ b/relay-server/src/processing/mod.rs @@ -156,6 +156,14 @@ impl Output { } } + /// Creates an new output with neither main nor metrics. + pub fn empty() -> Self { + Self { + main: None, + metrics: None, + } + } + /// Maps an `Output` to `Output` by applying a function to [`Self::main`]. 
pub fn map(self, f: F) -> Output where From 709bdaa024736b98a7b3fac15130ea50a093b5bc Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 7 Nov 2025 11:18:11 +0100 Subject: [PATCH 05/15] small code cleanup --- .../src/processing/client_reports/process.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/relay-server/src/processing/client_reports/process.rs b/relay-server/src/processing/client_reports/process.rs index b445ef7c4df..283f3c09c9d 100644 --- a/relay-server/src/processing/client_reports/process.rs +++ b/relay-server/src/processing/client_reports/process.rs @@ -60,7 +60,7 @@ pub fn process_client_reports( ClockDriftProcessor::new(client_reports.headers.sent_at(), received) .at_least(MINIMUM_CLOCK_DRIFT); - client_reports.client_reports.iter().for_each(|item| { + for item in &client_reports.client_reports { match ClientReport::parse(&item.payload()) { Ok(ClientReport { timestamp: report_timestamp, @@ -107,7 +107,7 @@ pub fn process_client_reports( relay_log::trace!(error = &err as &dyn Error, "invalid client report received") } } - }); + } if output_events.is_empty() { return; @@ -154,12 +154,9 @@ pub fn process_client_reports( } for ((outcome_type, reason, category), quantity) in output_events.into_iter() { - let outcome = match outcome_from_parts(outcome_type, &reason) { - Ok(outcome) => outcome, - Err(_) => { - relay_log::trace!(?outcome_type, reason, "invalid outcome combination"); - continue; - } + let Ok(outcome) = outcome_from_parts(outcome_type, &reason) else { + relay_log::trace!(?outcome_type, reason, "invalid outcome combination"); + continue; }; outcome_aggregator.send(TrackOutcome { From f8d85afc3cc3cdbd4972ba58c9722e354b6968c7 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Mon, 10 Nov 2025 09:58:49 +0100 Subject: [PATCH 06/15] remove redundant ratelimiting logic --- .../src/processing/client_reports/mod.rs | 52 +++---------------- relay-server/src/services/processor.rs | 2 +- 2 files changed, 8 
insertions(+), 46 deletions(-) diff --git a/relay-server/src/processing/client_reports/mod.rs b/relay-server/src/processing/client_reports/mod.rs index 2f8c1fd4438..46cbe2d5306 100644 --- a/relay-server/src/processing/client_reports/mod.rs +++ b/relay-server/src/processing/client_reports/mod.rs @@ -1,55 +1,25 @@ -use std::sync::Arc; - -use relay_quotas::RateLimits; use relay_system::Addr; use crate::envelope::{EnvelopeHeaders, Item, ItemType}; -use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected}; -use crate::processing::{self, Context, CountRateLimited, Nothing, Output, QuotaRateLimiter}; -use crate::services::outcome::{Outcome, TrackOutcome}; +use crate::managed::{Counted, Managed, ManagedEnvelope, Quantities, Rejected}; +use crate::processing::{self, Context, Nothing, Output}; +use crate::services::outcome::TrackOutcome; mod process; +// TODO: Not sure there is a cleaner way to do this. #[derive(Debug, thiserror::Error)] -pub enum Error { - /// The client-reports are rate limited. - #[error("rate limited")] - RateLimited(RateLimits), -} - -impl OutcomeError for Error { - type Error = Self; - - fn consume(self) -> (Option, Self::Error) { - let outcome = match &self { - Self::RateLimited(limits) => { - let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone()); - Some(Outcome::RateLimited(reason_code)) - } - }; - (outcome, self) - } -} - -impl From for Error { - fn from(value: RateLimits) -> Self { - Self::RateLimited(value) - } -} +pub enum Error {} /// A processor for Client-Reports. pub struct ClientReportsProcessor { - limiter: Arc, aggregator: Addr, } impl ClientReportsProcessor { /// Creates a new [`Self`]. 
- pub fn new(limiter: Arc, aggregator: Addr) -> Self { - Self { - limiter, - aggregator, - } + pub fn new(aggregator: Addr) -> Self { + Self { aggregator } } } @@ -88,10 +58,6 @@ impl processing::Processor for ClientReportsProcessor { &self.aggregator, ); - self.limiter - .enforce_quotas(&mut client_reports, ctx) - .await?; - Ok(Output::empty()) } } @@ -113,7 +79,3 @@ impl Counted for SerializedClientReport { smallvec::smallvec![] } } - -impl CountRateLimited for Managed { - type Error = Error; -} diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 98a4d684507..4afb63bd28c 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1236,7 +1236,7 @@ impl EnvelopeProcessorService { spans: SpansProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()), check_ins: CheckInsProcessor::new(Arc::clone("a_limiter)), sessions: SessionsProcessor::new(Arc::clone("a_limiter)), - client_report: ClientReportsProcessor::new(quota_limiter, outcome_aggregator), + client_report: ClientReportsProcessor::new(outcome_aggregator), }, geoip_lookup, config, From 15358812f9595ef6796298c8a668e9e08e859ad9 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Mon, 10 Nov 2025 11:17:10 +0100 Subject: [PATCH 07/15] rename `SerializedClientReport` to `SerializedClientReports` --- relay-server/src/processing/client_reports/mod.rs | 8 ++++---- relay-server/src/processing/client_reports/process.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/relay-server/src/processing/client_reports/mod.rs b/relay-server/src/processing/client_reports/mod.rs index 46cbe2d5306..7bc30cc5107 100644 --- a/relay-server/src/processing/client_reports/mod.rs +++ b/relay-server/src/processing/client_reports/mod.rs @@ -24,7 +24,7 @@ impl ClientReportsProcessor { } impl processing::Processor for ClientReportsProcessor { - type UnitOfWork = SerializedClientReport; + type UnitOfWork = SerializedClientReports; type 
Output = Nothing; type Error = Error; @@ -39,7 +39,7 @@ impl processing::Processor for ClientReportsProcessor { .take_items_by(|item| matches!(*item.ty(), ItemType::ClientReport)) .into_vec(); - let work = SerializedClientReport { + let work = SerializedClientReports { headers, client_reports, }; @@ -64,7 +64,7 @@ impl processing::Processor for ClientReportsProcessor { /// Client-Reports in their serialized state, as transported in an envelope. #[derive(Debug)] -pub struct SerializedClientReport { +pub struct SerializedClientReports { /// Original envelope headers. headers: EnvelopeHeaders, @@ -74,7 +74,7 @@ pub struct SerializedClientReport { client_reports: Vec, } -impl Counted for SerializedClientReport { +impl Counted for SerializedClientReports { fn quantities(&self) -> Quantities { smallvec::smallvec![] } diff --git a/relay-server/src/processing/client_reports/process.rs b/relay-server/src/processing/client_reports/process.rs index 283f3c09c9d..e88f29d325e 100644 --- a/relay-server/src/processing/client_reports/process.rs +++ b/relay-server/src/processing/client_reports/process.rs @@ -13,7 +13,7 @@ use relay_system::Addr; use crate::constants::DEFAULT_EVENT_RETENTION; use crate::managed::Managed; -use crate::processing::client_reports::SerializedClientReport; +use crate::processing::client_reports::SerializedClientReports; use crate::services::outcome::{Outcome, RuleCategories, TrackOutcome}; use crate::services::processor::MINIMUM_CLOCK_DRIFT; use crate::services::projects::project::ProjectInfo; @@ -40,7 +40,7 @@ pub enum ClientReportField { /// client SDKs. The outcomes are removed here and sent directly to the outcomes /// system. 
pub fn process_client_reports( - client_reports: &mut Managed, + client_reports: &mut Managed, config: &Config, project_info: &ProjectInfo, outcome_aggregator: &Addr, From 12316beadd4e73f3d80d365e20b4f7ba169ec75c Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 14 Nov 2025 11:03:31 +0100 Subject: [PATCH 08/15] breaking: remove emit_client_outcomes --- relay-config/src/config.rs | 16 ---------------- .../src/processing/client_reports/process.rs | 4 +--- relay-server/src/services/processor/report.rs | 2 -- tests/integration/test_outcome.py | 1 - 4 files changed, 1 insertion(+), 22 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 309ae94a5b2..dd8ddaf9409 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -1346,8 +1346,6 @@ pub struct Outcomes { /// Processing relays always emit outcomes (for backwards compatibility). /// Can take the following values: false, "as_client_reports", true pub emit_outcomes: EmitOutcomes, - /// Controls wheather client reported outcomes should be emitted. - pub emit_client_outcomes: bool, /// The maximum number of outcomes that are batched before being sent /// via http to the upstream (only applies to non processing relays). pub batch_size: usize, @@ -1365,7 +1363,6 @@ impl Default for Outcomes { fn default() -> Self { Outcomes { emit_outcomes: EmitOutcomes::AsClientReports, - emit_client_outcomes: true, batch_size: 1000, batch_interval: 500, source: None, @@ -2039,19 +2036,6 @@ impl Config { self.values.outcomes.emit_outcomes } - /// Returns whether this Relay should emit client outcomes - /// - /// Relays that do not emit client outcomes will forward client recieved outcomes - /// directly to the next relay in the chain as client report envelope. This is only done - /// if this relay emits outcomes at all. A relay that will not emit outcomes - /// will forward the envelope unchanged. 
- /// - /// This flag can be explicitly disabled on processing relays as well to prevent the - /// emitting of client outcomes to the kafka topic. - pub fn emit_client_outcomes(&self) -> bool { - self.values.outcomes.emit_client_outcomes - } - /// Returns the maximum number of outcomes that are batched before being sent pub fn outcome_batch_size(&self) -> usize { self.values.outcomes.batch_size diff --git a/relay-server/src/processing/client_reports/process.rs b/relay-server/src/processing/client_reports/process.rs index e88f29d325e..00014898caf 100644 --- a/relay-server/src/processing/client_reports/process.rs +++ b/relay-server/src/processing/client_reports/process.rs @@ -45,9 +45,7 @@ pub fn process_client_reports( project_info: &ProjectInfo, outcome_aggregator: &Addr, ) { - if (!config.emit_outcomes().any() || !config.emit_client_outcomes()) - && config.processing_enabled() - { + if !config.emit_outcomes().any() && config.processing_enabled() { // if a processing relay has client outcomes disabled we drop them without processing. 
return; } diff --git a/relay-server/src/services/processor/report.rs b/relay-server/src/services/processor/report.rs index 6459e4de409..1fa20053c60 100644 --- a/relay-server/src/services/processor/report.rs +++ b/relay-server/src/services/processor/report.rs @@ -84,7 +84,6 @@ mod tests { let config = Config::from_json_value(serde_json::json!({ "outcomes": { "emit_outcomes": true, - "emit_client_outcomes": true } })) .unwrap(); @@ -144,7 +143,6 @@ mod tests { let config = Config::from_json_value(serde_json::json!({ "outcomes": { "emit_outcomes": true, - "emit_client_outcomes": false, }, "processing": { "enabled": true, diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index 5fd6ab3ec8c..1e6b66d6c8c 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -841,7 +841,6 @@ def test_outcome_to_client_report(relay, mini_sentry): { "outcomes": { "emit_outcomes": True, - "emit_client_outcomes": True, "batch_size": 1, "batch_interval": 1, } From 843a1c60f46878ea15d20afcf3d690645b93cc81 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 14 Nov 2025 15:25:29 +0100 Subject: [PATCH 09/15] split processing logic into: expand, validate and emit --- .../src/processing/client_reports/mod.rs | 107 ++++++++- .../src/processing/client_reports/process.rs | 217 ++++++++++-------- 2 files changed, 218 insertions(+), 106 deletions(-) diff --git a/relay-server/src/processing/client_reports/mod.rs b/relay-server/src/processing/client_reports/mod.rs index 7bc30cc5107..2dfc02fcc42 100644 --- a/relay-server/src/processing/client_reports/mod.rs +++ b/relay-server/src/processing/client_reports/mod.rs @@ -1,15 +1,41 @@ +use relay_metrics::UnixTimestamp; +use relay_quotas::DataCategory; use relay_system::Addr; use crate::envelope::{EnvelopeHeaders, Item, ItemType}; -use crate::managed::{Counted, Managed, ManagedEnvelope, Quantities, Rejected}; +use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, 
Rejected}; use crate::processing::{self, Context, Nothing, Output}; use crate::services::outcome::TrackOutcome; mod process; -// TODO: Not sure there is a cleaner way to do this. #[derive(Debug, thiserror::Error)] -pub enum Error {} +pub enum Error { + /// Client report parsing failed. + #[error("invalid client report")] + InvalidClientReport, + /// Processing relay has client outcomes disabled. + #[error("client outcomes disabled")] + OutcomesDisabled, + /// Client report timestamp is too old. + #[error("client report timestamp too old")] + TimestampTooOld, + /// Client report timestamp is too far in the future. + #[error("client report timestamp in future")] + TimestampInFuture, + /// Client report contains an invalid outcome combination. + #[error("invalid outcome combination")] + InvalidOutcome, +} + +impl OutcomeError for Error { + type Error = Self; + + fn consume(self) -> (Option, Self::Error) { + // Currently/historically client reports do not emit outcomes. + (None, self) + } +} /// A processor for Client-Reports. pub struct ClientReportsProcessor { @@ -48,15 +74,21 @@ impl processing::Processor for ClientReportsProcessor { async fn process( &self, - mut client_reports: Managed, + client_reports: Managed, ctx: Context<'_>, ) -> Result, Rejected> { - process::process_client_reports( - &mut client_reports, - ctx.config, - ctx.project_info, - &self.aggregator, - ); + if !ctx.config.emit_outcomes().any() && ctx.config.processing_enabled() { + // if a processing relay has client outcomes disabled we drop them without processing. + return Err(client_reports.reject_err(Error::OutcomesDisabled)); + } + + let client_reports = process::expand(client_reports); + + // FIXME: This guard is taken over from the old code not sure if we still need this. 
+ if !client_reports.output_events.is_empty() { + let client_reports = process::validate(client_reports, ctx.config, ctx.project_info)?; + process::emit(client_reports, &self.aggregator); + } Ok(Output::empty()) } @@ -67,7 +99,6 @@ impl processing::Processor for ClientReportsProcessor { pub struct SerializedClientReports { /// Original envelope headers. headers: EnvelopeHeaders, - /// A list of client-reports waiting to be processed. /// /// All items contained here must be client-reports. @@ -79,3 +110,57 @@ impl Counted for SerializedClientReports { smallvec::smallvec![] } } + +/// Client reports which have been parsed and aggregated from their serialized state. +pub struct ExpandedClientReports { + /// Original envelope headers. + headers: EnvelopeHeaders, + /// The timestamp of when the reports were created. + timestamp: Option, + /// Aggregated outcome events from all parsed client reports. + output_events: Vec, +} + +impl Counted for ExpandedClientReports { + fn quantities(&self) -> crate::managed::Quantities { + smallvec::smallvec![] + } +} + +/// A aggregated single outcome event. +pub struct OutcomeEvent { + outcome_type: ClientReportField, + reason: String, + category: DataCategory, + quantity: u32, +} + +impl Counted for OutcomeEvent { + fn quantities(&self) -> crate::managed::Quantities { + smallvec::smallvec![] + } +} + +/// Client reports which have been validated and converted to trackable outcomes. +pub struct ValidatedClientReports { + outcomes: Vec, +} + +impl Counted for ValidatedClientReports { + fn quantities(&self) -> crate::managed::Quantities { + smallvec::smallvec![] + } +} + +/// Fields of client reports that map to specific [`Outcome`]s without content. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum ClientReportField { + /// The event has been filtered by an inbound data filter. + Filtered, + /// The event has been filtered by a sampling rule. + FilteredSampling, + /// The event has been rate limited. 
+ RateLimited, + /// The event has already been discarded on the client side. + ClientDiscard, +} diff --git a/relay-server/src/processing/client_reports/process.rs b/relay-server/src/processing/client_reports/process.rs index 00014898caf..dec57000593 100644 --- a/relay-server/src/processing/client_reports/process.rs +++ b/relay-server/src/processing/client_reports/process.rs @@ -1,5 +1,4 @@ use std::collections::BTreeMap; -use std::error::Error; use chrono::{Duration as SignedDuration, Utc}; use relay_common::time::UnixTimestamp; @@ -12,64 +11,35 @@ use relay_sampling::evaluation::MatchedRuleIds; use relay_system::Addr; use crate::constants::DEFAULT_EVENT_RETENTION; -use crate::managed::Managed; -use crate::processing::client_reports::SerializedClientReports; +use crate::managed::{Managed, Rejected}; +use crate::processing::client_reports::{ + ClientReportField, Error, ExpandedClientReports, OutcomeEvent, SerializedClientReports, + ValidatedClientReports, +}; use crate::services::outcome::{Outcome, RuleCategories, TrackOutcome}; use crate::services::processor::MINIMUM_CLOCK_DRIFT; use crate::services::projects::project::ProjectInfo; -/// Fields of client reports that map to specific [`Outcome`]s without content. -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub enum ClientReportField { - /// The event has been filtered by an inbound data filter. - Filtered, - - /// The event has been filtered by a sampling rule. - FilteredSampling, - - /// The event has been rate limited. - RateLimited, - - /// The event has already been discarded on the client side. - ClientDiscard, -} - -/// Validates and extracts client reports. +/// Parses and aggregates all client reports in their [`ExpandedClientReports`] representation. /// -/// At the moment client reports are primarily used to transfer outcomes from -/// client SDKs. The outcomes are removed here and sent directly to the outcomes -/// system. 
-pub fn process_client_reports( - client_reports: &mut Managed, - config: &Config, - project_info: &ProjectInfo, - outcome_aggregator: &Addr, -) { - if !config.emit_outcomes().any() && config.processing_enabled() { - // if a processing relay has client outcomes disabled we drop them without processing. - return; - } - - let mut timestamp = None; - let mut output_events = BTreeMap::new(); - let received = client_reports.received_at(); - - let clock_drift_processor = - ClockDriftProcessor::new(client_reports.headers.sent_at(), received) - .at_least(MINIMUM_CLOCK_DRIFT); - - for item in &client_reports.client_reports { - match ClientReport::parse(&item.payload()) { - Ok(ClientReport { - timestamp: report_timestamp, - discarded_events, - rate_limited_events, - filtered_events, - filtered_sampling_events, - }) => { - // Glue all discarded events together and give them the appropriate outcome type - let input_events = - discarded_events +/// Invalid reports will be discarded. +/// Events with overlong reasons will be skipped. 
+pub fn expand(client_reports: Managed) -> Managed { + client_reports.map(|client_reports, records| { + let mut timestamp = None; + let mut output_events = BTreeMap::new(); + + for item in client_reports.client_reports { + match ClientReport::parse(&item.payload()) { + Ok(ClientReport { + timestamp: report_timestamp, + discarded_events, + rate_limited_events, + filtered_events, + filtered_sampling_events, + }) => { + // Glue all discarded events together and give them the appropriate outcome type + let input_events = discarded_events .into_iter() .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event)) .chain( @@ -84,33 +54,65 @@ pub fn process_client_reports( (ClientReportField::RateLimited, discarded_event) })); - for (outcome_type, discarded_event) in input_events { - if discarded_event.reason.len() > 200 { - relay_log::trace!("ignored client outcome with an overlong reason"); - continue; + for (outcome_type, discarded_event) in input_events { + if discarded_event.reason.len() > 200 { + relay_log::trace!("ignored client outcome with an overlong reason"); + // TODO: Not sure if we want to reject the individual 'discarded_event' + // here. Rejecting the entire item also seems wrong. 
+ continue; + } + *output_events + .entry(( + outcome_type, + discarded_event.reason, + discarded_event.category, + )) + .or_insert(0) += discarded_event.quantity; + } + if let Some(ts) = report_timestamp { + timestamp.get_or_insert(ts); } - *output_events - .entry(( - outcome_type, - discarded_event.reason, - discarded_event.category, - )) - .or_insert(0) += discarded_event.quantity; } - if let Some(ts) = report_timestamp { - timestamp.get_or_insert(ts); + Err(err) => { + relay_log::trace!( + error = &err as &dyn std::error::Error, + "invalid client report received" + ); + records.reject_err(Error::InvalidClientReport, item); } } - Err(err) => { - relay_log::trace!(error = &err as &dyn Error, "invalid client report received") - } } - } - if output_events.is_empty() { - return; - } + ExpandedClientReports { + headers: client_reports.headers, + timestamp, + output_events: output_events + .into_iter() + .map( + |((outcome_type, reason, category), quantity)| OutcomeEvent { + outcome_type, + reason, + category, + quantity, + }, + ) + .collect(), + } + }) +} +/// Validates and extracts client reports to their [`ValidatedClientReports`] representation. 
+pub fn validate( + client_reports: Managed, + config: &Config, + project_info: &ProjectInfo, +) -> Result, Rejected> { + let received = client_reports.received_at(); + let clock_drift_processor = + ClockDriftProcessor::new(client_reports.headers.sent_at(), received) + .at_least(MINIMUM_CLOCK_DRIFT); + + let mut timestamp = client_reports.timestamp; let timestamp = timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64)); @@ -134,7 +136,7 @@ pub fn process_client_reports( "skipping client outcomes older than {} days", max_age.num_days() ); - return; + return Err(client_reports.reject_err(Error::TimestampTooOld)); } let max_future = SignedDuration::seconds(config.max_secs_in_future()); @@ -148,28 +150,53 @@ pub fn process_client_reports( "skipping client outcomes more than {}s in the future", max_future.num_seconds() ); - return; + return Err(client_reports.reject_err(Error::TimestampInFuture)); } - for ((outcome_type, reason, category), quantity) in output_events.into_iter() { - let Ok(outcome) = outcome_from_parts(outcome_type, &reason) else { - relay_log::trace!(?outcome_type, reason, "invalid outcome combination"); - continue; - }; - - outcome_aggregator.send(TrackOutcome { - // If we get to this point, the unwrap should not be used anymore, since we know by - // now that the timestamp can be parsed, but just incase we fallback to UTC current - // `DateTime`. 
-            timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now),
-            scoping: client_reports.scoping(),
-            outcome,
-            event_id: None,
-            remote_addr: None, // omitting the client address allows for better aggregation
-            category,
-            quantity,
-        });
-    }
+    let scoping = client_reports.scoping();
+    Ok(client_reports.map(|client_reports, records| {
+        let mut outcomes = Vec::new();
+        for event_outcome in client_reports.output_events {
+            let OutcomeEvent {
+                outcome_type,
+                ref reason,
+                category,
+                quantity,
+            } = event_outcome;
+
+            let Ok(outcome) = outcome_from_parts(outcome_type, reason) else {
+                relay_log::trace!(?outcome_type, reason, "invalid outcome combination");
+                records.reject_err(Error::InvalidOutcome, event_outcome);
+                continue;
+            };
+            outcomes.push(TrackOutcome {
+                // If we get to this point, the unwrap should not be used anymore, since we know
+                // by now that the timestamp can be parsed, but just in case we fall back to the
+                // current UTC `DateTime`.
+                timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now),
+                scoping,
+                outcome,
+                event_id: None,
+                remote_addr: None, // omitting the client address allows for better aggregation
+                category,
+                quantity,
+            });
+        }
+
+        ValidatedClientReports { outcomes }
+    }))
+}
+
+/// Emits the validated client report outcomes to the outcome aggregator.
+pub fn emit(
+    client_reports: Managed,
+    outcome_aggregator: &Addr,
+) {
+    client_reports.accept(|client_reports| {
+        for outcome in client_reports.outcomes {
+            outcome_aggregator.send(outcome)
+        }
+    })
+}
 
 /// Parse an outcome from an outcome ID and a reason string. 
From c76a024fbe75bddf420e140cfa564b0cb74f7c3c Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 14 Nov 2025 15:40:43 +0100 Subject: [PATCH 10/15] fix merge --- relay-server/src/services/processor/report.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/relay-server/src/services/processor/report.rs b/relay-server/src/services/processor/report.rs index 84a352c69f5..cb018f64291 100644 --- a/relay-server/src/services/processor/report.rs +++ b/relay-server/src/services/processor/report.rs @@ -63,6 +63,7 @@ mod tests { use relay_cogs::Token; use relay_config::Config; use relay_event_schema::protocol::EventId; + use relay_system::Addr; use crate::envelope::{Envelope, Item}; use crate::extractors::RequestMeta; From bc85629eadf2b293b0b2bc6476109024fa131e7e Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 14 Nov 2025 15:51:45 +0100 Subject: [PATCH 11/15] fix doc lint --- relay-server/src/processing/client_reports/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relay-server/src/processing/client_reports/mod.rs b/relay-server/src/processing/client_reports/mod.rs index 2dfc02fcc42..1a262b5a732 100644 --- a/relay-server/src/processing/client_reports/mod.rs +++ b/relay-server/src/processing/client_reports/mod.rs @@ -5,7 +5,7 @@ use relay_system::Addr; use crate::envelope::{EnvelopeHeaders, Item, ItemType}; use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected}; use crate::processing::{self, Context, Nothing, Output}; -use crate::services::outcome::TrackOutcome; +use crate::services::outcome::{Outcome, TrackOutcome}; mod process; @@ -31,7 +31,7 @@ pub enum Error { impl OutcomeError for Error { type Error = Self; - fn consume(self) -> (Option, Self::Error) { + fn consume(self) -> (Option, Self::Error) { // Currently/historically client reports do not emit outcomes. 
(None, self) } From b9fdd3e0d18ee01396e17be76fb463c9ba108f9d Mon Sep 17 00:00:00 2001 From: Tobias Wilfert <36408720+tobias-wilfert@users.noreply.github.com> Date: Mon, 24 Nov 2025 12:51:09 +0100 Subject: [PATCH 12/15] Update relay-server/src/processing/client_reports/mod.rs Co-authored-by: Joris Bayer --- relay-server/src/processing/client_reports/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/processing/client_reports/mod.rs b/relay-server/src/processing/client_reports/mod.rs index 1a262b5a732..4d565b1578e 100644 --- a/relay-server/src/processing/client_reports/mod.rs +++ b/relay-server/src/processing/client_reports/mod.rs @@ -32,7 +32,7 @@ impl OutcomeError for Error { type Error = Self; fn consume(self) -> (Option, Self::Error) { - // Currently/historically client reports do not emit outcomes. + // Client reports are outcomes, and we do not emit outcomes for outcomes. (None, self) } } From 089341c384c906dff0bfd2c4d7b928e14b995fd0 Mon Sep 17 00:00:00 2001 From: Tobias Wilfert <36408720+tobias-wilfert@users.noreply.github.com> Date: Mon, 24 Nov 2025 12:51:21 +0100 Subject: [PATCH 13/15] Update relay-server/src/processing/client_reports/mod.rs Co-authored-by: Joris Bayer --- relay-server/src/processing/client_reports/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/processing/client_reports/mod.rs b/relay-server/src/processing/client_reports/mod.rs index 4d565b1578e..d7bf63a7132 100644 --- a/relay-server/src/processing/client_reports/mod.rs +++ b/relay-server/src/processing/client_reports/mod.rs @@ -37,7 +37,7 @@ impl OutcomeError for Error { } } -/// A processor for Client-Reports. +/// A processor for client reports. 
pub struct ClientReportsProcessor { aggregator: Addr, } From 59c829864264c091f60f7e2bb6ee2e6b26ab762e Mon Sep 17 00:00:00 2001 From: Tobias Wilfert <36408720+tobias-wilfert@users.noreply.github.com> Date: Mon, 24 Nov 2025 14:24:16 +0100 Subject: [PATCH 14/15] Update relay-server/src/processing/client_reports/mod.rs Co-authored-by: Joris Bayer --- relay-server/src/processing/client_reports/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/processing/client_reports/mod.rs b/relay-server/src/processing/client_reports/mod.rs index d7bf63a7132..5cc1e14882d 100644 --- a/relay-server/src/processing/client_reports/mod.rs +++ b/relay-server/src/processing/client_reports/mod.rs @@ -122,7 +122,7 @@ pub struct ExpandedClientReports { } impl Counted for ExpandedClientReports { - fn quantities(&self) -> crate::managed::Quantities { + fn quantities(&self) -> Quantities { smallvec::smallvec![] } } From af3ee0bfac10df4d9667e2c05bbe5f8a2d61dc22 Mon Sep 17 00:00:00 2001 From: Tobias Wilfert <36408720+tobias-wilfert@users.noreply.github.com> Date: Mon, 24 Nov 2025 14:24:59 +0100 Subject: [PATCH 15/15] Update relay-server/src/processing/client_reports/process.rs Co-authored-by: Joris Bayer --- relay-server/src/processing/client_reports/process.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/relay-server/src/processing/client_reports/process.rs b/relay-server/src/processing/client_reports/process.rs index dec57000593..017a4db7364 100644 --- a/relay-server/src/processing/client_reports/process.rs +++ b/relay-server/src/processing/client_reports/process.rs @@ -199,9 +199,6 @@ pub fn emit( }) } -/// Parse an outcome from an outcome ID and a reason string. -/// -/// Currently only used to reconstruct outcomes encoded in client reports. fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result { match field { ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") {