Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions relay-server/src/processing/attachments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,11 @@ impl AttachmentProcessor {
}

impl processing::Processor for AttachmentProcessor {
type UnitOfWork = SerializedAttachments;
type Input = SerializedAttachments;
type Output = AttachmentsOutput;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
debug_assert!(
!envelope.envelope().items().any(Item::creates_event),
"AttachmentProcessor should not receive items that create events"
Expand All @@ -91,7 +88,7 @@ impl processing::Processor for AttachmentProcessor {

async fn process(
&self,
attachments: Managed<Self::UnitOfWork>,
attachments: Managed<Self::Input>,
ctx: processing::Context<'_>,
) -> Result<processing::Output<Self::Output>, Rejected<Self::Error>> {
for item in &attachments.attachments {
Expand Down
9 changes: 3 additions & 6 deletions relay-server/src/processing/check_ins/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,11 @@ impl CheckInsProcessor {
}

impl processing::Processor for CheckInsProcessor {
type UnitOfWork = SerializedCheckIns;
type Input = SerializedCheckIns;
type Output = CheckInsOutput;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();

let check_ins = envelope
Expand All @@ -80,7 +77,7 @@ impl processing::Processor for CheckInsProcessor {

async fn process(
&self,
mut check_ins: Managed<Self::UnitOfWork>,
mut check_ins: Managed<Self::Input>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
if ctx.is_processing() {
Expand Down
9 changes: 3 additions & 6 deletions relay-server/src/processing/client_reports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,11 @@ impl ClientReportsProcessor {
}

impl processing::Processor for ClientReportsProcessor {
type UnitOfWork = SerializedClientReports;
type Input = SerializedClientReports;
type Output = Nothing;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();
let reports = envelope
.envelope_mut()
Expand All @@ -87,7 +84,7 @@ impl processing::Processor for ClientReportsProcessor {

async fn process(
&self,
reports: Managed<Self::UnitOfWork>,
reports: Managed<Self::Input>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
let mut outcomes = process::expand(reports);
Expand Down
9 changes: 3 additions & 6 deletions relay-server/src/processing/errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,11 @@ impl ErrorsProcessor {
}

impl processing::Processor for ErrorsProcessor {
type UnitOfWork = SerializedError;
type Input = SerializedError;
type Output = ErrorOutput;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let has_transaction = envelope
.envelope()
.items()
Expand Down Expand Up @@ -107,7 +104,7 @@ impl processing::Processor for ErrorsProcessor {

async fn process(
&self,
error: Managed<Self::UnitOfWork>,
error: Managed<Self::Input>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
let mut error = process::expand(error, ctx)?;
Expand Down
9 changes: 3 additions & 6 deletions relay-server/src/processing/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,11 @@ impl LogsProcessor {
}

impl processing::Processor for LogsProcessor {
type UnitOfWork = SerializedLogs;
type Input = SerializedLogs;
type Output = LogOutput;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();

let logs = envelope
Expand Down Expand Up @@ -135,7 +132,7 @@ impl processing::Processor for LogsProcessor {

async fn process(
&self,
logs: Managed<Self::UnitOfWork>,
logs: Managed<Self::Input>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Error>> {
validate::container(&logs).reject(&logs)?;
Expand Down
15 changes: 7 additions & 8 deletions relay-server/src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,24 @@ pub mod utils;
/// defines all items in an event based envelope to relate to the envelope.
pub trait Processor {
/// A unit of work, the processor can process.
type UnitOfWork: Counted;
/// The result after processing a [`Self::UnitOfWork`].
type Input: Counted;
/// The result after processing a [`Self::Input`].
type Output: Forward;
/// The error returned by [`Self::process`].
type Error: std::error::Error + 'static;

/// Extracts a [`Self::UnitOfWork`] from a [`ManagedEnvelope`].
/// Extracts a [`Self::Input`] from a [`ManagedEnvelope`].
///
/// This is infallible, if a processor wants to report an error,
/// it should return a [`Self::UnitOfWork`] which later, can produce an error when being processed.
/// it should return a [`Self::Input`] which later, can produce an error when being processed.
///
/// Returns `None` if nothing in the envelope concerns this processor.
fn prepare_envelope(&self, envelope: &mut ManagedEnvelope)
-> Option<Managed<Self::UnitOfWork>>;
fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>>;

/// Processes a [`Self::UnitOfWork`].
/// Processes a [`Self::Input`].
async fn process(
&self,
work: Managed<Self::UnitOfWork>,
work: Managed<Self::Input>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>>;
}
Expand Down
9 changes: 3 additions & 6 deletions relay-server/src/processing/profile_chunks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,11 @@ impl ProfileChunksProcessor {
}

impl processing::Processor for ProfileChunksProcessor {
type UnitOfWork = SerializedProfileChunks;
type Input = SerializedProfileChunks;
type Output = ProfileChunkOutput;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let profile_chunks = envelope
.envelope_mut()
.take_items_by(|item| matches!(*item.ty(), ItemType::ProfileChunk))
Expand All @@ -100,7 +97,7 @@ impl processing::Processor for ProfileChunksProcessor {

async fn process(
&self,
mut profile_chunks: Managed<Self::UnitOfWork>,
mut profile_chunks: Managed<Self::Input>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Error>> {
filter::feature_flag(ctx).reject(&profile_chunks)?;
Expand Down
9 changes: 3 additions & 6 deletions relay-server/src/processing/replays/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,11 @@ impl ReplaysProcessor {
}

impl processing::Processor for ReplaysProcessor {
type UnitOfWork = SerializedReplays;
type Input = SerializedReplays;
type Output = ReplaysOutput;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();
let events = envelope
.envelope_mut()
Expand Down Expand Up @@ -194,7 +191,7 @@ impl processing::Processor for ReplaysProcessor {

async fn process(
&self,
replays: Managed<Self::UnitOfWork>,
replays: Managed<Self::Input>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
let replays = filter::feature_flag(replays, ctx)?;
Expand Down
9 changes: 3 additions & 6 deletions relay-server/src/processing/sessions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,11 @@ impl SessionsProcessor {
}

impl processing::Processor for SessionsProcessor {
type UnitOfWork = SerializedSessions;
type Input = SerializedSessions;
type Output = SessionsOutput;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();

let updates = envelope
Expand All @@ -86,7 +83,7 @@ impl processing::Processor for SessionsProcessor {

async fn process(
&self,
sessions: Managed<Self::UnitOfWork>,
sessions: Managed<Self::Input>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
let mut sessions = match process::expand(sessions, ctx) {
Expand Down
9 changes: 3 additions & 6 deletions relay-server/src/processing/spans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,11 @@ impl SpansProcessor {
}

impl processing::Processor for SpansProcessor {
type UnitOfWork = SerializedSpans;
type Input = SerializedSpans;
type Output = SpanOutput;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();

let spans = envelope
Expand Down Expand Up @@ -155,7 +152,7 @@ impl processing::Processor for SpansProcessor {

async fn process(
&self,
spans: Managed<Self::UnitOfWork>,
spans: Managed<Self::Input>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
let spans = filter::feature_flag_attachment(spans, ctx);
Expand Down
9 changes: 3 additions & 6 deletions relay-server/src/processing/trace_attachments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,13 @@ impl TraceAttachmentsProcessor {
}

impl Processor for TraceAttachmentsProcessor {
type UnitOfWork = SerializedAttachments;
type Input = SerializedAttachments;

type Output = Managed<ExpandedAttachments>;

type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();
let items = envelope
.envelope_mut()
Expand All @@ -115,7 +112,7 @@ impl Processor for TraceAttachmentsProcessor {

async fn process(
&self,
work: Managed<Self::UnitOfWork>,
work: Managed<Self::Input>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
let work = filter::feature_flag(work, ctx)?;
Expand Down
9 changes: 3 additions & 6 deletions relay-server/src/processing/trace_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,11 @@ impl TraceMetricsProcessor {
}

impl processing::Processor for TraceMetricsProcessor {
type UnitOfWork = SerializedTraceMetrics;
type Input = SerializedTraceMetrics;
type Output = TraceMetricOutput;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();

let metrics = envelope
Expand All @@ -127,7 +124,7 @@ impl processing::Processor for TraceMetricsProcessor {

async fn process(
&self,
metrics: Managed<Self::UnitOfWork>,
metrics: Managed<Self::Input>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Error>> {
validate::container(&metrics)?;
Expand Down
Loading
Loading