@@ -4,7 +4,7 @@ use bytes::Bytes;
44use either:: Either ;
55use relay_event_normalization:: GeoIpLookup ;
66use relay_event_schema:: processor:: ProcessingAction ;
7- use relay_event_schema:: protocol:: { AttachmentV2Meta , SpanId , SpanV2 } ;
7+ use relay_event_schema:: protocol:: { AttachmentV2Meta , SpanV2 } ;
88use relay_pii:: PiiConfigError ;
99use relay_protocol:: Annotated ;
1010use relay_quotas:: { DataCategory , RateLimits } ;
@@ -242,21 +242,24 @@ impl Forward for SpanOutput {
242242 retention : ctx. retention ( |r| r. span . as_ref ( ) ) ,
243243 } ;
244244
245- // Explicitly drop standalone attachments before splitting
246- // They are not stored for now.
247- // This must be fixed before enabling the feature flag.
248- let spans = spans. map ( |mut inner, record_keeper| {
249- if !inner. stand_alone_attachments . is_empty ( ) {
250- let standalone = std:: mem:: take ( & mut inner. stand_alone_attachments ) ;
251- record_keeper. reject_err ( Outcome :: Invalid ( DiscardReason :: Internal ) , standalone) ;
245+ let spans_and_attachments = spans. split ( |spans| spans. into_parts ( ) ) ;
246+ for either in spans_and_attachments {
247+ match either. transpose ( ) {
248+ Either :: Left ( span) => {
249+ if let Ok ( span) = span. try_map ( |span, _| store:: convert ( span, & ctx) ) {
250+ s. store ( span) ;
251+ }
252+ }
253+ Either :: Right ( attachment) => {
254+ if let Ok ( attachment) = store:: attachment:: convert (
255+ attachment,
256+ ctx. retention ,
257+ ctx. server_sample_rate ,
258+ ) {
259+ s. upload ( attachment) ;
260+ }
261+ }
252262 }
253- inner
254- } ) ;
255-
256- for span in spans. split ( |spans| spans. into_indexed_spans ( ) ) {
257- if let Ok ( span) = span. try_map ( |span, _| store:: convert ( span, & ctx) ) {
258- s. store ( span)
259- } ;
260263 }
261264
262265 Ok ( ( ) )
@@ -359,8 +362,7 @@ impl<C> ExpandedSpans<C> {
359362
360363 for ExpandedSpan { span, attachments } in self . spans {
361364 for attachment in attachments {
362- let span_id = span. value ( ) . and_then ( |s| s. span_id . value ( ) . copied ( ) ) ;
363- items. push ( attachment_to_item ( attachment, span_id) ?) ;
365+ items. push ( attachment_to_item ( attachment) ?) ;
364366 }
365367
366368 spans_without_attachments. push ( span) ;
@@ -373,7 +375,7 @@ impl<C> ExpandedSpans<C> {
373375 }
374376
375377 for attachment in self . stand_alone_attachments {
376- items. push ( attachment_to_item ( attachment, None ) ?) ;
378+ items. push ( attachment_to_item ( attachment) ?) ;
377379 }
378380
379381 Ok ( Envelope :: from_parts ( self . headers , Items :: from_vec ( items) ) )
@@ -403,11 +405,12 @@ impl<C> ExpandedSpans<C> {
403405 }
404406}
405407
406- fn attachment_to_item (
407- attachment : ExpandedAttachment ,
408- span_id : Option < SpanId > ,
409- ) -> Result < Item , ContainerWriteError > {
410- let ExpandedAttachment { meta, body } = attachment;
408+ fn attachment_to_item ( attachment : ExpandedAttachment ) -> Result < Item , ContainerWriteError > {
409+ let ExpandedAttachment {
410+ parent_id,
411+ meta,
412+ body,
413+ } = attachment;
411414
412415 let meta_json = meta. to_json ( ) ?;
413416 let meta_bytes = meta_json. into_bytes ( ) ;
@@ -420,7 +423,7 @@ fn attachment_to_item(
420423 let mut item = Item :: new ( ItemType :: Attachment ) ;
421424 item. set_payload ( ContentType :: AttachmentV2 , payload. freeze ( ) ) ;
422425 item. set_meta_length ( meta_length as u32 ) ;
423- item. set_parent_id ( ParentId :: SpanId ( span_id ) ) ;
426+ item. set_parent_id ( parent_id ) ;
424427
425428 Ok ( item)
426429}
@@ -450,8 +453,22 @@ impl ExpandedSpans<TotalAndIndexed> {
450453
451454impl ExpandedSpans < Indexed > {
452455 #[ cfg( feature = "processing" ) ]
453- fn into_indexed_spans ( self ) -> impl Iterator < Item = IndexedSpan > {
454- self . spans . into_iter ( ) . map ( IndexedSpan )
456+ fn into_parts ( self ) -> impl Iterator < Item = Either < IndexedSpanOnly , ExpandedAttachment > > {
457+ let Self {
458+ headers : _,
459+ server_sample_rate : _,
460+ spans,
461+ stand_alone_attachments,
462+ category : _,
463+ } = self ;
464+ spans
465+ . into_iter ( )
466+ . flat_map ( |span| {
467+ let ExpandedSpan { span, attachments } = span;
468+ std:: iter:: once ( Either :: Left ( IndexedSpanOnly ( span) ) )
469+ . chain ( attachments. into_iter ( ) . map ( Either :: Right ) )
470+ } )
471+ . chain ( stand_alone_attachments. into_iter ( ) . map ( Either :: Right ) )
455472 }
456473}
457474
@@ -610,23 +627,23 @@ impl Managed<ExpandedSpans<TotalAndIndexed>> {
610627 }
611628}
612629
613- /// A Span which only represents the indexed category.
614630#[ cfg( feature = "processing" ) ]
615631#[ derive( Debug ) ]
616- struct IndexedSpan ( ExpandedSpan ) ;
632+ struct IndexedSpanOnly ( WithHeader < SpanV2 > ) ;
617633
618634#[ cfg( feature = "processing" ) ]
619- impl Counted for IndexedSpan {
635+ impl Counted for IndexedSpanOnly {
620636 fn quantities ( & self ) -> Quantities {
621- let mut quantities = smallvec:: smallvec![ ( DataCategory :: SpanIndexed , 1 ) ] ;
622- quantities. extend ( self . 0 . attachments . quantities ( ) ) ;
623- quantities
637+ smallvec:: smallvec![ ( DataCategory :: SpanIndexed , 1 ) ]
624638 }
625639}
626640
627641/// A validated and parsed span attachment.
628642#[ derive( Debug ) ]
629643pub struct ExpandedAttachment {
644+ /// The ID of the log / span / metric that owns the span.
645+ pub parent_id : ParentId ,
646+
630647 /// The parsed metadata from the attachment.
631648 pub meta : Annotated < AttachmentV2Meta > ,
632649
0 commit comments