@@ -12,13 +12,15 @@ use relay_system::{Addr, BroadcastChannel};
1212use serde:: { Deserialize , Serialize } ;
1313use tokio:: time:: Instant ;
1414
15+ use crate :: envelope:: ItemType ;
1516use crate :: services:: metrics:: { Aggregator , MergeBuckets } ;
1617use crate :: services:: outcome:: { DiscardReason , Outcome } ;
1718use crate :: services:: processor:: { EncodeMetricMeta , EnvelopeProcessor , ProcessProjectMetrics } ;
1819use crate :: services:: project:: state:: ExpiryState ;
1920use crate :: services:: project_cache:: {
2021 CheckedEnvelope , ProcessMetrics , ProjectCache , RequestUpdate ,
2122} ;
23+ use crate :: utils:: { Enforcement , SeqCount } ;
2224
2325use crate :: statsd:: RelayCounters ;
2426use crate :: utils:: { EnvelopeLimiter , ManagedEnvelope , RetryBackoff } ;
@@ -555,9 +557,19 @@ impl Project {
555557 Ok ( current_limits. check_with_quotas ( quotas, item_scoping) )
556558 } ) ;
557559
558- let ( enforcement, mut rate_limits) =
560+ let ( mut enforcement, mut rate_limits) =
559561 envelope_limiter. compute ( envelope. envelope_mut ( ) , & scoping) ?;
560562
563+ let check_nested_spans = state
564+ . as_ref ( )
565+ . is_some_and ( |s| s. has_feature ( Feature :: ExtractSpansFromEvent ) ) ;
566+
567+ // If we can extract spans from the event, we want to try and count the number of nested
568+ // spans to correctly emit negative outcomes in case the transaction itself is dropped.
569+ if check_nested_spans {
570+ sync_spans_to_enforcement ( & envelope, & mut enforcement) ;
571+ }
572+
561573 enforcement. apply_with_outcomes ( & mut envelope) ;
562574
563575 envelope. update ( ) ;
@@ -586,9 +598,52 @@ impl Project {
586598 }
587599}
588600
601+ /// Adds category limits for the nested spans inside a transaction.
602+ ///
603+ /// On the fast path of rate limiting, we do not have nested spans of a transaction extracted
604+ /// as top-level spans, thus if we limited a transaction, we want to count and emit negative
605+ /// outcomes for each of the spans nested inside that transaction.
606+ fn sync_spans_to_enforcement ( envelope : & ManagedEnvelope , enforcement : & mut Enforcement ) {
607+ if !enforcement. is_event_active ( ) {
608+ return ;
609+ }
610+
611+ let spans_count = count_nested_spans ( envelope) ;
612+ if spans_count == 0 {
613+ return ;
614+ }
615+
616+ if enforcement. event . is_active ( ) {
617+ enforcement. spans = enforcement. event . clone_for ( DataCategory :: Span , spans_count) ;
618+ }
619+
620+ if enforcement. event_indexed . is_active ( ) {
621+ enforcement. spans_indexed = enforcement
622+ . event_indexed
623+ . clone_for ( DataCategory :: SpanIndexed , spans_count) ;
624+ }
625+ }
626+
627+ /// Counts the nested spans inside the first transaction envelope item inside the [`Envelope`](crate::envelope::Envelope).
628+ fn count_nested_spans ( envelope : & ManagedEnvelope ) -> usize {
629+ #[ derive( Debug , Deserialize ) ]
630+ struct PartialEvent {
631+ spans : SeqCount ,
632+ }
633+
634+ envelope
635+ . envelope ( )
636+ . items ( )
637+ . find ( |item| * item. ty ( ) == ItemType :: Transaction && !item. spans_extracted ( ) )
638+ . and_then ( |item| serde_json:: from_slice :: < PartialEvent > ( & item. payload ( ) ) . ok ( ) )
639+ // We do + 1, since we count the transaction itself because it will be extracted
640+ // as a span and counted during the slow path of rate limiting.
641+ . map_or ( 0 , |event| event. spans . 0 + 1 )
642+ }
643+
589644#[ cfg( test) ]
590645mod tests {
591- use crate :: envelope:: { ContentType , Envelope , Item , ItemType } ;
646+ use crate :: envelope:: { ContentType , Envelope , Item } ;
592647 use crate :: extractors:: RequestMeta ;
593648 use crate :: services:: processor:: ProcessingGroup ;
594649 use relay_base_schema:: project:: ProjectId ;
@@ -720,7 +775,27 @@ mod tests {
720775 RequestMeta :: new ( dsn)
721776 }
722777
723- const EVENT_WITH_SPANS : & str = r#"{
778+ #[ test]
779+ fn test_track_nested_spans_outcomes ( ) {
780+ let mut project = create_project ( Some ( json ! ( {
781+ "features" : [
782+ "organizations:indexed-spans-extraction"
783+ ] ,
784+ "quotas" : [ {
785+ "id" : "foo" ,
786+ "categories" : [ "transaction" ] ,
787+ "window" : 3600 ,
788+ "limit" : 0 ,
789+ "reasonCode" : "foo" ,
790+ } ]
791+ } ) ) ) ;
792+
793+ let mut envelope = Envelope :: from_request ( Some ( EventId :: new ( ) ) , request_meta ( ) ) ;
794+
795+ let mut transaction = Item :: new ( ItemType :: Transaction ) ;
796+ transaction. set_payload (
797+ ContentType :: Json ,
798+ r#"{
724799 "event_id": "52df9022835246eeb317dbd739ccd059",
725800 "type": "transaction",
726801 "transaction": "I have a stale timestamp, but I'm recent!",
@@ -746,27 +821,8 @@ mod tests {
746821 "trace_id": "ff62a8b040f340bda5d830223def1d81"
747822 }
748823 ]
749- }"# ;
750-
751- #[ test]
752- fn test_track_nested_spans_outcomes ( ) {
753- let mut project = create_project ( Some ( json ! ( {
754- "features" : [
755- "organizations:indexed-spans-extraction"
756- ] ,
757- "quotas" : [ {
758- "id" : "foo" ,
759- "categories" : [ "transaction" ] ,
760- "window" : 3600 ,
761- "limit" : 0 ,
762- "reasonCode" : "foo" ,
763- } ]
764- } ) ) ) ;
765-
766- let mut envelope = Envelope :: from_request ( Some ( EventId :: new ( ) ) , request_meta ( ) ) ;
767-
768- let mut transaction = Item :: new ( ItemType :: Transaction ) ;
769- transaction. set_payload ( ContentType :: Json , EVENT_WITH_SPANS ) ;
824+ }"# ,
825+ ) ;
770826
771827 envelope. add_item ( transaction) ;
772828
@@ -796,59 +852,4 @@ mod tests {
796852 assert_eq ! ( outcome. quantity, expected_quantity) ;
797853 }
798854 }
799-
800- #[ test]
801- fn test_track_nested_spans_outcomes_span_quota ( ) {
802- let mut project = create_project ( Some ( json ! ( {
803- "features" : [
804- "organizations:indexed-spans-extraction"
805- ] ,
806- "quotas" : [ {
807- "id" : "foo" ,
808- "categories" : [ "span_indexed" ] ,
809- "window" : 3600 ,
810- "limit" : 0 ,
811- "reasonCode" : "foo" ,
812- } ]
813- } ) ) ) ;
814-
815- let mut envelope = Envelope :: from_request ( Some ( EventId :: new ( ) ) , request_meta ( ) ) ;
816-
817- let mut transaction = Item :: new ( ItemType :: Transaction ) ;
818- transaction. set_payload ( ContentType :: Json , EVENT_WITH_SPANS ) ;
819-
820- envelope. add_item ( transaction) ;
821-
822- let ( outcome_aggregator, mut outcome_aggregator_rx) = Addr :: custom ( ) ;
823- let ( test_store, _) = Addr :: custom ( ) ;
824-
825- let managed_envelope = ManagedEnvelope :: new (
826- envelope,
827- outcome_aggregator. clone ( ) ,
828- test_store,
829- ProcessingGroup :: Transaction ,
830- ) ;
831-
832- let CheckedEnvelope {
833- envelope,
834- rate_limits : _,
835- } = project. check_envelope ( managed_envelope) . unwrap ( ) ;
836- let envelope = envelope. unwrap ( ) ;
837- let transaction_item = envelope
838- . envelope ( )
839- . items ( )
840- . find ( |i| * i. ty ( ) == ItemType :: Transaction )
841- . unwrap ( ) ;
842- assert ! ( transaction_item. spans_extracted( ) ) ;
843-
844- drop ( outcome_aggregator) ;
845-
846- let expected = [ ( DataCategory :: SpanIndexed , 3 ) ] ;
847-
848- for ( expected_category, expected_quantity) in expected {
849- let outcome = outcome_aggregator_rx. blocking_recv ( ) . unwrap ( ) ;
850- assert_eq ! ( outcome. category, expected_category) ;
851- assert_eq ! ( outcome. quantity, expected_quantity) ;
852- }
853- }
854855}
0 commit comments