diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 0f3bca0885..693e60806b 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -347,15 +347,6 @@ pub async fn handle_envelope( let project_key = envelope.meta().public_key(); - // Prefetch sampling project key, current spooling implementations rely on this behavior. - // - // To be changed once spool v1 has been removed. - if let Some(sampling_project_key) = envelope.sampling_key() - && sampling_project_key != project_key - { - state.project_cache_handle().fetch(sampling_project_key); - } - let rate_limits = state .project_cache_handle() .get(project_key) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index c931b0c6e0..200fa50e0c 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -393,13 +393,7 @@ impl EnvelopeBufferService { if Instant::now() >= next_project_fetch { relay_log::trace!("EnvelopeBufferService: requesting project(s) update"); - let own_key = project_key_pair.own_key; - let sampling_key = project_key_pair.sampling_key; - - services.project_cache_handle.fetch(own_key); - if sampling_key != own_key { - services.project_cache_handle.fetch(sampling_key); - } + Self::trigger_project_fetch(project_key_pair, services); // Deprioritize the stack to prevent head-of-line blocking and update the next fetch // time. @@ -413,21 +407,31 @@ impl EnvelopeBufferService { Ok(sleep) } + fn trigger_project_fetch(project_key_pair: ProjectKeyPair, services: &Services) { + let own_key = project_key_pair.own_key; + let sampling_key = project_key_pair.sampling_key; + + services.project_cache_handle.fetch(own_key); + if sampling_key != own_key { + services.project_cache_handle.fetch(sampling_key); + } + } + fn drop_expired(envelope: Box, services: &Services) { let mut managed_envelope = ManagedEnvelope::new(envelope, services.outcome_aggregator.clone()); managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp)); } - async fn handle_message(buffer: &mut PolymorphicEnvelopeBuffer, message: EnvelopeBuffer) { + async fn handle_message( + buffer: &mut PolymorphicEnvelopeBuffer, + message: EnvelopeBuffer, + services: &Services, + ) { match message { EnvelopeBuffer::Push(envelope) => { - // NOTE: This function assumes that a project state update for the relevant - // projects was already triggered (see XXX). - // For better separation of concerns, this prefetch should be triggered from here - // once buffer V1 has been removed. relay_log::trace!("EnvelopeBufferService: received push message"); - Self::push(buffer, envelope.into_envelope()).await; + Self::push(buffer, envelope.into_envelope(), services).await; } }; } @@ -454,7 +458,16 @@ impl EnvelopeBufferService { false } - async fn push(buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box) { + async fn push( + buffer: &mut PolymorphicEnvelopeBuffer, + envelope: Box, + services: &Services, + ) { + let project_key_pair = ProjectKeyPair::from_envelope(&envelope); + + // Prefetch configs so they are available as soon as possible. + Self::trigger_project_fetch(project_key_pair, services); + if let Err(e) = buffer.push(envelope).await { relay_log::error!( error = &e as &dyn std::error::Error, @@ -654,7 +667,7 @@ impl Service for EnvelopeBufferService { sleep = Duration::ZERO; } Some(message) = rx.recv() => { - Self::handle_message(&mut buffer, message).await; + Self::handle_message(&mut buffer, message, &services).await; sleep = Duration::ZERO; } shutdown = shutdown.notified() => {