Skip to content

Commit 1a745cd

Browse files
committed
Move eap switch to store
1 parent 71a0acf commit 1a745cd

File tree

6 files changed

+54
-66
lines changed

6 files changed

+54
-66
lines changed

relay-dynamic-config/src/feature.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -138,15 +138,6 @@ pub enum Feature {
138138
/// Enable the experimental Trace Attachment pipeline in Relay.
139139
#[serde(rename = "projects:trace-attachment-processing")]
140140
TraceAttachmentProcessing,
141-
/// Enable EAP (Event Analytics Platform) double-write for user sessions.
142-
///
143-
/// When enabled, session data is sent both through the legacy metrics pipeline
144-
/// and directly to the snuba-items topic as TRACE_ITEM_TYPE_USER_SESSION.
145-
/// This enables migration to the new EAP-based user sessions storage.
146-
///
147-
/// Serialized as `organizations:user-sessions-eap`.
148-
#[serde(rename = "organizations:user-sessions-eap")]
149-
UserSessionsEap,
150141
/// Forward compatibility.
151142
#[doc(hidden)]
152143
#[serde(other)]

relay-dynamic-config/src/global.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,20 @@ pub struct Options {
176176
)]
177177
pub objectstore_attachments_sample_rate: f32,
178178

179+
/// Rollout rate for the EAP (Event Analytics Platform) double-write for user sessions.
180+
///
181+
/// When rolled out, session data is sent both through the legacy metrics pipeline
182+
/// and directly to the snuba-items topic as TRACE_ITEM_TYPE_USER_SESSION.
183+
///
184+
/// Rate needs to be between `0.0` and `1.0`.
185+
/// Rollout is determined deterministically per organization ID.
186+
#[serde(
187+
rename = "relay.sessions-eap.rollout-rate",
188+
deserialize_with = "default_on_error",
189+
skip_serializing_if = "is_default"
190+
)]
191+
pub sessions_eap_rollout_rate: f32,
192+
179193
/// All other unknown options.
180194
#[serde(flatten)]
181195
other: HashMap<String, Value>,

relay-server/src/processing/sessions/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ impl processing::Processor for SessionsProcessor {
104104
let sessions = self.limiter.enforce_quotas(sessions, ctx).await?;
105105

106106
let sessions = process::extract_metrics(sessions, ctx);
107-
108107
Ok(Output::metrics(sessions))
109108
}
110109
}

