Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1366,8 +1366,6 @@ pub struct Outcomes {
/// Processing relays always emit outcomes (for backwards compatibility).
/// Can take the following values: false, "as_client_reports", true
pub emit_outcomes: EmitOutcomes,
/// Controls wheather client reported outcomes should be emitted.
pub emit_client_outcomes: bool,
/// The maximum number of outcomes that are batched before being sent
/// via http to the upstream (only applies to non processing relays).
pub batch_size: usize,
Expand All @@ -1385,7 +1383,6 @@ impl Default for Outcomes {
fn default() -> Self {
Outcomes {
emit_outcomes: EmitOutcomes::AsClientReports,
emit_client_outcomes: true,
batch_size: 1000,
batch_interval: 500,
source: None,
Expand Down Expand Up @@ -2078,19 +2075,6 @@ impl Config {
self.values.outcomes.emit_outcomes
}

/// Returns whether this Relay should emit client outcomes
///
/// Relays that do not emit client outcomes will forward client recieved outcomes
/// directly to the next relay in the chain as client report envelope. This is only done
/// if this relay emits outcomes at all. A relay that will not emit outcomes
/// will forward the envelope unchanged.
///
/// This flag can be explicitly disabled on processing relays as well to prevent the
/// emitting of client outcomes to the kafka topic.
pub fn emit_client_outcomes(&self) -> bool {
self.values.outcomes.emit_client_outcomes
}

/// Returns the maximum number of outcomes that are batched before being sent
pub fn outcome_batch_size(&self) -> usize {
self.values.outcomes.batch_size
Expand Down
166 changes: 166 additions & 0 deletions relay-server/src/processing/client_reports/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
use relay_metrics::UnixTimestamp;
use relay_quotas::DataCategory;
use relay_system::Addr;

use crate::envelope::{EnvelopeHeaders, Item, ItemType};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
use crate::processing::{self, Context, Nothing, Output};
use crate::services::outcome::{Outcome, TrackOutcome};

mod process;

#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Client report parsing failed.
#[error("invalid client report")]
InvalidClientReport,
/// Processing relay has client outcomes disabled.
#[error("client outcomes disabled")]
OutcomesDisabled,
/// Client report timestamp is too old.
#[error("client report timestamp too old")]
TimestampTooOld,
/// Client report timestamp is too far in the future.
#[error("client report timestamp in future")]
TimestampInFuture,
/// Client report contains an invalid outcome combination.
#[error("invalid outcome combination")]
InvalidOutcome,
}

impl OutcomeError for Error {
type Error = Self;

fn consume(self) -> (Option<Outcome>, Self::Error) {
// Currently/historically client reports do not emit outcomes.
(None, self)
}
}

/// A processor for Client-Reports.
pub struct ClientReportsProcessor {
aggregator: Addr<TrackOutcome>,
}

impl ClientReportsProcessor {
/// Creates a new [`Self`].
pub fn new(aggregator: Addr<TrackOutcome>) -> Self {
Self { aggregator }
}
}

impl processing::Processor for ClientReportsProcessor {
type UnitOfWork = SerializedClientReports;
type Output = Nothing;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
let headers = envelope.envelope().headers().clone();

let client_reports = envelope
.envelope_mut()
.take_items_by(|item| matches!(*item.ty(), ItemType::ClientReport))
.into_vec();

let work = SerializedClientReports {
headers,
client_reports,
};
Some(Managed::from_envelope(envelope, work))
}

async fn process(
&self,
client_reports: Managed<Self::UnitOfWork>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
if !ctx.config.emit_outcomes().any() && ctx.config.processing_enabled() {
// if a processing relay has client outcomes disabled we drop them without processing.
return Err(client_reports.reject_err(Error::OutcomesDisabled));
}

Comment on lines +80 to +84
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The condition !ctx.config.emit_outcomes().any() && ctx.config.processing_enabled() is logically unreachable, preventing OutcomesDisabled error.
Severity: HIGH | Confidence: High

🔍 Detailed Analysis

The condition !ctx.config.emit_outcomes().any() && ctx.config.processing_enabled() at relay-server/src/processing/client_reports/mod.rs:80-82 is logically unreachable. When ctx.config.processing_enabled() is true, emit_outcomes().any() is always true, making the first part false. When ctx.config.processing_enabled() is false, the second part is false. Consequently, the OutcomesDisabled error path is never taken, causing client reports to always be processed, ignoring the emit_outcomes configuration. This is a functional regression.

💡 Suggested Fix

Correct the condition at relay-server/src/processing/client_reports/mod.rs:80-82 to reflect the intended logic, likely by negating ctx.config.processing_enabled() or reverting to the original condition (!config.emit_outcomes().any() || !config.emit_client_outcomes()) && !config.processing_enabled().

🤖 Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: relay-server/src/processing/client_reports/mod.rs#L80-L84

Potential issue: The condition `!ctx.config.emit_outcomes().any() &&
ctx.config.processing_enabled()` at
`relay-server/src/processing/client_reports/mod.rs:80-82` is logically unreachable. When
`ctx.config.processing_enabled()` is true, `emit_outcomes().any()` is always true,
making the first part false. When `ctx.config.processing_enabled()` is false, the second
part is false. Consequently, the `OutcomesDisabled` error path is never taken, causing
client reports to always be processed, ignoring the `emit_outcomes` configuration. This
is a functional regression.

Did we get this right? 👍 / 👎 to inform future reviews.
Reference ID: 3173889

let client_reports = process::expand(client_reports);

// FIXME: This guard is taken over from the old code not sure if we still need this.
if !client_reports.output_events.is_empty() {
let client_reports = process::validate(client_reports, ctx.config, ctx.project_info)?;
process::emit(client_reports, &self.aggregator);
}

Ok(Output::empty())
}
}

/// Client-Reports in their serialized state, as transported in an envelope.
#[derive(Debug)]
pub struct SerializedClientReports {
/// Original envelope headers.
headers: EnvelopeHeaders,
/// A list of client-reports waiting to be processed.
///
/// All items contained here must be client-reports.
client_reports: Vec<Item>,
}

impl Counted for SerializedClientReports {
fn quantities(&self) -> Quantities {
smallvec::smallvec![]
}
}

/// Client reports which have been parsed and aggregated from their serialized state.
pub struct ExpandedClientReports {
/// Original envelope headers.
headers: EnvelopeHeaders,
/// The timestamp of when the reports were created.
timestamp: Option<UnixTimestamp>,
/// Aggregated outcome events from all parsed client reports.
output_events: Vec<OutcomeEvent>,
}

impl Counted for ExpandedClientReports {
fn quantities(&self) -> crate::managed::Quantities {
smallvec::smallvec![]
}
}

/// A aggregated single outcome event.
pub struct OutcomeEvent {
outcome_type: ClientReportField,
reason: String,
category: DataCategory,
quantity: u32,
}

impl Counted for OutcomeEvent {
fn quantities(&self) -> crate::managed::Quantities {
smallvec::smallvec![]
}
}

/// Client reports which have been validated and converted to trackable outcomes.
pub struct ValidatedClientReports {
outcomes: Vec<TrackOutcome>,
}

impl Counted for ValidatedClientReports {
fn quantities(&self) -> crate::managed::Quantities {
smallvec::smallvec![]
}
}

/// Fields of client reports that map to specific [`Outcome`]s without content.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ClientReportField {
/// The event has been filtered by an inbound data filter.
Filtered,
/// The event has been filtered by a sampling rule.
FilteredSampling,
/// The event has been rate limited.
RateLimited,
/// The event has already been discarded on the client side.
ClientDiscard,
}
Loading
Loading