@@ -393,13 +393,7 @@ impl EnvelopeBufferService {
393393 if Instant :: now ( ) >= next_project_fetch {
394394 relay_log:: trace!( "EnvelopeBufferService: requesting project(s) update" ) ;
395395
396- let own_key = project_key_pair. own_key ;
397- let sampling_key = project_key_pair. sampling_key ;
398-
399- services. project_cache_handle . fetch ( own_key) ;
400- if sampling_key != own_key {
401- services. project_cache_handle . fetch ( sampling_key) ;
402- }
396+ Self :: trigger_project_fetch ( project_key_pair, services) ;
403397
404398 // Deprioritize the stack to prevent head-of-line blocking and update the next fetch
405399 // time.
@@ -413,21 +407,31 @@ impl EnvelopeBufferService {
413407 Ok ( sleep)
414408 }
415409
410+ fn trigger_project_fetch ( project_key_pair : ProjectKeyPair , services : & Services ) {
411+ let own_key = project_key_pair. own_key ;
412+ let sampling_key = project_key_pair. sampling_key ;
413+
414+ services. project_cache_handle . fetch ( own_key) ;
415+ if sampling_key != own_key {
416+ services. project_cache_handle . fetch ( sampling_key) ;
417+ }
418+ }
419+
416420 fn drop_expired ( envelope : Box < Envelope > , services : & Services ) {
417421 let mut managed_envelope =
418422 ManagedEnvelope :: new ( envelope, services. outcome_aggregator . clone ( ) ) ;
419423 managed_envelope. reject ( Outcome :: Invalid ( DiscardReason :: Timestamp ) ) ;
420424 }
421425
422- async fn handle_message ( buffer : & mut PolymorphicEnvelopeBuffer , message : EnvelopeBuffer ) {
426+ async fn handle_message (
427+ buffer : & mut PolymorphicEnvelopeBuffer ,
428+ message : EnvelopeBuffer ,
429+ services : & Services ,
430+ ) {
423431 match message {
424432 EnvelopeBuffer :: Push ( envelope) => {
425- // NOTE: This function assumes that a project state update for the relevant
426- // projects was already triggered (see XXX).
427- // For better separation of concerns, this prefetch should be triggered from here
428- // once buffer V1 has been removed.
429433 relay_log:: trace!( "EnvelopeBufferService: received push message" ) ;
430- Self :: push ( buffer, envelope. into_envelope ( ) ) . await ;
434+ Self :: push ( buffer, envelope. into_envelope ( ) , services ) . await ;
431435 }
432436 } ;
433437 }
@@ -454,7 +458,16 @@ impl EnvelopeBufferService {
454458 false
455459 }
456460
457- async fn push ( buffer : & mut PolymorphicEnvelopeBuffer , envelope : Box < Envelope > ) {
461+ async fn push (
462+ buffer : & mut PolymorphicEnvelopeBuffer ,
463+ envelope : Box < Envelope > ,
464+ services : & Services ,
465+ ) {
466+ let project_key_pair = ProjectKeyPair :: from_envelope ( & envelope) ;
467+
468+ // Prefetch configs so they are available as soon as possible.
469+ Self :: trigger_project_fetch ( project_key_pair, services) ;
470+
458471 if let Err ( e) = buffer. push ( envelope) . await {
459472 relay_log:: error!(
460473 error = & e as & dyn std:: error:: Error ,
@@ -654,7 +667,7 @@ impl Service for EnvelopeBufferService {
654667 sleep = Duration :: ZERO ;
655668 }
656669 Some ( message) = rx. recv( ) => {
657- Self :: handle_message( & mut buffer, message) . await ;
670+ Self :: handle_message( & mut buffer, message, & services ) . await ;
658671 sleep = Duration :: ZERO ;
659672 }
660673 shutdown = shutdown. notified( ) => {
0 commit comments