relay-server/src/services/processor.rs

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2449,18 +2449,18 @@ impl EnvelopeProcessorService {
24492449

24502450
/// Processes metric buckets and sends them to kafka.
24512451
///
2452-
/// Applies rate limiting and cardinality limiting, then submits to the store.
2453-
/// If `UserSessionsEap` is enabled, session metrics are also sent to EAP.
2452+
/// This function runs the following steps:
2453+
/// - cardinality limiting
2454+
/// - rate limiting
2455+
/// - submit to `StoreForwarder`
24542456
#[cfg(feature = "processing")]
24552457
async fn encode_metrics_processing(
24562458
&self,
24572459
message: FlushBuckets,
24582460
store_forwarder: &Addr<Store>,
24592461
) {
24602462
use crate::constants::DEFAULT_EVENT_RETENTION;
2461-
use crate::services::store::{StoreMetrics, StoreSessionMetricsEap};
2462-
use relay_dynamic_config::Feature;
2463-
use relay_metrics::MetricNamespace;
2463+
use crate::services::store::StoreMetrics;
24642464

24652465
for ProjectBuckets {
24662466
buckets,
@@ -2487,28 +2487,13 @@ impl EnvelopeProcessorService {
24872487
.event_retention
24882488
.unwrap_or(DEFAULT_EVENT_RETENTION);
24892489

2490-
// Always send all buckets to the legacy metrics path.
2490+
// The store forwarder takes care of bucket splitting internally, so we can submit the
2491+
// entire list of buckets. There is no batching needed here.
24912492
store_forwarder.send(StoreMetrics {
2492-
buckets: buckets.clone(),
2493+
buckets,
24932494
scoping,
24942495
retention,
24952496
});
2496-
2497-
// If EAP double-write is enabled, also send session metrics to EAP.
2498-
if project_info.config.features.has(Feature::UserSessionsEap) {
2499-
let session_buckets: Vec<_> = buckets
2500-
.into_iter()
2501-
.filter(|b| b.name.namespace() == MetricNamespace::Sessions)
2502-
.collect();
2503-
2504-
if !session_buckets.is_empty() {
2505-
store_forwarder.send(StoreSessionMetricsEap {
2506-
buckets: session_buckets,
2507-
scoping,
2508-
retention,
2509-
});
2510-
}
2511-
}
25122497
}
25132498
}
25142499

relay-server/src/services/store.rs

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,6 @@ pub struct StoreMetrics {
103103
pub retention: u16,
104104
}
105105

106-
/// Publishes session metric buckets to EAP via the snuba-items topic.
107-
///
108-
/// Used when the `UserSessionsEap` feature flag is enabled for double-write.
109-
#[derive(Clone, Debug)]
110-
pub struct StoreSessionMetricsEap {
111-
pub buckets: Vec<Bucket>,
112-
pub scoping: Scoping,
113-
pub retention: u16,
114-
}
115-
116106
/// Publishes a log item to the Sentry core application through Kafka.
117107
#[derive(Debug)]
118108
pub struct StoreTraceItem {
@@ -185,8 +175,6 @@ pub enum Store {
185175
Envelope(StoreEnvelope),
186176
/// Aggregated generic metrics.
187177
Metrics(StoreMetrics),
188-
/// Session metrics routed to EAP (Event Analytics Platform).
189-
SessionMetricsEap(StoreSessionMetricsEap),
190178
/// A singular [`TraceItem`].
191179
TraceItem(Managed<StoreTraceItem>),
192180
/// A singular Span.
@@ -201,7 +189,6 @@ impl Store {
201189
match self {
202190
Store::Envelope(_) => "envelope",
203191
Store::Metrics(_) => "metrics",
204-
Store::SessionMetricsEap(_) => "session_metrics_eap",
205192
Store::TraceItem(_) => "log",
206193
Store::Span(_) => "span",
207194
Store::ProfileChunk(_) => "profile_chunk",
@@ -227,14 +214,6 @@ impl FromMessage<StoreMetrics> for Store {
227214
}
228215
}
229216

230-
impl FromMessage<StoreSessionMetricsEap> for Store {
231-
type Response = NoResponse;
232-
233-
fn from_message(message: StoreSessionMetricsEap, _: ()) -> Self {
234-
Self::SessionMetricsEap(message)
235-
}
236-
}
237-
238217
impl FromMessage<Managed<StoreTraceItem>> for Store {
239218
type Response = NoResponse;
240219

@@ -294,7 +273,6 @@ impl StoreService {
294273
match message {
295274
Store::Envelope(message) => self.handle_store_envelope(message),
296275
Store::Metrics(message) => self.handle_store_metrics(message),
297-
Store::SessionMetricsEap(message) => self.handle_store_session_metrics_eap(message),
298276
Store::TraceItem(message) => self.handle_store_trace_item(message),
299277
Store::Span(message) => self.handle_store_span(message),
300278
Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
@@ -543,9 +521,22 @@ impl StoreService {
543521
let global_config = self.global_config.current();
544522
let mut encoder = BucketEncoder::new(&global_config);
545523

524+
// Check if this organization is rolled out for sessions EAP double-write.
525+
let sessions_eap_rollout_rate = global_config.options.sessions_eap_rollout_rate;
526+
let emit_sessions_to_eap =
527+
utils::is_rolled_out(scoping.organization_id.value(), sessions_eap_rollout_rate)
528+
.is_keep();
529+
546530
let now = UnixTimestamp::now();
547531
let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();
548532

533+
// Collect session buckets for EAP if rolled out.
534+
let mut session_buckets = if emit_sessions_to_eap {
535+
Vec::new()
536+
} else {
537+
Vec::with_capacity(0)
538+
};
539+
549540
for mut bucket in buckets {
550541
let namespace = encoder.prepare(&mut bucket);
551542

@@ -557,6 +548,11 @@ impl StoreService {
557548
*max = (*max).max(delay);
558549
}
559550

551+
// Collect session buckets for EAP double-write.
552+
if emit_sessions_to_eap && namespace == MetricNamespace::Sessions {
553+
session_buckets.push(bucket.clone());
554+
}
555+
560556
// Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
561557
// each bucket separately, we only need to split buckets that exceed the size, but not
562558
// batches.
@@ -588,6 +584,11 @@ impl StoreService {
588584
}
589585
}
590586

587+
// Also emit session buckets to EAP if rolled out.
588+
if !session_buckets.is_empty() {
589+
self.emit_session_metrics_to_eap(&session_buckets, scoping, retention);
590+
}
591+
591592
if let Some(error) = error {
592593
relay_log::error!(
593594
error = &error as &dyn std::error::Error,
@@ -614,8 +615,11 @@ impl StoreService {
614615
}
615616
}
616617

617-
/// Sends session metric buckets to EAP (snuba-items topic) as TraceItems.
618-
fn handle_store_session_metrics_eap(&self, message: StoreSessionMetricsEap) {
618+
/// Emits session metric buckets to EAP (snuba-items topic) as TraceItems.
619+
///
620+
/// This is called when the organization is rolled out for sessions EAP double-write
621+
/// via `sessions_eap_rollout_rate`.
622+
fn emit_session_metrics_to_eap(&self, buckets: &[Bucket], scoping: Scoping, retention: u16) {
619623
use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType, any_value};
620624
use std::collections::HashMap;
621625

@@ -624,11 +628,6 @@ impl StoreService {
624628
0x90, 0xa1,
625629
]);
626630

627-
let StoreSessionMetricsEap {
628-
buckets,
629-
scoping,
630-
retention,
631-
} = message;
632631
let now = UnixTimestamp::now();
633632
let mut error = None;
634633
let mut row_count: u64 = 0;
@@ -696,8 +695,8 @@ impl StoreService {
696695
trace_id: uuid.to_string(),
697696
item_id: uuid.as_bytes()[..16].into(),
698697
item_type: i32::from(TraceItemType::UserSession),
699-
timestamp: timestamp.clone(),
700-
received: received.clone(),
698+
timestamp,
699+
received,
701700
retention_days: retention.into(),
702701
downsampled_retention_days: retention.into(),
703702
attributes,

relay-server/src/statsd.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -826,8 +826,8 @@ pub enum RelayCounters {
826826
SpanV2Produced,
827827
/// Number of session metric buckets produced to the EAP snuba-items topic.
828828
///
829-
/// This metric is only emitted when the `UserSessionsEap` feature flag is enabled,
830-
/// routing aggregated session metrics to EAP (Event Analytics Platform) instead of
829+
/// This metric is only emitted when the organization is rolled out via `sessions_eap_rollout_rate`,
830+
/// routing aggregated session metrics to EAP (Event Analytics Platform) in addition to
831831
/// the legacy ingest-metrics topic.
832832
///
833833
/// This metric is tagged with:

0 commit comments

Comments
 (0)