diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 0ff08bf3c2a..4d3e930b57d 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -1366,8 +1366,6 @@ pub struct Outcomes { /// Processing relays always emit outcomes (for backwards compatibility). /// Can take the following values: false, "as_client_reports", true pub emit_outcomes: EmitOutcomes, - /// Controls wheather client reported outcomes should be emitted. - pub emit_client_outcomes: bool, /// The maximum number of outcomes that are batched before being sent /// via http to the upstream (only applies to non processing relays). pub batch_size: usize, @@ -1385,7 +1383,6 @@ impl Default for Outcomes { fn default() -> Self { Outcomes { emit_outcomes: EmitOutcomes::AsClientReports, - emit_client_outcomes: true, batch_size: 1000, batch_interval: 500, source: None, @@ -2078,19 +2075,6 @@ impl Config { self.values.outcomes.emit_outcomes } - /// Returns whether this Relay should emit client outcomes - /// - /// Relays that do not emit client outcomes will forward client recieved outcomes - /// directly to the next relay in the chain as client report envelope. This is only done - /// if this relay emits outcomes at all. A relay that will not emit outcomes - /// will forward the envelope unchanged. - /// - /// This flag can be explicitly disabled on processing relays as well to prevent the - /// emitting of client outcomes to the kafka topic. - pub fn emit_client_outcomes(&self) -> bool { - self.values.outcomes.emit_client_outcomes - } - /// Returns the maximum number of outcomes that are batched before being sent pub fn outcome_batch_size(&self) -> usize { self.values.outcomes.batch_size diff --git a/relay-server/src/processing/client_reports/mod.rs b/relay-server/src/processing/client_reports/mod.rs new file mode 100644 index 00000000000..5cc1e14882d --- /dev/null +++ b/relay-server/src/processing/client_reports/mod.rs @@ -0,0 +1,166 @@ +use relay_metrics::UnixTimestamp; +use relay_quotas::DataCategory; +use relay_system::Addr; + +use crate::envelope::{EnvelopeHeaders, Item, ItemType}; +use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected}; +use crate::processing::{self, Context, Nothing, Output}; +use crate::services::outcome::{Outcome, TrackOutcome}; + +mod process; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Client report parsing failed. + #[error("invalid client report")] + InvalidClientReport, + /// Processing relay has client outcomes disabled. + #[error("client outcomes disabled")] + OutcomesDisabled, + /// Client report timestamp is too old. + #[error("client report timestamp too old")] + TimestampTooOld, + /// Client report timestamp is too far in the future. + #[error("client report timestamp in future")] + TimestampInFuture, + /// Client report contains an invalid outcome combination. + #[error("invalid outcome combination")] + InvalidOutcome, +} + +impl OutcomeError for Error { + type Error = Self; + + fn consume(self) -> (Option, Self::Error) { + // Client reports are outcomes, and we do not emit outcomes for outcomes. + (None, self) + } +} + +/// A processor for client reports. +pub struct ClientReportsProcessor { + aggregator: Addr, +} + +impl ClientReportsProcessor { + /// Creates a new [`Self`]. 
+    pub fn new(aggregator: Addr<TrackOutcome>) -> Self {
+        Self { aggregator }
+    }
+}
+
+impl processing::Processor for ClientReportsProcessor {
+    type UnitOfWork = SerializedClientReports;
+    type Output = Nothing;
+    type Error = Error;
+
+    fn prepare_envelope(
+        &self,
+        envelope: &mut ManagedEnvelope,
+    ) -> Option<Managed<Self::UnitOfWork>> {
+        let headers = envelope.envelope().headers().clone();
+
+        let client_reports = envelope
+            .envelope_mut()
+            .take_items_by(|item| matches!(*item.ty(), ItemType::ClientReport))
+            .into_vec();
+
+        let work = SerializedClientReports {
+            headers,
+            client_reports,
+        };
+        Some(Managed::from_envelope(envelope, work))
+    }
+
+    async fn process(
+        &self,
+        client_reports: Managed<Self::UnitOfWork>,
+        ctx: Context<'_>,
+    ) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
+        if !ctx.config.emit_outcomes().any() && ctx.config.processing_enabled() {
+            // If a processing relay has client outcomes disabled, drop them without processing.
+            return Err(client_reports.reject_err(Error::OutcomesDisabled));
+        }
+
+        let client_reports = process::expand(client_reports);
+
+        // FIXME: This guard is carried over from the old code; it is unclear whether it is still needed.
+        if !client_reports.output_events.is_empty() {
+            let client_reports = process::validate(client_reports, ctx.config, ctx.project_info)?;
+            process::emit(client_reports, &self.aggregator);
+        }
+
+        Ok(Output::empty())
+    }
+}
+
+/// Client reports in their serialized state, as transported in an envelope.
+#[derive(Debug)]
+pub struct SerializedClientReports {
+    /// Original envelope headers.
+    headers: EnvelopeHeaders,
+    /// A list of client reports waiting to be processed.
+    ///
+    /// All items contained here must be client reports.
+    client_reports: Vec<Item>,
+}
+
+impl Counted for SerializedClientReports {
+    fn quantities(&self) -> Quantities {
+        smallvec::smallvec![]
+    }
+}
+
+/// Client reports which have been parsed and aggregated from their serialized state.
+pub struct ExpandedClientReports {
+    /// Original envelope headers.
+    headers: EnvelopeHeaders,
+    /// The timestamp of when the reports were created.
+    timestamp: Option<UnixTimestamp>,
+    /// Aggregated outcome events from all parsed client reports.
+    output_events: Vec<OutcomeEvent>,
+}
+
+impl Counted for ExpandedClientReports {
+    fn quantities(&self) -> Quantities {
+        smallvec::smallvec![]
+    }
+}
+
+/// A single, aggregated outcome event.
+pub struct OutcomeEvent {
+    outcome_type: ClientReportField,
+    reason: String,
+    category: DataCategory,
+    quantity: u32,
+}
+
+impl Counted for OutcomeEvent {
+    fn quantities(&self) -> crate::managed::Quantities {
+        smallvec::smallvec![]
+    }
+}
+
+/// Client reports which have been validated and converted to trackable outcomes.
+pub struct ValidatedClientReports {
+    outcomes: Vec<TrackOutcome>,
+}
+
+impl Counted for ValidatedClientReports {
+    fn quantities(&self) -> crate::managed::Quantities {
+        smallvec::smallvec![]
+    }
+}
+
+/// Fields of client reports that map to specific [`Outcome`]s without content.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub enum ClientReportField {
+    /// The event has been filtered by an inbound data filter.
+    Filtered,
+    /// The event has been filtered by a sampling rule.
+    FilteredSampling,
+    /// The event has been rate limited.
+    RateLimited,
+    /// The event has already been discarded on the client side.
+ ClientDiscard, +} diff --git a/relay-server/src/processing/client_reports/process.rs b/relay-server/src/processing/client_reports/process.rs new file mode 100644 index 00000000000..017a4db7364 --- /dev/null +++ b/relay-server/src/processing/client_reports/process.rs @@ -0,0 +1,329 @@ +use std::collections::BTreeMap; + +use chrono::{Duration as SignedDuration, Utc}; +use relay_common::time::UnixTimestamp; +use relay_config::Config; +use relay_event_normalization::ClockDriftProcessor; +use relay_event_schema::protocol::ClientReport; +use relay_filter::FilterStatKey; +use relay_quotas::ReasonCode; +use relay_sampling::evaluation::MatchedRuleIds; +use relay_system::Addr; + +use crate::constants::DEFAULT_EVENT_RETENTION; +use crate::managed::{Managed, Rejected}; +use crate::processing::client_reports::{ + ClientReportField, Error, ExpandedClientReports, OutcomeEvent, SerializedClientReports, + ValidatedClientReports, +}; +use crate::services::outcome::{Outcome, RuleCategories, TrackOutcome}; +use crate::services::processor::MINIMUM_CLOCK_DRIFT; +use crate::services::projects::project::ProjectInfo; + +/// Parses and aggregates all client reports in their [`ExpandedClientReports`] representation. +/// +/// Invalid reports will be discarded. +/// Events with overlong reasons will be skipped. +pub fn expand(client_reports: Managed) -> Managed { + client_reports.map(|client_reports, records| { + let mut timestamp = None; + let mut output_events = BTreeMap::new(); + + for item in client_reports.client_reports { + match ClientReport::parse(&item.payload()) { + Ok(ClientReport { + timestamp: report_timestamp, + discarded_events, + rate_limited_events, + filtered_events, + filtered_sampling_events, + }) => { + // Glue all discarded events together and give them the appropriate outcome type + let input_events = discarded_events + .into_iter() + .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event)) + .chain( + filtered_events.into_iter().map(|discarded_event| { + (ClientReportField::Filtered, discarded_event) + }), + ) + .chain(filtered_sampling_events.into_iter().map(|discarded_event| { + (ClientReportField::FilteredSampling, discarded_event) + })) + .chain(rate_limited_events.into_iter().map(|discarded_event| { + (ClientReportField::RateLimited, discarded_event) + })); + + for (outcome_type, discarded_event) in input_events { + if discarded_event.reason.len() > 200 { + relay_log::trace!("ignored client outcome with an overlong reason"); + // TODO: Not sure if we want to reject the individual 'discarded_event' + // here. Rejecting the entire item also seems wrong. + continue; + } + *output_events + .entry(( + outcome_type, + discarded_event.reason, + discarded_event.category, + )) + .or_insert(0) += discarded_event.quantity; + } + if let Some(ts) = report_timestamp { + timestamp.get_or_insert(ts); + } + } + Err(err) => { + relay_log::trace!( + error = &err as &dyn std::error::Error, + "invalid client report received" + ); + records.reject_err(Error::InvalidClientReport, item); + } + } + } + + ExpandedClientReports { + headers: client_reports.headers, + timestamp, + output_events: output_events + .into_iter() + .map( + |((outcome_type, reason, category), quantity)| OutcomeEvent { + outcome_type, + reason, + category, + quantity, + }, + ) + .collect(), + } + }) +} + +/// Validates and extracts client reports to their [`ValidatedClientReports`] representation. 
+pub fn validate(
+    client_reports: Managed<ExpandedClientReports>,
+    config: &Config,
+    project_info: &ProjectInfo,
+) -> Result<Managed<ValidatedClientReports>, Rejected<Error>> {
+    let received = client_reports.received_at();
+    let clock_drift_processor =
+        ClockDriftProcessor::new(client_reports.headers.sent_at(), received)
+            .at_least(MINIMUM_CLOCK_DRIFT);
+
+    let mut timestamp = client_reports.timestamp;
+    let timestamp =
+        timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64));
+
+    if clock_drift_processor.is_drifted() {
+        relay_log::trace!("applying clock drift correction to client report");
+        clock_drift_processor.process_timestamp(timestamp);
+    }
+
+    let retention_days = project_info
+        .config()
+        .event_retention
+        .unwrap_or(DEFAULT_EVENT_RETENTION);
+    let max_age = SignedDuration::days(retention_days.into());
+    // Also, if we are unable to parse the timestamp, we assume it is way too old.
+    let in_past = timestamp
+        .as_datetime()
+        .map(|ts| (received - ts) > max_age)
+        .unwrap_or(true);
+    if in_past {
+        relay_log::trace!(
+            "skipping client outcomes older than {} days",
+            max_age.num_days()
+        );
+        return Err(client_reports.reject_err(Error::TimestampTooOld));
+    }
+
+    let max_future = SignedDuration::seconds(config.max_secs_in_future());
+    // Also, if we are unable to parse the timestamp, we assume it is far in the future.
+    let in_future = timestamp
+        .as_datetime()
+        .map(|ts| (ts - received) > max_future)
+        .unwrap_or(true);
+    if in_future {
+        relay_log::trace!(
+            "skipping client outcomes more than {}s in the future",
+            max_future.num_seconds()
+        );
+        return Err(client_reports.reject_err(Error::TimestampInFuture));
+    }
+
+    let scoping = client_reports.scoping();
+    Ok(client_reports.map(|client_reports, records| {
+        let mut outcomes = Vec::new();
+        for event_outcome in client_reports.output_events {
+            let OutcomeEvent {
+                outcome_type,
+                ref reason,
+                category,
+                quantity,
+            } = event_outcome;
+
+            let Ok(outcome) = outcome_from_parts(outcome_type, reason) else {
+                relay_log::trace!(?outcome_type, reason, "invalid outcome combination");
+                records.reject_err(Error::InvalidOutcome, event_outcome);
+                continue;
+            };
+            outcomes.push(TrackOutcome {
+                // If we get to this point, we know the timestamp can be parsed, so the
+                // unwrap should never be hit; just in case, we fall back to the current
+                // UTC `DateTime`.
+                timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now),
+                scoping,
+                outcome,
+                event_id: None,
+                remote_addr: None, // omitting the client address allows for better aggregation
+                category,
+                quantity,
+            });
+        }
+
+        ValidatedClientReports { outcomes }
+    }))
+}
+
+/// Emits the validated client report outcomes to the outcome aggregator.
+pub fn emit( + client_reports: Managed, + outcome_aggregator: &Addr, +) { + client_reports.accept(|client_reports| { + for outcome in client_reports.outcomes { + outcome_aggregator.send(outcome) + } + }) +} + +fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result { + match field { + ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") { + Some(rule_ids) => MatchedRuleIds::parse(rule_ids) + .map(RuleCategories::from) + .map(Outcome::FilteredSampling) + .map_err(|_| ()), + None => Err(()), + }, + ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())), + ClientReportField::Filtered => Ok(Outcome::Filtered( + FilterStatKey::try_from(reason).map_err(|_| ())?, + )), + ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason { + "" => None, + other => Some(ReasonCode::new(other)), + })), + } +} + +#[cfg(test)] +mod tests { + + use crate::services::outcome::RuleCategory; + + use super::*; + + #[test] + fn test_from_outcome_type_sampled() { + assert!(outcome_from_parts(ClientReportField::FilteredSampling, "adsf").is_err()); + + assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:").is_err()); + + assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:foo").is_err()); + + assert!(matches!( + outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:"), + Err(()) + )); + + assert!(matches!( + outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:;"), + Err(()) + )); + + assert!(matches!( + outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:ab;12"), + Err(()) + )); + + assert_eq!( + outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123,456"), + Ok(Outcome::FilteredSampling(RuleCategories( + [RuleCategory::Other].into() + ))) + ); + + assert_eq!( + outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"), + Ok(Outcome::FilteredSampling(RuleCategories( + [RuleCategory::Other].into() + ))) + ); + + assert_eq!( + outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"), + Ok(Outcome::FilteredSampling(RuleCategories( + [RuleCategory::Other].into() + ))) + ); + + assert_eq!( + outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:1001"), + Ok(Outcome::FilteredSampling(RuleCategories( + [RuleCategory::BoostEnvironments].into() + ))) + ); + + assert_eq!( + outcome_from_parts( + ClientReportField::FilteredSampling, + "Sampled:1001,1456,1567,3333,4444" + ), + Ok(Outcome::FilteredSampling(RuleCategories( + [ + RuleCategory::BoostEnvironments, + RuleCategory::BoostLowVolumeTransactions, + RuleCategory::BoostLatestReleases, + RuleCategory::Custom + ] + .into() + ))) + ); + } + + #[test] + fn test_from_outcome_type_filtered() { + assert!(matches!( + outcome_from_parts(ClientReportField::Filtered, "error-message"), + Ok(Outcome::Filtered(FilterStatKey::ErrorMessage)) + )); + + assert!(matches!( + outcome_from_parts(ClientReportField::Filtered, "hydration-error"), + Ok(Outcome::Filtered(FilterStatKey::GenericFilter(_))) + )); + } + + #[test] + fn test_from_outcome_type_client_discard() { + assert_eq!( + outcome_from_parts(ClientReportField::ClientDiscard, "foo_reason").unwrap(), + Outcome::ClientDiscard("foo_reason".into()) + ); + } + + #[test] + fn test_from_outcome_type_rate_limited() { + assert!(matches!( + outcome_from_parts(ClientReportField::RateLimited, ""), + Ok(Outcome::RateLimited(None)) + )); + assert_eq!( + outcome_from_parts(ClientReportField::RateLimited, "foo_reason").unwrap(), + 
Outcome::RateLimited(Some(ReasonCode::new("foo_reason"))) + ); + } +} diff --git a/relay-server/src/processing/mod.rs b/relay-server/src/processing/mod.rs index d42569a6c02..86b7267719b 100644 --- a/relay-server/src/processing/mod.rs +++ b/relay-server/src/processing/mod.rs @@ -24,6 +24,7 @@ pub use self::forward::*; pub use self::limits::*; pub mod check_ins; +pub mod client_reports; pub mod logs; pub mod sessions; pub mod spans; @@ -163,6 +164,14 @@ impl Output { } } + /// Creates an new output with neither main nor metrics. + pub fn empty() -> Self { + Self { + main: None, + metrics: None, + } + } + /// Maps an `Output` to `Output` by applying a function to [`Self::main`]. pub fn map(self, f: F) -> Output where diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 595f479c50a..46c80b47749 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -48,6 +48,7 @@ use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket}; use crate::metrics_extraction::transactions::ExtractedMetrics; use crate::metrics_extraction::transactions::types::ExtractMetricsError; use crate::processing::check_ins::CheckInsProcessor; +use crate::processing::client_reports::ClientReportsProcessor; use crate::processing::logs::LogsProcessor; use crate::processing::sessions::SessionsProcessor; use crate::processing::spans::SpansProcessor; @@ -1148,6 +1149,7 @@ struct Processing { spans: SpansProcessor, check_ins: CheckInsProcessor, sessions: SessionsProcessor, + client_report: ClientReportsProcessor, } impl EnvelopeProcessorService { @@ -1206,6 +1208,7 @@ impl EnvelopeProcessorService { #[cfg(feature = "processing")] let rate_limiter = rate_limiter.map(Arc::new); + let outcome_aggregator = addrs.outcome_aggregator.clone(); let inner = InnerProcessor { pool, global_config, @@ -1234,7 +1237,8 @@ impl EnvelopeProcessorService { trace_metrics: TraceMetricsProcessor::new(Arc::clone("a_limiter)), spans: SpansProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()), check_ins: CheckInsProcessor::new(Arc::clone("a_limiter)), - sessions: SessionsProcessor::new(quota_limiter), + sessions: SessionsProcessor::new(Arc::clone("a_limiter)), + client_report: ClientReportsProcessor::new(outcome_aggregator), }, geoip_lookup, config, @@ -1729,32 +1733,6 @@ impl EnvelopeProcessorService { Ok(Some(extracted_metrics)) } - /// Processes user and client reports. - async fn process_client_reports( - &self, - managed_envelope: &mut TypedEnvelope, - ctx: processing::Context<'_>, - ) -> Result, ProcessingError> { - let mut extracted_metrics = ProcessingExtractedMetrics::new(); - - self.enforce_quotas( - managed_envelope, - Annotated::empty(), - &mut extracted_metrics, - ctx, - ) - .await?; - - report::process_client_reports( - managed_envelope, - ctx.config, - ctx.project_info, - self.inner.addrs.outcome_aggregator.clone(), - ); - - Ok(Some(extracted_metrics)) - } - /// Processes replays. 
async fn process_replays( &self, @@ -1945,7 +1923,14 @@ impl EnvelopeProcessorService { .await } ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx), - ProcessingGroup::ClientReport => run!(process_client_reports, ctx), + ProcessingGroup::ClientReport => { + self.process_with_processor( + &self.inner.processing.client_report, + managed_envelope, + ctx, + ) + .await + } ProcessingGroup::Replay => { run!(process_replays, ctx) } diff --git a/relay-server/src/services/processor/report.rs b/relay-server/src/services/processor/report.rs index c1b1145ae75..cb018f64291 100644 --- a/relay-server/src/services/processor/report.rs +++ b/relay-server/src/services/processor/report.rs @@ -1,195 +1,12 @@ //! Contains code related to validation and normalization of the user and client reports. -use std::collections::BTreeMap; use std::error::Error; -use chrono::{Duration as SignedDuration, Utc}; -use relay_common::time::UnixTimestamp; -use relay_config::Config; -use relay_event_normalization::ClockDriftProcessor; -use relay_event_schema::protocol::{ClientReport, UserReport}; -use relay_filter::FilterStatKey; -use relay_quotas::ReasonCode; -use relay_sampling::evaluation::MatchedRuleIds; -use relay_system::Addr; - -use crate::constants::DEFAULT_EVENT_RETENTION; +use relay_event_schema::protocol::UserReport; + use crate::envelope::{ContentType, ItemType}; use crate::managed::{ItemAction, TypedEnvelope}; -use crate::services::outcome::{DiscardReason, Outcome, RuleCategories, TrackOutcome}; -use crate::services::processor::{ClientReportGroup, MINIMUM_CLOCK_DRIFT}; -use crate::services::projects::project::ProjectInfo; - -/// Fields of client reports that map to specific [`Outcome`]s without content. -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub enum ClientReportField { - /// The event has been filtered by an inbound data filter. - Filtered, - - /// The event has been filtered by a sampling rule. - FilteredSampling, - - /// The event has been rate limited. - RateLimited, - - /// The event has already been discarded on the client side. - ClientDiscard, -} - -/// Validates and extracts client reports. -/// -/// At the moment client reports are primarily used to transfer outcomes from -/// client SDKs. The outcomes are removed here and sent directly to the outcomes -/// system. -pub fn process_client_reports( - managed_envelope: &mut TypedEnvelope, - config: &Config, - project_info: &ProjectInfo, - outcome_aggregator: Addr, -) { - // if client outcomes are disabled we leave the client reports unprocessed - // and pass them on. - if !config.emit_outcomes().any() || !config.emit_client_outcomes() { - // if a processing relay has client outcomes disabled we drop them. - if config.processing_enabled() { - managed_envelope.retain_items(|item| match item.ty() { - ItemType::ClientReport => ItemAction::DropSilently, - _ => ItemAction::Keep, - }); - } - return; - } - - let mut timestamp = None; - let mut output_events = BTreeMap::new(); - let received = managed_envelope.received_at(); - - let clock_drift_processor = - ClockDriftProcessor::new(managed_envelope.envelope().sent_at(), received) - .at_least(MINIMUM_CLOCK_DRIFT); - - // we're going through all client reports but we're effectively just merging - // them into the first one. 
- managed_envelope.retain_items(|item| { - if item.ty() != &ItemType::ClientReport { - return ItemAction::Keep; - }; - match ClientReport::parse(&item.payload()) { - Ok(ClientReport { - timestamp: report_timestamp, - discarded_events, - rate_limited_events, - filtered_events, - filtered_sampling_events, - }) => { - // Glue all discarded events together and give them the appropriate outcome type - let input_events = - discarded_events - .into_iter() - .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event)) - .chain( - filtered_events.into_iter().map(|discarded_event| { - (ClientReportField::Filtered, discarded_event) - }), - ) - .chain(filtered_sampling_events.into_iter().map(|discarded_event| { - (ClientReportField::FilteredSampling, discarded_event) - })) - .chain(rate_limited_events.into_iter().map(|discarded_event| { - (ClientReportField::RateLimited, discarded_event) - })); - - for (outcome_type, discarded_event) in input_events { - if discarded_event.reason.len() > 200 { - relay_log::trace!("ignored client outcome with an overlong reason"); - continue; - } - *output_events - .entry(( - outcome_type, - discarded_event.reason, - discarded_event.category, - )) - .or_insert(0) += discarded_event.quantity; - } - if let Some(ts) = report_timestamp { - timestamp.get_or_insert(ts); - } - } - Err(err) => { - relay_log::trace!(error = &err as &dyn Error, "invalid client report received") - } - } - ItemAction::DropSilently - }); - - if output_events.is_empty() { - return; - } - - let timestamp = - timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64)); - - if clock_drift_processor.is_drifted() { - relay_log::trace!("applying clock drift correction to client report"); - clock_drift_processor.process_timestamp(timestamp); - } - - let retention_days = project_info - .config() - .event_retention - .unwrap_or(DEFAULT_EVENT_RETENTION); - let max_age = SignedDuration::days(retention_days.into()); - // also if we unable to parse the timestamp, we assume it's way too old here. - let in_past = timestamp - .as_datetime() - .map(|ts| (received - ts) > max_age) - .unwrap_or(true); - if in_past { - relay_log::trace!( - "skipping client outcomes older than {} days", - max_age.num_days() - ); - return; - } - - let max_future = SignedDuration::seconds(config.max_secs_in_future()); - // also if we unable to parse the timestamp, we assume it's way far in the future here. - let in_future = timestamp - .as_datetime() - .map(|ts| (ts - received) > max_future) - .unwrap_or(true); - if in_future { - relay_log::trace!( - "skipping client outcomes more than {}s in the future", - max_future.num_seconds() - ); - return; - } - - for ((outcome_type, reason, category), quantity) in output_events.into_iter() { - let outcome = match outcome_from_parts(outcome_type, &reason) { - Ok(outcome) => outcome, - Err(_) => { - relay_log::trace!(?outcome_type, reason, "invalid outcome combination"); - continue; - } - }; - - outcome_aggregator.send(TrackOutcome { - // If we get to this point, the unwrap should not be used anymore, since we know by - // now that the timestamp can be parsed, but just incase we fallback to UTC current - // `DateTime`. 
- timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now), - scoping: managed_envelope.scoping(), - outcome, - event_id: None, - remote_addr: None, // omitting the client address allows for better aggregation - category, - quantity, - }); - } -} +use crate::services::outcome::{DiscardReason, Outcome}; /// Validates and normalizes all user report items in the envelope. /// @@ -241,45 +58,23 @@ fn trim_whitespaces(data: &[u8]) -> &[u8] { &data[from..to + 1] } -/// Parse an outcome from an outcome ID and a reason string. -/// -/// Currently only used to reconstruct outcomes encoded in client reports. -fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result { - match field { - ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") { - Some(rule_ids) => MatchedRuleIds::parse(rule_ids) - .map(RuleCategories::from) - .map(Outcome::FilteredSampling) - .map_err(|_| ()), - None => Err(()), - }, - ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())), - ClientReportField::Filtered => Ok(Outcome::Filtered( - FilterStatKey::try_from(reason).map_err(|_| ())?, - )), - ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason { - "" => None, - other => Some(ReasonCode::new(other)), - })), - } -} - #[cfg(test)] mod tests { use relay_cogs::Token; use relay_config::Config; use relay_event_schema::protocol::EventId; + use relay_system::Addr; use crate::envelope::{Envelope, Item}; use crate::extractors::RequestMeta; use crate::managed::ManagedEnvelope; - use crate::processing; - use crate::services::outcome::RuleCategory; + use crate::processing::{self}; use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit}; use crate::testutils::create_test_processor; use super::*; + // FIXME: Ask if moving the tests over is worth the changes to ProcessEnvelopeGrouped and Submit or if they should just stay here (hard to find). #[tokio::test] async fn test_client_report_removal() { relay_test::setup(); @@ -288,7 +83,6 @@ mod tests { let config = Config::from_json_value(serde_json::json!({ "outcomes": { "emit_outcomes": true, - "emit_client_outcomes": true } })) .unwrap(); @@ -338,69 +132,6 @@ mod tests { assert!(envelope.is_none()); } - #[tokio::test] - async fn test_client_report_forwarding() { - relay_test::setup(); - let outcome_aggregator = Addr::dummy(); - - let config = Config::from_json_value(serde_json::json!({ - "outcomes": { - "emit_outcomes": false, - // a relay need to emit outcomes at all to not process. 
- "emit_client_outcomes": true - } - })) - .unwrap(); - - let processor = create_test_processor(Default::default()).await; - - let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" - .parse() - .unwrap(); - - let request_meta = RequestMeta::new(dsn); - let mut envelope = Envelope::from_request(None, request_meta); - - envelope.add_item({ - let mut item = Item::new(ItemType::ClientReport); - item.set_payload( - ContentType::Json, - r#" - { - "discarded_events": [ - ["queue_full", "error", 42] - ] - } - "#, - ); - item - }); - - let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default()); - assert_eq!(envelopes.len(), 1); - let (group, envelope) = envelopes.pop().unwrap(); - let envelope = ManagedEnvelope::new(envelope, outcome_aggregator); - - let message = ProcessEnvelopeGrouped { - group, - envelope, - ctx: processing::Context { - config: &config, - ..processing::Context::for_test() - }, - }; - - let Ok(Some(Submit::Envelope(new_envelope))) = - processor.process(&mut Token::noop(), message).await - else { - panic!(); - }; - let item = new_envelope.envelope().items().next().unwrap(); - assert_eq!(item.ty(), &ItemType::ClientReport); - - new_envelope.accept(); // do not try to capture or emit outcomes - } - #[tokio::test] #[cfg(feature = "processing")] async fn test_client_report_removal_in_processing() { @@ -410,7 +141,6 @@ mod tests { let config = Config::from_json_value(serde_json::json!({ "outcomes": { "emit_outcomes": true, - "emit_client_outcomes": false, }, "processing": { "enabled": true, @@ -560,107 +290,6 @@ mod tests { assert_eq!(new_envelope.items().next().unwrap().ty(), &ItemType::Event); } - #[test] - fn test_from_outcome_type_sampled() { - assert!(outcome_from_parts(ClientReportField::FilteredSampling, "adsf").is_err()); - - assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:").is_err()); - - assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:foo").is_err()); - - assert!(matches!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:"), - Err(()) - )); - - assert!(matches!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:;"), - Err(()) - )); - - assert!(matches!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:ab;12"), - Err(()) - )); - - assert_eq!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123,456"), - Ok(Outcome::FilteredSampling(RuleCategories( - [RuleCategory::Other].into() - ))) - ); - - assert_eq!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"), - Ok(Outcome::FilteredSampling(RuleCategories( - [RuleCategory::Other].into() - ))) - ); - - assert_eq!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"), - Ok(Outcome::FilteredSampling(RuleCategories( - [RuleCategory::Other].into() - ))) - ); - - assert_eq!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:1001"), - Ok(Outcome::FilteredSampling(RuleCategories( - [RuleCategory::BoostEnvironments].into() - ))) - ); - - assert_eq!( - outcome_from_parts( - ClientReportField::FilteredSampling, - "Sampled:1001,1456,1567,3333,4444" - ), - Ok(Outcome::FilteredSampling(RuleCategories( - [ - RuleCategory::BoostEnvironments, - RuleCategory::BoostLowVolumeTransactions, - RuleCategory::BoostLatestReleases, - RuleCategory::Custom - ] - .into() - ))) - ); - } - - #[test] - fn test_from_outcome_type_filtered() { - assert!(matches!( - outcome_from_parts(ClientReportField::Filtered, "error-message"), - 
Ok(Outcome::Filtered(FilterStatKey::ErrorMessage)) - )); - - assert!(matches!( - outcome_from_parts(ClientReportField::Filtered, "hydration-error"), - Ok(Outcome::Filtered(FilterStatKey::GenericFilter(_))) - )); - } - - #[test] - fn test_from_outcome_type_client_discard() { - assert_eq!( - outcome_from_parts(ClientReportField::ClientDiscard, "foo_reason").unwrap(), - Outcome::ClientDiscard("foo_reason".into()) - ); - } - - #[test] - fn test_from_outcome_type_rate_limited() { - assert!(matches!( - outcome_from_parts(ClientReportField::RateLimited, ""), - Ok(Outcome::RateLimited(None)) - )); - assert_eq!( - outcome_from_parts(ClientReportField::RateLimited, "foo_reason").unwrap(), - Outcome::RateLimited(Some(ReasonCode::new("foo_reason"))) - ); - } - #[test] fn test_trim_whitespaces() { assert_eq!(trim_whitespaces(b""), b""); diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index 737ac8f911e..7cde5cefc3f 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -841,7 +841,6 @@ def test_outcome_to_client_report(relay, mini_sentry): { "outcomes": { "emit_outcomes": True, - "emit_client_outcomes": True, "batch_size": 1, "batch_interval": 1, }
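Note: after this change, `emit_outcomes` alone decides how client reports are handled; the separate `emit_client_outcomes` flag no longer exists, as the last hunk above shows. A minimal sketch of the resulting outcomes configuration, mirroring the `Config::from_json_value` pattern used in the Rust tests in this patch; the helper function name and Rust scaffolding are illustrative only and not part of the patch:

    use relay_config::Config;

    // Illustrative sketch: builds a config where `emit_outcomes` alone controls
    // client-report handling. Per the doc comment on `Outcomes::emit_outcomes`,
    // it accepts false, "as_client_reports", or true.
    fn example_outcomes_config() -> Config {
        Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": true,
                "batch_size": 1,
                "batch_interval": 1
            }
        }))
        .unwrap()
    }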