81 changes: 81 additions & 0 deletions relay-server/src/processing/client_reports/mod.rs
@@ -0,0 +1,81 @@
use relay_system::Addr;

use crate::envelope::{EnvelopeHeaders, Item, ItemType};
use crate::managed::{Counted, Managed, ManagedEnvelope, Quantities, Rejected};
use crate::processing::{self, Context, Nothing, Output};
use crate::services::outcome::TrackOutcome;

mod process;

// TODO: Not sure there is a cleaner way to do this.
#[derive(Debug, thiserror::Error)]
pub enum Error {}

/// A processor for Client-Reports.
pub struct ClientReportsProcessor {
    aggregator: Addr<TrackOutcome>,
}

impl ClientReportsProcessor {
    /// Creates a new [`Self`].
    pub fn new(aggregator: Addr<TrackOutcome>) -> Self {
        Self { aggregator }
    }
}

impl processing::Processor for ClientReportsProcessor {
    type UnitOfWork = SerializedClientReports;
    type Output = Nothing;
    type Error = Error;

    fn prepare_envelope(
        &self,
        envelope: &mut ManagedEnvelope,
    ) -> Option<Managed<Self::UnitOfWork>> {
        let headers = envelope.envelope().headers().clone();

        let client_reports = envelope
            .envelope_mut()
            .take_items_by(|item| matches!(*item.ty(), ItemType::ClientReport))
            .into_vec();

        let work = SerializedClientReports {
            headers,
            client_reports,
        };
        Some(Managed::from_envelope(envelope, work))
    }

    async fn process(
        &self,
        mut client_reports: Managed<Self::UnitOfWork>,
        ctx: Context<'_>,
    ) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
        process::process_client_reports(
            &mut client_reports,
            ctx.config,
            ctx.project_info,
            &self.aggregator,
        );

        Ok(Output::empty())
    }
}

/// Client-Reports in their serialized state, as transported in an envelope.
#[derive(Debug)]
pub struct SerializedClientReports {
    /// Original envelope headers.
    headers: EnvelopeHeaders,

    /// A list of client-reports waiting to be processed.
    ///
    /// All items contained here must be client-reports.
    client_reports: Vec<Item>,
}

impl Counted for SerializedClientReports {
    fn quantities(&self) -> Quantities {
        smallvec::smallvec![]
    }
}
307 changes: 307 additions & 0 deletions relay-server/src/processing/client_reports/process.rs
@@ -0,0 +1,307 @@
use std::collections::BTreeMap;
use std::error::Error;

use chrono::{Duration as SignedDuration, Utc};
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_normalization::ClockDriftProcessor;
use relay_event_schema::protocol::ClientReport;
use relay_filter::FilterStatKey;
use relay_quotas::ReasonCode;
use relay_sampling::evaluation::MatchedRuleIds;
use relay_system::Addr;

use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::managed::Managed;
use crate::processing::client_reports::SerializedClientReports;
use crate::services::outcome::{Outcome, RuleCategories, TrackOutcome};
use crate::services::processor::MINIMUM_CLOCK_DRIFT;
use crate::services::projects::project::ProjectInfo;

/// Fields of client reports that map to specific [`Outcome`]s without content.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ClientReportField {
    /// The event has been filtered by an inbound data filter.
    Filtered,

    /// The event has been filtered by a sampling rule.
    FilteredSampling,

    /// The event has been rate limited.
    RateLimited,

    /// The event has already been discarded on the client side.
    ClientDiscard,
}

/// Validates and extracts client reports.
///
/// At the moment client reports are primarily used to transfer outcomes from
/// client SDKs. The outcomes are removed here and sent directly to the outcomes
/// system.
pub fn process_client_reports(
Member Author:

This logic is taken almost 1-1 from the old processing, so if we think this is better broken up or rewritten to make it more elegant, we can still do that.

Member:

We should do that, left an explaining comment on why here: #5338 (comment)

Member Author:

Ok, rewrote the logic now to split it up roughly into the individual parts that you mentioned. Not sure if the code is too cute now though.

    client_reports: &mut Managed<SerializedClientReports>,
    config: &Config,
    project_info: &ProjectInfo,
    outcome_aggregator: &Addr<TrackOutcome>,
) {
    if (!config.emit_outcomes().any() || !config.emit_client_outcomes())
        && config.processing_enabled()
    {
        // if a processing relay has client outcomes disabled we drop them without processing.
        return;
Member:

Should return an error here; this only works because we do not log any outcomes for client reports, but dropping data should be explicit and go through Rejected.
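
A minimal sketch of that suggestion, assuming a hypothetical Error::Disabled variant and a reject-style helper on Managed that consumes the unit of work and returns a Rejected; both names are illustrative and the actual API may differ:

#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Client outcomes are disabled on this processing Relay.
    #[error("client outcomes are disabled")]
    Disabled,
}

// In `process`: reject the whole unit of work instead of silently
// returning, so the drop is explicit and visible to `Managed`'s accounting.
if (!config.emit_outcomes().any() || !config.emit_client_outcomes())
    && config.processing_enabled()
{
    return Err(client_reports.reject(Error::Disabled));
}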

    }
Member Author (@tobias-wilfert, Nov 7, 2025):

This used to be:

// if client outcomes are disabled we leave the client reports unprocessed
// and pass them on.
if !config.emit_outcomes().any() || !config.emit_client_outcomes() {
    // if a processing relay has client outcomes disabled we drop them.
    if config.processing_enabled() {
        managed_envelope.retain_items(|item| match item.ty() {
            ItemType::ClientReport => ItemAction::DropSilently,
            _ => ItemAction::Keep,
        });
    }
    return;
}

Not sure, however, whether we still want to drop them when processing is enabled, or whether we want to get rid of this altogether.

Member:

This seems like a redundant check; if outcomes are disabled, the aggregator should drop them altogether 🤔

@jjbayer wdyt, remove this either now or in a follow-up?

Member:

Hmm, emit_client_outcomes and emit_outcomes are different config flags, so it's not entirely redundant. But I would be fine with making a breaking change and removing emit_client_outcomes from the config altogether.
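
For reference, the two flags being discussed live under the outcomes section of the Relay config; a minimal sketch, with illustrative values:

outcomes:
  # Whether this Relay emits outcomes at all.
  emit_outcomes: true
  # Whether outcomes transferred in client reports are emitted as well.
  emit_client_outcomes: true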


    let mut timestamp = None;
    let mut output_events = BTreeMap::new();
    let received = client_reports.received_at();

    let clock_drift_processor =
        ClockDriftProcessor::new(client_reports.headers.sent_at(), received)
            .at_least(MINIMUM_CLOCK_DRIFT);

    for item in &client_reports.client_reports {
        match ClientReport::parse(&item.payload()) {
            Ok(ClientReport {
                timestamp: report_timestamp,
                discarded_events,
                rate_limited_events,
                filtered_events,
                filtered_sampling_events,
            }) => {
                // Glue all discarded events together and give them the appropriate outcome type
                let input_events = discarded_events
                    .into_iter()
                    .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event))
                    .chain(
                        filtered_events
                            .into_iter()
                            .map(|discarded_event| (ClientReportField::Filtered, discarded_event)),
                    )
                    .chain(filtered_sampling_events.into_iter().map(|discarded_event| {
                        (ClientReportField::FilteredSampling, discarded_event)
                    }))
                    .chain(rate_limited_events.into_iter().map(|discarded_event| {
                        (ClientReportField::RateLimited, discarded_event)
                    }));

                for (outcome_type, discarded_event) in input_events {
                    if discarded_event.reason.len() > 200 {
                        relay_log::trace!("ignored client outcome with an overlong reason");
                        continue;
Member:

Same here, this bypasses the accounting logic of Managed; it just happens to work because there are no outcomes for client reports, but the processing code should work independently of this.

With outcomes for client reports and in a debug build this would panic due to accounting errors.

If you follow the implementation of e.g. the logs processor, you can do the processing in multiple phases:

  • managed.map() -> here you expand the client reports
  • managed.retain() -> here you validate/normalize the client reports
  • managed.accept() -> here you send the client reports to the aggregator

For readability reasons you can also split these 3 phases into 3 different functions.

If you want we can pair next week on making these changes.

Member:

Agreed, if we make the effort of porting the code we might as well use the new bookkeeping mechanism.
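
A rough sketch of the three-phase shape described above; the closure signatures and the helpers expand_reports, validate, and send_outcome are assumptions for illustration, not the actual Managed API:

// Phase 1: expand the serialized items into individual parsed reports.
let reports = client_reports.map(|serialized, _| expand_reports(serialized));

// Phase 2: validate/normalize, dropping invalid reports with proper accounting.
let reports = reports.retain(|report| validate(report));

// Phase 3: accept the remaining reports and forward them to the aggregator.
reports.accept(|report| send_outcome(report, outcome_aggregator));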

                    }
                    *output_events
                        .entry((
                            outcome_type,
                            discarded_event.reason,
                            discarded_event.category,
                        ))
                        .or_insert(0) += discarded_event.quantity;
                }
                if let Some(ts) = report_timestamp {
                    timestamp.get_or_insert(ts);
                }
            }
            Err(err) => {
                relay_log::trace!(error = &err as &dyn Error, "invalid client report received")
            }
        }
    }

    if output_events.is_empty() {
        return;
    }

    let timestamp =
        timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64));

    if clock_drift_processor.is_drifted() {
        relay_log::trace!("applying clock drift correction to client report");
        clock_drift_processor.process_timestamp(timestamp);
    }

    let retention_days = project_info
        .config()
        .event_retention
        .unwrap_or(DEFAULT_EVENT_RETENTION);
    let max_age = SignedDuration::days(retention_days.into());
    // If we are unable to parse the timestamp, we assume it is far too old.
    let in_past = timestamp
        .as_datetime()
        .map(|ts| (received - ts) > max_age)
        .unwrap_or(true);
    if in_past {
        relay_log::trace!(
            "skipping client outcomes older than {} days",
            max_age.num_days()
        );
        return;
    }

    let max_future = SignedDuration::seconds(config.max_secs_in_future());
    // If we are unable to parse the timestamp, we assume it is far in the future.
    let in_future = timestamp
        .as_datetime()
        .map(|ts| (ts - received) > max_future)
        .unwrap_or(true);
    if in_future {
        relay_log::trace!(
            "skipping client outcomes more than {}s in the future",
            max_future.num_seconds()
        );
        return;
    }

    for ((outcome_type, reason, category), quantity) in output_events.into_iter() {
        let Ok(outcome) = outcome_from_parts(outcome_type, &reason) else {
            relay_log::trace!(?outcome_type, reason, "invalid outcome combination");
            continue;
        };

        outcome_aggregator.send(TrackOutcome {
            // At this point the timestamp is known to parse, so the fallback should
            // never be used, but just in case we fall back to the current UTC `DateTime`.
            timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now),
            scoping: client_reports.scoping(),
            outcome,
            event_id: None,
            remote_addr: None, // omitting the client address allows for better aggregation
            category,
            quantity,
        });
    }
}

/// Parses an outcome from an outcome ID and a reason string.
///
/// Currently only used to reconstruct outcomes encoded in client reports.
fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result<Outcome, ()> {
    match field {
        ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") {
            Some(rule_ids) => MatchedRuleIds::parse(rule_ids)
                .map(RuleCategories::from)
                .map(Outcome::FilteredSampling)
                .map_err(|_| ()),
            None => Err(()),
        },
        ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())),
        ClientReportField::Filtered => Ok(Outcome::Filtered(
            FilterStatKey::try_from(reason).map_err(|_| ())?,
        )),
        ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason {
            "" => None,
            other => Some(ReasonCode::new(other)),
        })),
    }
}

#[cfg(test)]
mod tests {
    use crate::services::outcome::RuleCategory;

    use super::*;

    #[test]
    fn test_from_outcome_type_sampled() {
        assert!(outcome_from_parts(ClientReportField::FilteredSampling, "adsf").is_err());

        assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:").is_err());

        assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:foo").is_err());

        assert!(matches!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:"),
            Err(())
        ));

        assert!(matches!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:;"),
            Err(())
        ));

        assert!(matches!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:ab;12"),
            Err(())
        ));

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123,456"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::Other].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::Other].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:1001"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::BoostEnvironments].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(
                ClientReportField::FilteredSampling,
                "Sampled:1001,1456,1567,3333,4444"
            ),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [
                    RuleCategory::BoostEnvironments,
                    RuleCategory::BoostLowVolumeTransactions,
                    RuleCategory::BoostLatestReleases,
                    RuleCategory::Custom
                ]
                .into()
            )))
        );
    }

    #[test]
    fn test_from_outcome_type_filtered() {
        assert!(matches!(
            outcome_from_parts(ClientReportField::Filtered, "error-message"),
            Ok(Outcome::Filtered(FilterStatKey::ErrorMessage))
        ));

        assert!(matches!(
            outcome_from_parts(ClientReportField::Filtered, "hydration-error"),
            Ok(Outcome::Filtered(FilterStatKey::GenericFilter(_)))
        ));
    }

    #[test]
    fn test_from_outcome_type_client_discard() {
        assert_eq!(
            outcome_from_parts(ClientReportField::ClientDiscard, "foo_reason").unwrap(),
            Outcome::ClientDiscard("foo_reason".into())
        );
    }

    #[test]
    fn test_from_outcome_type_rate_limited() {
        assert!(matches!(
            outcome_from_parts(ClientReportField::RateLimited, ""),
            Ok(Outcome::RateLimited(None))
        ));
        assert_eq!(
            outcome_from_parts(ClientReportField::RateLimited, "foo_reason").unwrap(),
            Outcome::RateLimited(Some(ReasonCode::new("foo_reason")))
        );
    }
}