9 changes: 9 additions & 0 deletions relay-dynamic-config/src/feature.rs
@@ -138,6 +138,15 @@ pub enum Feature {
/// Enable the experimental Trace Attachment pipeline in Relay.
#[serde(rename = "projects:trace-attachment-processing")]
TraceAttachmentProcessing,
/// Enable EAP (Event Analytics Platform) double-write for user sessions.
///
/// When enabled, session data is sent both through the legacy metrics pipeline
/// and directly to the snuba-items topic as TRACE_ITEM_TYPE_USER_SESSION.
/// This enables migration to the new EAP-based user sessions storage.
///
/// Serialized as `organizations:user-sessions-eap`.
#[serde(rename = "organizations:user-sessions-eap")]
UserSessionsEap,
/// Forward compatibility.
#[doc(hidden)]
#[serde(other)]
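The new flag is matched purely by its serde rename, so the wire name and the variant stay in sync. A minimal sketch of the expected round-trip, assuming `Feature` derives `Serialize` and `Deserialize` as its existing serde attributes suggest (`serde_json` used as a test-only dependency):

use relay_dynamic_config::Feature;

fn main() {
    // Deserialization picks the variant by the renamed string...
    let parsed: Feature =
        serde_json::from_str(r#""organizations:user-sessions-eap""#).unwrap();
    assert!(matches!(parsed, Feature::UserSessionsEap));

    // ...and serialization emits the same name, as documented on the variant.
    let serialized = serde_json::to_string(&Feature::UserSessionsEap).unwrap();
    assert_eq!(serialized, r#""organizations:user-sessions-eap""#);
}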
10 changes: 9 additions & 1 deletion relay-dynamic-config/src/project.rs
@@ -262,6 +262,9 @@ pub struct RetentionsConfig {
/// Retention settings for attachments.
#[serde(skip_serializing_if = "Option::is_none")]
pub trace_attachment: Option<RetentionConfig>,
/// Retention settings for user sessions (EAP).
#[serde(skip_serializing_if = "Option::is_none")]
pub session: Option<RetentionConfig>,
}

impl RetentionsConfig {
@@ -271,9 +274,14 @@ impl RetentionsConfig {
span,
trace_metric,
trace_attachment,
session,
} = self;

log.is_none() && span.is_none() && trace_metric.is_none() && trace_attachment.is_none()
log.is_none()
&& span.is_none()
&& trace_metric.is_none()
&& trace_attachment.is_none()
&& session.is_none()
}
}

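The emptiness check above destructures `RetentionsConfig` exhaustively, so adding the `session` field without updating the check would fail to compile instead of silently mis-reporting an empty config. A stripped-down miniature of that pattern (stand-in types, not the real structs; the enclosing method's name is elided in the hunk and is called `is_empty` here only for illustration):

// Stand-in types: enough to show why the exhaustive destructuring is used.
struct RetentionConfig;

struct RetentionsConfig {
    log: Option<RetentionConfig>,
    span: Option<RetentionConfig>,
    session: Option<RetentionConfig>,
}

impl RetentionsConfig {
    fn is_empty(&self) -> bool {
        // Destructuring by field name means a newly added retention field
        // produces a "missing field" compile error right here.
        let Self { log, span, session } = self;
        log.is_none() && span.is_none() && session.is_none()
    }
}

fn main() {
    let only_session = RetentionsConfig {
        log: None,
        span: None,
        session: Some(RetentionConfig),
    };
    // A config that only sets the new session retention is not empty.
    assert!(!only_session.is_empty());
}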
126 changes: 113 additions & 13 deletions relay-server/src/processing/sessions/mod.rs
@@ -9,9 +9,13 @@ use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities
use crate::processing::sessions::process::Expansion;
use crate::processing::{self, Context, CountRateLimited, Forward, Output, QuotaRateLimiter};
use crate::services::outcome::Outcome;
#[cfg(feature = "processing")]
use crate::statsd::RelayCounters;

mod filter;
mod process;
#[cfg(feature = "processing")]
mod store;

type Result<T, E = Error> = std::result::Result<T, E>;

@@ -91,7 +95,9 @@ impl processing::Processor for SessionsProcessor {
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
let mut sessions = match process::expand(sessions, ctx) {
Expansion::Continue(sessions) => sessions,
Expansion::Forward(sessions) => return Ok(Output::just(SessionsOutput(sessions))),
Expansion::Forward(sessions) => {
return Ok(Output::just(SessionsOutput::Forward(sessions)));
}
};

// We can apply filters before normalization here, as our filters currently do not depend
@@ -103,31 +109,125 @@

let sessions = self.limiter.enforce_quotas(sessions, ctx).await?;

let sessions = process::extract(sessions, ctx);
Ok(Output::metrics(sessions))
// Check if EAP user sessions double-write is enabled. When it is, session
// data is sent both through the legacy metrics pipeline and directly to the
// snuba-items topic as TRACE_ITEM_TYPE_USER_SESSION.
let eap_enabled = ctx
.project_info
.config
.features
.has(relay_dynamic_config::Feature::UserSessionsEap);

let (metrics, eap_sessions) = process::extract_with_eap(sessions, ctx, eap_enabled);

if let Some(eap_sessions) = eap_sessions {
// Return both the EAP sessions for storage and the extracted metrics.
Ok(Output {
main: Some(SessionsOutput::Store(eap_sessions)),
metrics: Some(metrics),
})
} else {
// Legacy path: only return metrics.
Ok(Output::metrics(metrics))
}
}
}

/// Output produced by the [`SessionsProcessor`].
#[derive(Debug)]
pub struct SessionsOutput(Managed<SerializedSessions>);
pub enum SessionsOutput {
/// Sessions that should be forwarded (non-processing relay).
Forward(Managed<SerializedSessions>),
/// Sessions that should be stored to EAP (processing relay with feature enabled).
Store(Managed<ExpandedSessions>),
}

impl Forward for SessionsOutput {
fn serialize_envelope(
self,
_: processing::ForwardContext<'_>,
) -> Result<Managed<Box<Envelope>>, Rejected<()>> {
Ok(self.0.map(|sessions, _| sessions.serialize_envelope()))
match self {
Self::Forward(sessions) => {
Ok(sessions.map(|sessions, _| sessions.serialize_envelope()))
}
Self::Store(sessions) => {
// EAP sessions should be stored, not serialized to envelope.
Err(sessions
.internal_error("EAP sessions should be stored, not serialized to envelope"))
}
}
}

#[cfg(feature = "processing")]
fn forward_store(
self,
_: processing::forward::StoreHandle<'_>,
_: processing::ForwardContext<'_>,
s: processing::forward::StoreHandle<'_>,
ctx: processing::ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
let SessionsOutput(sessions) = self;
Err(sessions.internal_error("sessions should always be extracted into metrics"))
match self {
Self::Forward(sessions) => {
// Non-processing relay path: sessions should have been extracted into metrics.
Err(sessions.internal_error("sessions should always be extracted into metrics"))
}
Self::Store(sessions) => {
// EAP double-write path: convert expanded sessions to TraceItems and store.
let store_ctx = store::Context {
received_at: sessions.received_at(),
scoping: sessions.scoping(),
retention: ctx.retention(|r| r.session.as_ref()),
};

// Split sessions into updates and aggregates, keeping track of the aggregates
// for later processing.
let (updates_managed, aggregates) =
sessions.split_once(|s, _| (s.updates, s.aggregates));

// Convert and store each session update.
for session in updates_managed.split(|updates| updates) {
let item = session.try_map(|session, _| {
Ok::<_, std::convert::Infallible>(store::convert_session_update(
&session, &store_ctx,
))
});
if let Ok(item) = item {
s.store(item);
relay_statsd::metric!(
counter(RelayCounters::SessionsEapProduced) += 1,
session_type = "update"
);
}
}

// Convert and store each session aggregate.
// Aggregates are expanded into individual session rows to unify the format.
for aggregate_batch in aggregates.split(|aggs| aggs) {
let release = aggregate_batch.attributes.release.clone();
let environment = aggregate_batch.attributes.environment.clone();

for aggregate in aggregate_batch.split(|batch| batch.aggregates) {
// Convert aggregate to multiple individual session items
let items = store::convert_session_aggregate(
&aggregate,
&release,
environment.as_deref(),
&store_ctx,
);

for item in items {
let managed_item = aggregate.wrap(item);
s.store(managed_item);
relay_statsd::metric!(
counter(RelayCounters::SessionsEapProduced) += 1,
session_type = "aggregate"
);
}
}
}

Ok(())
}
}
}
}

@@ -163,15 +263,15 @@ impl Counted for SerializedSessions {
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct ExpandedSessions {
/// Original envelope headers.
headers: EnvelopeHeaders,
pub(crate) headers: EnvelopeHeaders,

/// A list of parsed session updates.
updates: Vec<SessionUpdate>,
pub(crate) updates: Vec<SessionUpdate>,
/// A list of parsed session aggregates.
aggregates: Vec<SessionAggregates>,
pub(crate) aggregates: Vec<SessionAggregates>,
}

impl Counted for ExpandedSessions {
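Taken together, the comments in this file describe three routes for a batch of sessions: non-processing Relays keep forwarding the envelope, processing Relays without the flag extract metrics only, and processing Relays with `organizations:user-sessions-eap` double-write metrics plus EAP trace items. A hypothetical summary of that routing (stand-in enum and function, not Relay's `Output` type; "is a processing Relay" is a rough stand-in for the `Expansion::Forward` decision above):

/// Stand-in for the three outcomes described by the comments above.
enum Route {
    /// Non-processing Relay: serialize and forward the envelope upstream.
    ForwardEnvelope,
    /// Processing Relay, flag off: legacy path, extracted metrics only.
    MetricsOnly,
    /// Processing Relay, flag on: metrics plus EAP items on snuba-items.
    MetricsAndEapStore,
}

fn route(is_processing_relay: bool, user_sessions_eap: bool) -> Route {
    match (is_processing_relay, user_sessions_eap) {
        (false, _) => Route::ForwardEnvelope,
        (true, false) => Route::MetricsOnly,
        (true, true) => Route::MetricsAndEapStore,
    }
}

fn main() {
    assert!(matches!(route(false, true), Route::ForwardEnvelope));
    assert!(matches!(route(true, false), Route::MetricsOnly));
    assert!(matches!(route(true, true), Route::MetricsAndEapStore));
}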
32 changes: 29 additions & 3 deletions relay-server/src/processing/sessions/process.rs
@@ -212,14 +238,38 @@ fn normalize_attributes(attrs: &mut SessionAttributes, ctx: &NormalizeContext<'_
Ok(())
}

pub fn extract(sessions: Managed<ExpandedSessions>, ctx: Context<'_>) -> Managed<ExtractedMetrics> {
/// Extracts session metrics and optionally returns the expanded sessions for EAP storage.
///
/// When `eap_enabled` is true, this function returns both the extracted metrics (for the legacy
/// pipeline) and the expanded sessions (for double-write to the snuba-items topic as
/// TRACE_ITEM_TYPE_USER_SESSION).
///
/// This enables a gradual migration from the legacy session metrics to the new EAP-based
/// user sessions storage, with both paths running in parallel during the migration period.
pub fn extract_with_eap(
sessions: Managed<ExpandedSessions>,
ctx: Context<'_>,
eap_enabled: bool,
) -> (Managed<ExtractedMetrics>, Option<Managed<ExpandedSessions>>) {
let should_extract_abnormal_mechanism = ctx
.project_info
.config
.session_metrics
.should_extract_abnormal_mechanism();

sessions.map(|sessions, records| {
// If EAP is enabled, we need to clone the sessions before consuming them for metrics.
// Use `wrap` to create a new Managed with the same metadata but cloned data.
let eap_sessions = if eap_enabled {
Some(sessions.wrap(ExpandedSessions {
headers: sessions.headers.clone(),
updates: sessions.updates.clone(),
aggregates: sessions.aggregates.clone(),
}))
} else {
None
};

let metrics = sessions.map(|sessions, records| {
let mut metrics = Vec::new();
let meta = sessions.headers.meta();

@@ -253,5 +277,7 @@
project_metrics: metrics,
sampling_metrics: Vec::new(),
}
})
});

(metrics, eap_sessions)
}
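The contract of `extract_with_eap` is that metrics are always produced, while the second element is populated only when the flag is on, which is what makes the double-write safe to roll out gradually. A self-contained sketch of that return shape with stand-in types (not Relay's `Managed` or session structs):

// Stand-in types: the shape of a double-write extraction.
#[derive(Clone, Debug, PartialEq)]
struct Sessions(Vec<&'static str>);

#[derive(Debug, PartialEq)]
struct Metrics(usize);

fn extract_with_eap(sessions: Sessions, eap_enabled: bool) -> (Metrics, Option<Sessions>) {
    // Duplicate before the metrics extraction consumes the sessions,
    // and only when the double-write is actually enabled.
    let eap = eap_enabled.then(|| sessions.clone());
    (Metrics(sessions.0.len()), eap)
}

fn main() {
    let sessions = Sessions(vec!["init", "exited"]);
    assert_eq!(extract_with_eap(sessions.clone(), false), (Metrics(2), None));
    assert_eq!(extract_with_eap(sessions.clone(), true), (Metrics(2), Some(sessions)));
}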