Merged

29 commits
076b0c4
boilerplate
tobias-wilfert Jan 13, 2026
02bc9eb
initial logic changes (wip)
tobias-wilfert Jan 26, 2026
37e35f3
update tests to send both event and recording in envelope
tobias-wilfert Jan 26, 2026
89b9b11
cleanup + move code into modules
tobias-wilfert Jan 28, 2026
dcd9f86
remove dead code
tobias-wilfert Jan 28, 2026
aebb77c
appease clippy
tobias-wilfert Jan 28, 2026
4c987b0
Merge branch 'master' into tobias-wilfert/feat/process_replay
tobias-wilfert Jan 28, 2026
d136068
cleanup
tobias-wilfert Jan 28, 2026
4e73071
remove comments
tobias-wilfert Jan 28, 2026
5362c0c
replace code comments with pr comments
tobias-wilfert Jan 28, 2026
80fd5ad
changelog message
tobias-wilfert Jan 29, 2026
5120b7e
use `for-loop` instead of `for_each`
tobias-wilfert Jan 29, 2026
6241219
use `Items` instead of `Vec<Item>`
tobias-wilfert Jan 29, 2026
13a1ebf
use `ContentType::OctetStream` instead of `ContentType::MsgPack`
tobias-wilfert Jan 29, 2026
cdc5bc4
move scrubbing after filter and quotas
tobias-wilfert Jan 29, 2026
0a94211
update comments
tobias-wilfert Jan 29, 2026
4223bfe
extract magic number into constant
tobias-wilfert Jan 30, 2026
3ddc11e
use `#[from]` instead of `String` for errors
tobias-wilfert Jan 30, 2026
666ff6b
combine `scrub` and `scrub_recording`
tobias-wilfert Jan 30, 2026
9865ea7
add error logs to validate envelope content assumptions
tobias-wilfert Jan 30, 2026
d94b4d4
improve `ReplayError`
tobias-wilfert Jan 30, 2026
5013f8a
add clarifying comment on the counting discrepancy
tobias-wilfert Jan 30, 2026
8ea5bf6
add clarifying comment on the no event no recording case
tobias-wilfert Jan 30, 2026
a336cd6
fix up `reject_err`
tobias-wilfert Jan 30, 2026
2888932
Merge branch 'master' into tobias-wilfert/feat/process_replay
tobias-wilfert Jan 30, 2026
fb33da6
add missing `?`
tobias-wilfert Jan 30, 2026
d6f60ca
Merge branch 'tobias-wilfert/feat/process_replay' of github.com:getse…
tobias-wilfert Jan 30, 2026
cf98063
readd old code and FF for partial rollout
tobias-wilfert Feb 2, 2026
5e09667
Merge branch 'master' into tobias-wilfert/feat/process_replay
tobias-wilfert Feb 2, 2026
1 change: 1 addition & 0 deletions CHANGELOG.md

@@ -14,6 +14,7 @@
 - Use new processor architecture to process transactions. ([#5379](https://github.com/getsentry/relay/pull/5379))
 - Add `gen_ai_response_time_to_first_token` as a `SpanData` attribute. ([#5575](https://github.com/getsentry/relay/pull/5575))
 - Add sampling to expensive envelope buffer statsd metrics. ([#5576](https://github.com/getsentry/relay/pull/5576))
+- Use new processor architecture to process replays. ([#5580](https://github.com/getsentry/relay/pull/5580))
 - Add `gen_ai.cost_calculation.result` metric to track AI cost calculation outcomes by integration and platform. ([#5560](https://github.com/getsentry/relay/pull/5560))
 - Normalizes and validates trace metric names. ([#5589](https://github.com/getsentry/relay/pull/5589))
3 changes: 3 additions & 0 deletions relay-dynamic-config/src/feature.rs

@@ -138,6 +138,9 @@ pub enum Feature {
     /// Enable the experimental Trace Attachment pipeline in Relay.
     #[serde(rename = "projects:trace-attachment-processing")]
     TraceAttachmentProcessing,
+    /// Enable the new Replay pipeline in Relay.
+    #[serde(rename = "organizations:new-replay-processing")]
+    NewReplayProcessing,
     /// Forward compatibility.
     #[doc(hidden)]
     #[serde(other)]
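The new flag is organization-scoped, and the `readd old code and FF for partial rollout` commit suggests it gates which replay pipeline runs. A minimal sketch of such a gate, with stand-in types so it compiles on its own (`Feature::NewReplayProcessing` and `has_feature` are real names from this diff; everything else here is illustrative, not code from this PR):

// Stand-ins for Relay's real types, just for this sketch.
#[derive(PartialEq)]
enum Feature {
    NewReplayProcessing,
}

struct ProjectInfo {
    features: Vec<Feature>,
}

impl ProjectInfo {
    fn has_feature(&self, feature: Feature) -> bool {
        self.features.contains(&feature)
    }
}

// Hypothetical rollout gate: fall back to the old pipeline unless the
// organization has the new flag.
fn use_new_replay_pipeline(project: &ProjectInfo) -> bool {
    project.has_feature(Feature::NewReplayProcessing)
}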
36 changes: 23 additions & 13 deletions relay-event-normalization/src/replay.rs

@@ -11,11 +11,27 @@ use crate::normalize::user_agent;
 use crate::user_agent::RawUserAgentInfo;
 use crate::{GeoIpLookup, trimming};
 
-/// Replay validation or normalization error.
+/// Replay validation error.
 ///
-/// This error is returned from [`validate`] and [`normalize`].
+/// This error is returned from [`validate`].
 #[derive(Debug, thiserror::Error)]
 pub enum ReplayError {
+    /// The replay event is missing a `replay_id`.
+    #[error("missing replay_id")]
+    MissingReplayId,
+    /// The replay event is missing a `segment_id`.
+    #[error("missing segment_id")]
+    MissingSegmentId,
+    /// The `segment_id` is too large to fit in a `u16`.
+    #[error("segment_id too large")]
+    SegmentIdTooLarge,
+    /// One or more of the `error_ids` are invalid.
+    #[error("invalid error_id specified")]
+    InvalidErrorId,
+    /// One or more of the `trace_ids` are invalid.
+    #[error("invalid trace_id specified")]
+    InvalidTraceId,
+
     /// The Replay event could not be parsed from JSON.
     #[error("invalid json")]
     CouldNotParse(#[from] serde_json::Error),

@@ -45,17 +61,15 @@ pub fn validate(replay: &Replay) -> Result<(), ReplayError> {
     replay
         .replay_id
         .value()
-        .ok_or_else(|| ReplayError::InvalidPayload("missing replay_id".to_owned()))?;
+        .ok_or(ReplayError::MissingReplayId)?;
 
     let segment_id = *replay
         .segment_id
         .value()
-        .ok_or_else(|| ReplayError::InvalidPayload("missing segment_id".to_owned()))?;
+        .ok_or(ReplayError::MissingSegmentId)?;
 
     if segment_id > u16::MAX as u64 {
-        return Err(ReplayError::InvalidPayload(
-            "segment_id exceeded u16 limit".to_owned(),
-        ));
+        return Err(ReplayError::SegmentIdTooLarge);
     }
 
     if replay

@@ -65,9 +79,7 @@ pub fn validate(replay: &Replay) -> Result<(), ReplayError> {
         .flat_map(|v| v.iter())
         .any(|v| v.meta().has_errors())
     {
-        return Err(ReplayError::InvalidPayload(
-            "Invalid error-id specified.".to_owned(),
-        ));
+        return Err(ReplayError::InvalidErrorId);
     }
 
     if replay

@@ -77,9 +89,7 @@ pub fn validate(replay: &Replay) -> Result<(), ReplayError> {
         .flat_map(|v| v.iter())
         .any(|v| v.meta().has_errors())
     {
-        return Err(ReplayError::InvalidPayload(
-            "Invalid trace-id specified.".to_owned(),
-        ));
+        return Err(ReplayError::InvalidTraceId);
     }
 
     Ok(())
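Replacing the stringly-typed `InvalidPayload` with dedicated variants means callers can branch on the failure kind instead of matching message strings. A hypothetical caller-side mapping, not code from this PR (the `_` arm covers enum variants the diff truncates):

fn to_discard_reason(error: &ReplayError) -> &'static str {
    match error {
        ReplayError::MissingReplayId => "missing_replay_id",
        ReplayError::MissingSegmentId => "missing_segment_id",
        ReplayError::SegmentIdTooLarge => "segment_id_too_large",
        ReplayError::InvalidErrorId | ReplayError::InvalidTraceId => "invalid_id",
        ReplayError::CouldNotParse(_) => "invalid_json",
        // Variants outside this hunk map to a generic reason.
        _ => "internal_error",
    }
}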
2 changes: 2 additions & 0 deletions relay-server/src/processing/common.rs

@@ -6,6 +6,7 @@ use crate::processing::StoreHandle;
 use crate::processing::check_ins::CheckInsProcessor;
 use crate::processing::logs::LogsProcessor;
 use crate::processing::profile_chunks::ProfileChunksProcessor;
+use crate::processing::replays::ReplaysProcessor;
 use crate::processing::sessions::SessionsProcessor;
 use crate::processing::spans::SpansProcessor;
 use crate::processing::trace_attachments::TraceAttachmentsProcessor;

@@ -66,4 +67,5 @@ outputs!(
     Spans => SpansProcessor,
     TraceAttachments => TraceAttachmentsProcessor,
     TraceMetrics => TraceMetricsProcessor,
+    Replays => ReplaysProcessor,
 );
1 change: 1 addition & 0 deletions relay-server/src/processing/mod.rs

@@ -26,6 +26,7 @@ pub use self::limits::*;
 pub mod check_ins;
 pub mod logs;
 pub mod profile_chunks;
+pub mod replays;
 pub mod sessions;
 pub mod spans;
 pub mod trace_attachments;
65 changes: 65 additions & 0 deletions relay-server/src/processing/replays/filter.rs

@@ -0,0 +1,65 @@
use relay_dynamic_config::Feature;
use relay_statsd::metric;

use crate::managed::{Managed, Rejected};
use crate::processing::Context;
use crate::processing::replays::{Error, ExpandedReplays, SerializedReplays};
use crate::statsd::RelayCounters;

/// Maximum expected segment ID for a replay session under normal operation.
const MAX_SEGMENTS_ID: u64 = 720;

/// Filters replays sent for a project that does not allow replay ingestion.
pub fn feature_flag(
    replays: Managed<SerializedReplays>,
    ctx: Context<'_>,
) -> Result<Managed<SerializedReplays>, Rejected<Error>> {
    match ctx.should_filter(Feature::SessionReplay)
        || (ctx
            .project_info
            .has_feature(Feature::SessionReplayVideoDisabled)
            && !replays.videos.is_empty())
    {
        true => Err(replays.reject_err(Error::FilterFeatureFlag)),
        false => Ok(replays),
    }
}

/// Applies inbound filters to individual replays.
pub fn filter(replays: &mut Managed<ExpandedReplays>, ctx: Context<'_>) {
    let client_addr = replays.headers.meta().client_addr();
    let event_id = replays.headers.event_id();

    replays.retain(
        |replays| &mut replays.replays,
        |replay, _| {
            let event = replay.get_event().value().ok_or(Error::NoEventContent)?;

            relay_filter::should_filter(
                event,
                client_addr,
                &ctx.project_info.config.filter_settings,
                ctx.global_config.filters(),
            )
            .map_err(Error::Filtered)?;

            // Log segments that exceed the hour limit so we can diagnose errant SDKs
            // or exotic customer implementations.
            if let Some(segment_id) = event.segment_id.value()
                && *segment_id > MAX_SEGMENTS_ID
            {
                metric!(counter(RelayCounters::ReplayExceededSegmentLimit) += 1);

                relay_log::debug!(
                    event_id = ?event_id,
                    project_id = ctx.project_info.project_id.map(|v| v.value()),
                    organization_id = ctx.project_info.organization_id.map(|o| o.value()),
                    segment_id = segment_id,
                    "replay segment-exceeded-limit"
                );
            }

            Ok::<_, Error>(())
        },
    )
}
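For intuition on `MAX_SEGMENTS_ID = 720`: assuming SDKs flush roughly one segment every five seconds (an assumption about SDK cadence, not stated in this diff), 720 segments corresponds to the one-hour limit the comment mentions:

// Back-of-the-envelope check; SEGMENT_INTERVAL_SECS is an assumed cadence.
const SEGMENT_INTERVAL_SECS: u64 = 5;
const MAX_SEGMENTS_ID: u64 = 720;

fn main() {
    assert_eq!(MAX_SEGMENTS_ID * SEGMENT_INTERVAL_SECS, 3600); // one hour
}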
112 changes: 112 additions & 0 deletions relay-server/src/processing/replays/forward.rs

@@ -0,0 +1,112 @@
use bytes::Bytes;
use smallvec::{SmallVec, smallvec};

use crate::Envelope;
use crate::envelope::{ContentType, Item, ItemType, Items};
#[cfg(feature = "processing")]
use crate::managed::ManagedEnvelope;
use crate::managed::{Managed, Rejected};
use crate::processing::replays::{
    Error, ExpandedReplay, ExpandedReplays, ReplayVideoEvent, ReplaysOutput,
};
use crate::processing::{self, Forward};

/// Errors that can occur when serializing an expanded replay into envelope items.
#[derive(Debug, thiserror::Error)]
enum SerializeReplayError {
    /// Failed to serialize the replay event to JSON.
    #[error("json serialization failed")]
    Json(#[from] serde_json::Error),
    /// Failed to serialize the replay video event to MessagePack.
    #[error("msgpack serialization failed")]
    MsgPack(#[from] rmp_serde::encode::Error),
}

impl Forward for ReplaysOutput {
    fn serialize_envelope(
        self,
        _ctx: processing::ForwardContext<'_>,
    ) -> Result<Managed<Box<Envelope>>, Rejected<()>> {
        Ok(self.0.map(|replays, records| {
            let ExpandedReplays { headers, replays } = replays;
            let mut items = Items::new();

            for replay in replays {
                match serialize_replay(&replay) {
                    Ok(replay_items) => items.extend(replay_items),
                    Err(error) => {
                        relay_log::error!(
                            error = &error as &dyn std::error::Error,
                            event_id = ?headers.event_id(),
                            "failed to serialize replay"
                        );
                        records.reject_err(Error::FailedToSerializeReplay, replay);
                    }
                }
            }

            Envelope::from_parts(headers, items)
        }))
    }

    #[cfg(feature = "processing")]
    fn forward_store(
        self,
        s: processing::StoreHandle<'_>,
        ctx: processing::ForwardContext<'_>,
    ) -> Result<(), Rejected<()>> {
        let envelope = self.serialize_envelope(ctx)?;
        let envelope = ManagedEnvelope::from(envelope).into_processed();

        s.store(crate::services::store::StoreEnvelope { envelope });

        Ok(())
    }
}

fn create_replay_event_item(payload: Bytes) -> Item {
    let mut item = Item::new(ItemType::ReplayEvent);
    item.set_payload(ContentType::Json, payload);
    item
}

fn create_replay_recording_item(payload: Bytes) -> Item {
    let mut item = Item::new(ItemType::ReplayRecording);
    item.set_payload(ContentType::OctetStream, payload);
    item
}

fn create_replay_video_item(payload: Bytes) -> Item {
    let mut item = Item::new(ItemType::ReplayVideo);
    item.set_payload(ContentType::OctetStream, payload);
    item
}

fn serialize_replay(replay: &ExpandedReplay) -> Result<SmallVec<[Item; 2]>, SerializeReplayError> {
    match replay {
        ExpandedReplay::WebReplay { event, recording } => {
            let event_bytes: Bytes = event.to_json()?.into_bytes().into();

            Ok(smallvec![
                create_replay_event_item(event_bytes),
                create_replay_recording_item(recording.clone()),
            ])
        }
        ExpandedReplay::NativeReplay {
            event,
            recording,
            video,
        } => {
            let event_bytes: Bytes = event.to_json()?.into_bytes().into();

            let video_event = ReplayVideoEvent {
                replay_event: event_bytes,
                replay_recording: recording.clone(),
                replay_video: video.clone(),
            };

            let payload = rmp_serde::to_vec_named(&video_event)?;
            Ok(smallvec![create_replay_video_item(payload.into())])
        }
    }
}
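For native replays, the event, recording, and video travel together in a single `ReplayVideo` item encoded as named MessagePack. A self-contained sketch of that encoding under stated assumptions: the `ReplayVideoEvent` below is a simplified stand-in whose field names mirror the construction above (the real struct lives elsewhere in this PR), and it relies on the `bytes` crate's `serde` feature:

use bytes::Bytes;
use serde::Serialize;

// Simplified stand-in; field names mirror `serialize_replay` above.
#[derive(Serialize)]
struct ReplayVideoEvent {
    replay_event: Bytes,
    replay_recording: Bytes,
    replay_video: Bytes,
}

fn main() -> Result<(), rmp_serde::encode::Error> {
    let event = ReplayVideoEvent {
        replay_event: Bytes::from_static(b"{\"type\":\"replay_event\"}"),
        replay_recording: Bytes::from_static(b"{}\n[]"),
        replay_video: Bytes::from_static(&[0u8; 4]),
    };

    // `to_vec_named` keeps field names in the payload, matching the PR's call.
    let payload = rmp_serde::to_vec_named(&event)?;
    assert!(!payload.is_empty());
    Ok(())
}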