@@ -12,6 +12,8 @@ use sentry_arroyo::types::{InnerMessage, Message, Partition};
1212use sentry_arroyo:: utils:: timing:: Deadline ;
1313use sentry_protos:: snuba:: v1:: TraceItem ;
1414
15+ use sentry_options:: options;
16+
1517use crate :: types:: { AggregatedOutcomesBatch , BucketKey } ;
1618
1719#[ derive( Debug , Default ) ]
@@ -58,8 +60,7 @@ pub struct OutcomesAggregator<TNext> {
5860 message_carried_over : Option < Message < AggregatedOutcomesBatch > > ,
5961 /// Commit request carried over from a poll where we had a message to retry.
6062 commit_request_carried_over : Option < CommitRequest > ,
61- /// Temporary option to change the timestamp source from
62- /// `received` to `timestamp` on the item event.
63+ /// Cached value of the `consumer.use_item_timestamp` option, refreshed on each poll.
6364 use_item_timestamp : bool ,
6465}
6566
@@ -69,7 +70,6 @@ impl<TNext> OutcomesAggregator<TNext> {
6970 max_batch_size : usize ,
7071 max_batch_time_ms : Duration ,
7172 bucket_interval : u64 ,
72- use_item_timestamp : bool ,
7373 ) -> Self {
7474 Self {
7575 next_step,
@@ -81,7 +81,7 @@ impl<TNext> OutcomesAggregator<TNext> {
8181 latest_offsets : HashMap :: new ( ) ,
8282 message_carried_over : None ,
8383 commit_request_carried_over : None ,
84- use_item_timestamp,
84+ use_item_timestamp : false ,
8585 }
8686 }
8787
@@ -132,6 +132,12 @@ impl<TNext: ProcessingStrategy<AggregatedOutcomesBatch>> ProcessingStrategy<Kafk
132132 for OutcomesAggregator < TNext >
133133{
134134 fn poll ( & mut self ) -> Result < Option < CommitRequest > , StrategyError > {
135+ self . use_item_timestamp = options ( "snuba" )
136+ . ok ( )
137+ . and_then ( |o| o. get ( "consumer.use_item_timestamp" ) . ok ( ) )
138+ . and_then ( |v| v. as_bool ( ) )
139+ . unwrap_or ( false ) ;
140+
135141 let commit_request = self . next_step . poll ( ) ?;
136142 self . commit_request_carried_over =
137143 merge_commit_request ( self . commit_request_carried_over . take ( ) , commit_request) ;
@@ -212,6 +218,7 @@ impl<TNext: ProcessingStrategy<AggregatedOutcomesBatch>> ProcessingStrategy<Kafk
212218 . map ( |t| t. seconds as u64 )
213219 . unwrap_or ( 0 )
214220 } ;
221+
215222 let org_id = trace_item. organization_id ;
216223 let project_id = trace_item. project_id ;
217224
@@ -267,7 +274,10 @@ mod tests {
267274 use prost:: Message as ProstMessage ;
268275 use prost_types:: Timestamp ;
269276 use sentry_arroyo:: types:: { Partition , Topic } ;
277+ use sentry_options:: init_with_schemas;
278+ use sentry_options:: testing:: override_options;
270279 use sentry_protos:: snuba:: v1:: { CategoryCount , Outcomes } ;
280+ use serde_json:: json;
271281
272282 struct Noop {
273283 last_message : Option < Message < AggregatedOutcomesBatch > > ,
@@ -327,7 +337,6 @@ mod tests {
327337 500 ,
328338 Duration :: from_millis ( 5_000 ) ,
329339 60 ,
330- false ,
331340 ) ;
332341
333342 let topic = Topic :: new ( "accepted-outcomes" ) ;
@@ -375,7 +384,6 @@ mod tests {
375384 500 ,
376385 Duration :: from_millis ( 2_000 ) ,
377386 60 ,
378- false ,
379387 ) ;
380388
381389 let topic = Topic :: new ( "snuba-items" ) ;
@@ -459,12 +467,12 @@ mod tests {
459467
460468 #[ test]
461469 fn poll_flushes_when_max_batch_size_reached ( ) {
470+ init_with_schemas ( & [ ( "snuba" , crate :: SNUBA_SCHEMA ) ] ) . unwrap ( ) ;
462471 let mut aggregator = OutcomesAggregator :: new (
463472 Noop { last_message : None } ,
464473 1 ,
465474 Duration :: from_millis ( 30_000 ) ,
466475 60 ,
467- false ,
468476 ) ;
469477
470478 let partition = Partition :: new ( Topic :: new ( "accepted-outcomes" ) , 0 ) ;
@@ -485,6 +493,7 @@ mod tests {
485493
486494 #[ test]
487495 fn submit_returns_backpressure_when_message_carried_over ( ) {
496+ init_with_schemas ( & [ ( "snuba" , crate :: SNUBA_SCHEMA ) ] ) . unwrap ( ) ;
488497 struct RejectOnce {
489498 rejected : bool ,
490499 }
@@ -517,7 +526,6 @@ mod tests {
517526 1 , // flush after 1 bucket
518527 Duration :: from_millis ( 30_000 ) ,
519528 60 ,
520- false ,
521529 ) ;
522530
523531 let partition = Partition :: new ( Topic :: new ( "test" ) , 0 ) ;
@@ -553,6 +561,7 @@ mod tests {
553561
554562 #[ test]
555563 fn join_honors_timeout_when_message_stays_carried_over ( ) {
564+ init_with_schemas ( & [ ( "snuba" , crate :: SNUBA_SCHEMA ) ] ) . unwrap ( ) ;
556565 struct AlwaysReject ;
557566 impl ProcessingStrategy < AggregatedOutcomesBatch > for AlwaysReject {
558567 fn poll ( & mut self ) -> Result < Option < CommitRequest > , StrategyError > {
@@ -574,7 +583,7 @@ mod tests {
574583 }
575584
576585 let mut aggregator =
577- OutcomesAggregator :: new ( AlwaysReject , 1 , Duration :: from_millis ( 30_000 ) , 60 , false ) ;
586+ OutcomesAggregator :: new ( AlwaysReject , 1 , Duration :: from_millis ( 30_000 ) , 60 ) ;
578587 let partition = Partition :: new ( Topic :: new ( "test" ) , 0 ) ;
579588 let payload = make_payload ( 6_000 , 1 , 2 , 3 , & [ ( 4 , 1 ) ] ) ;
580589
@@ -594,12 +603,14 @@ mod tests {
594603
595604 #[ test]
596605 fn submit_uses_item_timestamp_when_enabled ( ) {
606+ init_with_schemas ( & [ ( "snuba" , crate :: SNUBA_SCHEMA ) ] ) . unwrap ( ) ;
607+ let _guard =
608+ override_options ( & [ ( "snuba" , "consumer.use_item_timestamp" , json ! ( true ) ) ] ) . unwrap ( ) ;
597609 let mut aggregator = OutcomesAggregator :: new (
598610 Noop { last_message : None } ,
599611 500 ,
600612 Duration :: from_millis ( 2_000 ) ,
601613 60 ,
602- true ,
603614 ) ;
604615
605616 let topic = Topic :: new ( "snuba-items" ) ;
@@ -629,6 +640,9 @@ mod tests {
629640 trace_item. encode ( & mut buf) . unwrap ( ) ;
630641 let payload = KafkaPayload :: new ( None , None , Some ( buf) ) ;
631642
643+ // we need to poll first in order to get the new value (true)
644+ aggregator. poll ( ) . unwrap ( ) ;
645+
632646 aggregator
633647 . submit ( Message :: new_broker_message (
634648 payload,
@@ -650,4 +664,82 @@ mod tests {
650664 Some ( 1 )
651665 ) ;
652666 }
667+
668+ #[ test]
669+ fn poll_updates_use_item_timestamp_dynamically ( ) {
670+ init_with_schemas ( & [ ( "snuba" , crate :: SNUBA_SCHEMA ) ] ) . unwrap ( ) ;
671+ let mut aggregator = OutcomesAggregator :: new (
672+ Noop { last_message : None } ,
673+ 500 ,
674+ Duration :: from_millis ( 30_000 ) ,
675+ 60 ,
676+ ) ;
677+
678+ let partition = Partition :: new ( Topic :: new ( "snuba-items" ) , 0 ) ;
679+
680+ let mut buf = Vec :: new ( ) ;
681+ TraceItem {
682+ organization_id : 1 ,
683+ project_id : 2 ,
684+ received : Some ( Timestamp {
685+ seconds : 1_700_000_000 ,
686+ nanos : 0 ,
687+ } ) ,
688+ timestamp : Some ( Timestamp {
689+ seconds : 1_700_000_060 ,
690+ nanos : 0 ,
691+ } ) ,
692+ outcomes : Some ( Outcomes {
693+ key_id : 3 ,
694+ category_count : vec ! [ CategoryCount {
695+ data_category: 4 ,
696+ quantity: 1 ,
697+ } ] ,
698+ } ) ,
699+ ..Default :: default ( )
700+ }
701+ . encode ( & mut buf)
702+ . unwrap ( ) ;
703+ let payload = KafkaPayload :: new ( None , None , Some ( buf) ) ;
704+
705+ let bucket_quantity = |aggregator : & OutcomesAggregator < Noop > , offset : u64 | {
706+ let key = BucketKey {
707+ time_offset : offset,
708+ org_id : 1 ,
709+ project_id : 2 ,
710+ key_id : 3 ,
711+ category : 4 ,
712+ } ;
713+ aggregator. batch . buckets . get ( & key) . map ( |s| s. quantity )
714+ } ;
715+
716+ let mut offset = 0 ;
717+ let mut do_submit = |aggregator : & mut OutcomesAggregator < Noop > | {
718+ aggregator. poll ( ) . unwrap ( ) ;
719+ aggregator
720+ . submit ( Message :: new_broker_message (
721+ payload. clone ( ) ,
722+ partition,
723+ offset,
724+ Utc :: now ( ) ,
725+ ) )
726+ . unwrap ( ) ;
727+ offset += 1 ;
728+ } ;
729+
730+ // Enable item timestamp
731+ let guard =
732+ override_options ( & [ ( "snuba" , "consumer.use_item_timestamp" , json ! ( true ) ) ] ) . unwrap ( ) ;
733+ do_submit ( & mut aggregator) ;
734+ assert_eq ! ( bucket_quantity( & aggregator, 28_333_334 ) , Some ( 1 ) ) ;
735+ assert_eq ! ( bucket_quantity( & aggregator, 28_333_333 ) , None ) ;
736+
737+ // Disable item timestamp
738+ drop ( guard) ;
739+ let _guard =
740+ override_options ( & [ ( "snuba" , "consumer.use_item_timestamp" , json ! ( false ) ) ] ) . unwrap ( ) ;
741+ do_submit ( & mut aggregator) ;
742+ assert_eq ! ( bucket_quantity( & aggregator, 28_333_333 ) , Some ( 1 ) ) ;
743+ assert_eq ! ( bucket_quantity( & aggregator, 28_333_334 ) , Some ( 1 ) ) ; // still present from first submit
744+ }
653745}
0 commit comments