@@ -9,9 +9,13 @@ use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities
99use crate :: processing:: sessions:: process:: Expansion ;
1010use crate :: processing:: { self , Context , CountRateLimited , Forward , Output , QuotaRateLimiter } ;
1111use crate :: services:: outcome:: Outcome ;
12+ #[ cfg( feature = "processing" ) ]
13+ use crate :: statsd:: RelayCounters ;
1214
1315mod filter;
1416mod process;
17+ #[ cfg( feature = "processing" ) ]
18+ mod store;
1519
1620type Result < T , E = Error > = std:: result:: Result < T , E > ;
1721
@@ -91,7 +95,9 @@ impl processing::Processor for SessionsProcessor {
9195 ) -> Result < Output < Self :: Output > , Rejected < Self :: Error > > {
9296 let mut sessions = match process:: expand ( sessions, ctx) {
9397 Expansion :: Continue ( sessions) => sessions,
94- Expansion :: Forward ( sessions) => return Ok ( Output :: just ( SessionsOutput ( sessions) ) ) ,
98+ Expansion :: Forward ( sessions) => {
99+ return Ok ( Output :: just ( SessionsOutput :: Forward ( sessions) ) ) ;
100+ }
95101 } ;
96102
97103 // We can apply filters before normalization here, as our filters currently do not depend
@@ -103,31 +109,125 @@ impl processing::Processor for SessionsProcessor {
103109
104110 let sessions = self . limiter . enforce_quotas ( sessions, ctx) . await ?;
105111
106- let sessions = process:: extract ( sessions, ctx) ;
107- Ok ( Output :: metrics ( sessions) )
112+ // Check if EAP user sessions double-write is enabled.
113+ // This feature sends session data to both the legacy metrics pipeline
114+ // and directly to the snuba-items topic as TRACE_ITEM_TYPE_USER_SESSION.
115+ let eap_enabled = ctx
116+ . project_info
117+ . config
118+ . features
119+ . has ( relay_dynamic_config:: Feature :: UserSessionsEap ) ;
120+
121+ let ( metrics, eap_sessions) = process:: extract_with_eap ( sessions, ctx, eap_enabled) ;
122+
123+ if let Some ( eap_sessions) = eap_sessions {
124+ // Return both the EAP sessions for storage and the extracted metrics.
125+ Ok ( Output {
126+ main : Some ( SessionsOutput :: Store ( eap_sessions) ) ,
127+ metrics : Some ( metrics) ,
128+ } )
129+ } else {
130+ // Legacy path: only return metrics.
131+ Ok ( Output :: metrics ( metrics) )
132+ }
108133 }
109134}
110135
111136/// Output produced by the [`SessionsProcessor`].
112137#[ derive( Debug ) ]
113- pub struct SessionsOutput ( Managed < SerializedSessions > ) ;
138+ pub enum SessionsOutput {
139+ /// Sessions that should be forwarded (non-processing relay).
140+ Forward ( Managed < SerializedSessions > ) ,
141+ /// Sessions that should be stored to EAP (processing relay with feature enabled).
142+ Store ( Managed < ExpandedSessions > ) ,
143+ }
114144
115145impl Forward for SessionsOutput {
116146 fn serialize_envelope (
117147 self ,
118148 _: processing:: ForwardContext < ' _ > ,
119149 ) -> Result < Managed < Box < Envelope > > , Rejected < ( ) > > {
120- Ok ( self . 0 . map ( |sessions, _| sessions. serialize_envelope ( ) ) )
150+ match self {
151+ Self :: Forward ( sessions) => {
152+ Ok ( sessions. map ( |sessions, _| sessions. serialize_envelope ( ) ) )
153+ }
154+ Self :: Store ( sessions) => {
155+ // EAP sessions should be stored, not serialized to envelope.
156+ Err ( sessions
157+ . internal_error ( "EAP sessions should be stored, not serialized to envelope" ) )
158+ }
159+ }
121160 }
122161
123162 #[ cfg( feature = "processing" ) ]
124163 fn forward_store (
125164 self ,
126- _ : processing:: forward:: StoreHandle < ' _ > ,
127- _ : processing:: ForwardContext < ' _ > ,
165+ s : processing:: forward:: StoreHandle < ' _ > ,
166+ ctx : processing:: ForwardContext < ' _ > ,
128167 ) -> Result < ( ) , Rejected < ( ) > > {
129- let SessionsOutput ( sessions) = self ;
130- Err ( sessions. internal_error ( "sessions should always be extracted into metrics" ) )
168+ match self {
169+ Self :: Forward ( sessions) => {
170+ // Non-processing relay path - sessions should have been extracted to metrics.
171+ Err ( sessions. internal_error ( "sessions should always be extracted into metrics" ) )
172+ }
173+ Self :: Store ( sessions) => {
174+ // EAP double-write path: convert expanded sessions to TraceItems and store.
175+ let store_ctx = store:: Context {
176+ received_at : sessions. received_at ( ) ,
177+ scoping : sessions. scoping ( ) ,
178+ retention : ctx. retention ( |r| r. session . as_ref ( ) ) ,
179+ } ;
180+
181+ // Split sessions into updates and aggregates, keeping track of the aggregates
182+ // for later processing.
183+ let ( updates_managed, aggregates) =
184+ sessions. split_once ( |s, _| ( s. updates , s. aggregates ) ) ;
185+
186+ // Convert and store each session update.
187+ for session in updates_managed. split ( |updates| updates) {
188+ let item = session. try_map ( |session, _| {
189+ Ok :: < _ , std:: convert:: Infallible > ( store:: convert_session_update (
190+ & session, & store_ctx,
191+ ) )
192+ } ) ;
193+ if let Ok ( item) = item {
194+ s. store ( item) ;
195+ relay_statsd:: metric!(
196+ counter( RelayCounters :: SessionsEapProduced ) += 1 ,
197+ session_type = "update"
198+ ) ;
199+ }
200+ }
201+
202+ // Convert and store each session aggregate.
203+ // Aggregates are expanded into individual session rows to unify the format.
204+ for aggregate_batch in aggregates. split ( |aggs| aggs) {
205+ let release = aggregate_batch. attributes . release . clone ( ) ;
206+ let environment = aggregate_batch. attributes . environment . clone ( ) ;
207+
208+ for aggregate in aggregate_batch. split ( |batch| batch. aggregates ) {
209+ // Convert aggregate to multiple individual session items
210+ let items = store:: convert_session_aggregate (
211+ & aggregate,
212+ & release,
213+ environment. as_deref ( ) ,
214+ & store_ctx,
215+ ) ;
216+
217+ for item in items {
218+ let managed_item = aggregate. wrap ( item) ;
219+ s. store ( managed_item) ;
220+ relay_statsd:: metric!(
221+ counter( RelayCounters :: SessionsEapProduced ) += 1 ,
222+ session_type = "aggregate"
223+ ) ;
224+ }
225+ }
226+ }
227+
228+ Ok ( ( ) )
229+ }
230+ }
131231 }
132232}
133233
@@ -163,15 +263,15 @@ impl Counted for SerializedSessions {
163263 }
164264}
165265
166- #[ derive( Debug ) ]
266+ #[ derive( Clone , Debug ) ]
167267pub struct ExpandedSessions {
168268 /// Original envelope headers.
169- headers : EnvelopeHeaders ,
269+ pub ( crate ) headers : EnvelopeHeaders ,
170270
171271 /// A list of parsed session updates.
172- updates : Vec < SessionUpdate > ,
272+ pub ( crate ) updates : Vec < SessionUpdate > ,
173273 /// A list of parsed session aggregates.
174- aggregates : Vec < SessionAggregates > ,
274+ pub ( crate ) aggregates : Vec < SessionAggregates > ,
175275}
176276
177277impl Counted for ExpandedSessions {
0 commit comments