Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,15 +347,6 @@ pub async fn handle_envelope(

let project_key = envelope.meta().public_key();

// Prefetch sampling project key, current spooling implementations rely on this behavior.
//
// To be changed once spool v1 has been removed.
if let Some(sampling_project_key) = envelope.sampling_key()
&& sampling_project_key != project_key
{
state.project_cache_handle().fetch(sampling_project_key);
}

let rate_limits = state
.project_cache_handle()
.get(project_key)
Expand Down
43 changes: 28 additions & 15 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,7 @@ impl EnvelopeBufferService {
if Instant::now() >= next_project_fetch {
relay_log::trace!("EnvelopeBufferService: requesting project(s) update");

let own_key = project_key_pair.own_key;
let sampling_key = project_key_pair.sampling_key;

services.project_cache_handle.fetch(own_key);
if sampling_key != own_key {
services.project_cache_handle.fetch(sampling_key);
}
Self::trigger_project_fetch(project_key_pair, services);

// Deprioritize the stack to prevent head-of-line blocking and update the next fetch
// time.
Expand All @@ -413,21 +407,31 @@ impl EnvelopeBufferService {
Ok(sleep)
}

fn trigger_project_fetch(project_key_pair: ProjectKeyPair, services: &Services) {
let own_key = project_key_pair.own_key;
let sampling_key = project_key_pair.sampling_key;

services.project_cache_handle.fetch(own_key);
if sampling_key != own_key {
services.project_cache_handle.fetch(sampling_key);
}
}

fn drop_expired(envelope: Box<Envelope>, services: &Services) {
let mut managed_envelope =
ManagedEnvelope::new(envelope, services.outcome_aggregator.clone());
managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp));
}

async fn handle_message(buffer: &mut PolymorphicEnvelopeBuffer, message: EnvelopeBuffer) {
async fn handle_message(
buffer: &mut PolymorphicEnvelopeBuffer,
message: EnvelopeBuffer,
services: &Services,
) {
match message {
EnvelopeBuffer::Push(envelope) => {
// NOTE: This function assumes that a project state update for the relevant
// projects was already triggered (see XXX).
// For better separation of concerns, this prefetch should be triggered from here
// once buffer V1 has been removed.
relay_log::trace!("EnvelopeBufferService: received push message");
Self::push(buffer, envelope.into_envelope()).await;
Self::push(buffer, envelope.into_envelope(), services).await;
}
};
}
Expand All @@ -454,7 +458,16 @@ impl EnvelopeBufferService {
false
}

async fn push(buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box<Envelope>) {
async fn push(
buffer: &mut PolymorphicEnvelopeBuffer,
envelope: Box<Envelope>,
services: &Services,
) {
let project_key_pair = ProjectKeyPair::from_envelope(&envelope);

// Prefetch configs so they are available as soon as possible.
Self::trigger_project_fetch(project_key_pair, services);

if let Err(e) = buffer.push(envelope).await {
relay_log::error!(
error = &e as &dyn std::error::Error,
Expand Down Expand Up @@ -654,7 +667,7 @@ impl Service for EnvelopeBufferService {
sleep = Duration::ZERO;
}
Some(message) = rx.recv() => {
Self::handle_message(&mut buffer, message).await;
Self::handle_message(&mut buffer, message, &services).await;
sleep = Duration::ZERO;
}
shutdown = shutdown.notified() => {
Expand Down
Loading