Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ dependencies = [
"relay-config",
"relay-log",
"relay-server",
"relay-system",
"serde_json",
"tempfile",
"tokio",
Expand Down
38 changes: 32 additions & 6 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ use tempfile::TempDir;
use tokio::runtime::Runtime;

use relay_base_schema::project::ProjectKey;
use relay_server::managed::Managed;
use relay_server::{
Envelope, EnvelopeStack, MemoryChecker, MemoryStat, PolymorphicEnvelopeBuffer,
SqliteEnvelopeStack, SqliteEnvelopeStore,
};
use relay_system::Addr;

fn setup_db(path: &PathBuf) -> Pool<Sqlite> {
let options = SqliteConnectOptions::new()
Expand Down Expand Up @@ -115,7 +117,10 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
|(mut stack, envelopes)| {
runtime.block_on(async {
for envelope in envelopes {
stack.push(envelope).await.unwrap();
stack
.push(Managed::from_envelope(envelope, Addr::dummy()))
.await
.unwrap();
}
});
},
Expand Down Expand Up @@ -145,7 +150,10 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
// Pre-fill the stack
for _ in 0..size {
let envelope = mock_envelope(envelope_size);
stack.push(envelope).await.unwrap();
stack
.push(Managed::from_envelope(envelope, Addr::dummy()))
.await
.unwrap();
}

stack
Expand Down Expand Up @@ -195,12 +203,24 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
for _ in 0..size {
if rand::random::<bool>() {
if let Some(envelope) = envelope_iter.next() {
stack.push(envelope).await.unwrap();
stack
.push(Managed::from_envelope(
envelope,
Addr::dummy(),
))
.await
.unwrap();
}
} else if stack.pop().await.is_err() {
// If pop fails (empty stack), push instead
if let Some(envelope) = envelope_iter.next() {
stack.push(envelope).await.unwrap();
stack
.push(Managed::from_envelope(
envelope,
Addr::dummy(),
))
.await
.unwrap();
}
}
}
Expand Down Expand Up @@ -265,7 +285,10 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
.await
.unwrap();
for envelope in envelopes.into_iter() {
buffer.push(envelope).await.unwrap();
buffer
.push(Managed::from_envelope(envelope, Addr::dummy()))
.await
.unwrap();
}
})
},
Expand Down Expand Up @@ -299,7 +322,10 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
let n = envelopes.len();
for envelope in envelopes.into_iter() {
let public_key = envelope.meta().public_key();
buffer.push(envelope).await.unwrap();
buffer
.push(Managed::from_envelope(envelope, Addr::dummy()))
.await
.unwrap();
// Mark as ready:
buffer.mark_ready(&public_key, true);
}
Expand Down
7 changes: 4 additions & 3 deletions relay-server/src/managed/counted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ impl Counted for Box<Envelope> {
let mut quantities = Quantities::new();

// This matches the implementation of `ManagedEnvelope::reject`.
// Note: index_category is pushed first, then event_category (same order as ManagedEnvelope::reject).
let summary = EnvelopeSummary::compute(self);
if let Some(category) = summary.event_category {
quantities.push((category, 1));
if let Some(category) = category.index_category() {
quantities.push((category, 1));
if let Some(index_category) = category.index_category() {
quantities.push((index_category, 1));
}
quantities.push((category, 1));
}

let data = [
Expand Down
5 changes: 5 additions & 0 deletions relay-server/src/managed/managed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ impl<T: Counted> Managed<T> {
self.meta.received_at
}

/// Returns a reference to the outcome aggregator.
pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
&self.meta.outcome_aggregator
}

/// Scoping information stored in this context.
pub fn scoping(&self) -> Scoping {
self.meta.scoping
Expand Down
82 changes: 50 additions & 32 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio::time::{Instant, timeout};

use crate::envelope::Envelope;
use crate::envelope::Item;
use crate::managed::Managed;
use crate::services::buffer::common::ProjectKeyPair;
use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
Expand Down Expand Up @@ -76,8 +77,11 @@ impl PolymorphicEnvelopeBuffer {
}
}

/// Adds an envelope to the buffer.
pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
/// Adds a managed envelope to the buffer.
pub async fn push(
&mut self,
envelope: Managed<Box<Envelope>>,
) -> Result<(), EnvelopeBufferError> {
relay_statsd::metric!(
distribution(RelayDistributions::BufferEnvelopeBodySize) =
envelope.items().map(Item::len).sum::<usize>() as u64,
Expand Down Expand Up @@ -112,7 +116,10 @@ impl PolymorphicEnvelopeBuffer {
}

/// Pops the next-in-line envelope.
pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
///
/// Returns a [`Managed`] envelope that will automatically emit outcomes when dropped
/// if not explicitly accepted.
pub async fn pop(&mut self) -> Result<Option<Managed<Box<Envelope>>>, EnvelopeBufferError> {
let envelope = relay_statsd::metric!(
timer(RelayTimers::BufferPop),
partition_id = self.partition_tag(),
Expand Down Expand Up @@ -309,11 +316,14 @@ where
);
}

/// Pushes an envelope to the appropriate envelope stack and re-prioritizes the stack.
/// Pushes a managed envelope to the appropriate envelope stack and re-prioritizes the stack.
///
/// If the envelope stack does not exist, a new stack is pushed to the priority queue.
/// The priority of the stack is updated with the envelope's received_at time.
pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
pub async fn push(
&mut self,
envelope: Managed<Box<Envelope>>,
) -> Result<(), EnvelopeBufferError> {
let received_at = envelope.received_at();

let project_key_pair = ProjectKeyPair::from_envelope(&envelope);
Expand All @@ -329,12 +339,8 @@ where
} else {
// Since we have initialization code that creates all the necessary stacks, we assume
// that any new stack that is added during the envelope buffer's lifecycle, is recreated.
self.push_stack(
StackCreationType::New,
ProjectKeyPair::from_envelope(&envelope),
Some(envelope),
)
.await?;
self.push_stack(StackCreationType::New, project_key_pair, Some(envelope))
.await?;
}
self.priority_queue
.change_priority_by(&project_key_pair, |prio| {
Expand Down Expand Up @@ -385,7 +391,10 @@ where
///
/// The priority of the envelope's stack is updated with the next envelope's received_at
/// time. If the stack is empty after popping, it is removed from the priority queue.
pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
///
/// Returns a [`Managed`] envelope that will automatically emit outcomes when dropped
/// if not explicitly accepted.
pub async fn pop(&mut self) -> Result<Option<Managed<Box<Envelope>>>, EnvelopeBufferError> {
let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else {
return Ok(None);
};
Expand Down Expand Up @@ -479,12 +488,12 @@ where
.await;
}

/// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted.
/// Pushes a new [`EnvelopeStack`] with the given managed [`Envelope`] inserted.
async fn push_stack(
&mut self,
stack_creation_type: StackCreationType,
project_key_pair: ProjectKeyPair,
envelope: Option<Box<Envelope>>,
envelope: Option<Managed<Box<Envelope>>>,
) -> Result<(), EnvelopeBufferError> {
let received_at = envelope.as_ref().map_or(Utc::now(), |e| e.received_at());

Expand Down Expand Up @@ -726,6 +735,7 @@ mod tests {
use crate::services::buffer::envelope_store::sqlite::DatabaseEnvelope;
use crate::services::buffer::testutils::utils::mock_envelopes;
use crate::utils::MemoryStat;
use relay_system::Addr;

use super::*;

Expand Down Expand Up @@ -781,6 +791,14 @@ mod tests {
MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path").clone())
}

fn managed_envelope(
own_key: ProjectKey,
sampling_key: Option<ProjectKey>,
event_id: Option<EventId>,
) -> Managed<Box<Envelope>> {
Managed::from_envelope(new_envelope(own_key, sampling_key, event_id), Addr::dummy())
}

async fn peek_received_at(buffer: &mut EnvelopeBuffer<MemoryStackProvider>) -> DateTime<Utc> {
buffer.peek().await.unwrap().last_received_at().unwrap()
}
Expand All @@ -796,12 +814,12 @@ mod tests {
assert!(buffer.pop().await.unwrap().is_none());
assert!(buffer.peek().await.unwrap().is_empty());

let envelope1 = new_envelope(project_key1, None, None);
let time1 = envelope1.meta().received_at();
let envelope1 = managed_envelope(project_key1, None, None);
let time1 = envelope1.received_at();
buffer.push(envelope1).await.unwrap();

let envelope2 = new_envelope(project_key2, None, None);
let time2 = envelope2.meta().received_at();
let envelope2 = managed_envelope(project_key2, None, None);
let time2 = envelope2.received_at();
buffer.push(envelope2).await.unwrap();

// Both projects are ready, so project 2 is on top (has the newest envelopes):
Expand All @@ -813,8 +831,8 @@ mod tests {
// Both projects are not ready, so project 1 is on top (has the oldest envelopes):
assert_eq!(peek_received_at(&mut buffer).await, time1);

let envelope3 = new_envelope(project_key3, None, None);
let time3 = envelope3.meta().received_at();
let envelope3 = managed_envelope(project_key3, None, None);
let time3 = envelope3.received_at();
buffer.push(envelope3).await.unwrap();
buffer.mark_ready(&project_key3, false);

Expand Down Expand Up @@ -859,10 +877,10 @@ mod tests {

let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

let envelope1 = new_envelope(project_key, None, None);
let time1 = envelope1.meta().received_at();
let envelope2 = new_envelope(project_key, None, None);
let time2 = envelope2.meta().received_at();
let envelope1 = managed_envelope(project_key, None, None);
let time1 = envelope1.received_at();
let envelope2 = managed_envelope(project_key, None, None);
let time2 = envelope2.received_at();

assert!(time2 > time1);

Expand All @@ -887,16 +905,16 @@ mod tests {
let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

let envelope1 = new_envelope(project_key1, None, None);
let envelope1 = managed_envelope(project_key1, None, None);
let time1 = envelope1.received_at();
buffer.push(envelope1).await.unwrap();

let envelope2 = new_envelope(project_key2, None, None);
let envelope2 = managed_envelope(project_key2, None, None);
let time2 = envelope2.received_at();
buffer.push(envelope2).await.unwrap();

let envelope3 = new_envelope(project_key1, Some(project_key2), None);
let time3 = envelope3.meta().received_at();
let envelope3 = managed_envelope(project_key1, Some(project_key2), None);
let time3 = envelope3.received_at();
buffer.push(envelope3).await.unwrap();

buffer.mark_ready(&project_key1, false);
Expand Down Expand Up @@ -959,11 +977,11 @@ mod tests {

let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
buffer
.push(new_envelope(project_key1, Some(project_key2), None))
.push(managed_envelope(project_key1, Some(project_key2), None))
.await
.unwrap();
buffer
.push(new_envelope(project_key2, Some(project_key1), None))
.push(managed_envelope(project_key2, Some(project_key1), None))
.await
.unwrap();
assert_eq!(buffer.priority_queue.len(), 2);
Expand Down Expand Up @@ -993,12 +1011,12 @@ mod tests {

let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let event_id_1 = EventId::new();
let envelope1 = new_envelope(project_key_1, None, Some(event_id_1));
let envelope1 = managed_envelope(project_key_1, None, Some(event_id_1));
let time1 = envelope1.received_at();

let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap();
let event_id_2 = EventId::new();
let envelope2 = new_envelope(project_key_2, None, Some(event_id_2));
let envelope2 = managed_envelope(project_key_2, None, Some(event_id_2));
let time2 = envelope2.received_at();

buffer.push(envelope1).await.unwrap();
Expand Down
16 changes: 9 additions & 7 deletions relay-server/src/services/buffer/envelope_stack/caching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ use chrono::{DateTime, Utc};

use super::EnvelopeStack;
use crate::envelope::Envelope;
use crate::managed::Managed;

/// An envelope stack implementation that caches one element in memory and delegates
/// to another envelope stack for additional storage.
#[derive(Debug)]
pub struct CachingEnvelopeStack<S> {
/// The underlying envelope stack
inner: S,
/// The cached envelope (if any)
cached: Option<Box<Envelope>>,
/// The cached managed envelope (if any)
cached: Option<Managed<Box<Envelope>>>,
}

impl<S> CachingEnvelopeStack<S>
Expand All @@ -32,7 +33,7 @@ where
{
type Error = S::Error;

async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), Self::Error> {
async fn push(&mut self, envelope: Managed<Box<Envelope>>) -> Result<(), Self::Error> {
if let Some(cached) = self.cached.take() {
self.inner.push(cached).await?;
}
Expand All @@ -49,8 +50,9 @@ where
}
}

async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
async fn pop(&mut self) -> Result<Option<Managed<Box<Envelope>>>, Self::Error> {
if let Some(envelope) = self.cached.take() {
// Return the managed envelope - caller is responsible for accepting or dropping
Ok(Some(envelope))
} else {
self.inner.pop().await
Expand All @@ -73,16 +75,16 @@ where
mod tests {
use super::*;
use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack;
use crate::services::buffer::testutils::utils::mock_envelope;
use crate::services::buffer::testutils::utils::managed_envelope;

#[tokio::test]
async fn test_caching_stack() {
let inner = MemoryEnvelopeStack::new();
let mut stack = CachingEnvelopeStack::new(inner);

// Create test envelopes with different timestamps
let envelope_1 = mock_envelope(Utc::now());
let envelope_2 = mock_envelope(Utc::now());
let envelope_1 = managed_envelope(Utc::now());
let envelope_2 = managed_envelope(Utc::now());

// Push 2 envelopes
stack.push(envelope_1).await.unwrap();
Expand Down
Loading
Loading