Skip to content

Commit 17bd5d0

Browse files
authored
ref(relay): Always use new processing pipeline for spanv2 (#5244)
New containers can always go through the new processing pipeline.
1 parent 1da1eb4 commit 17bd5d0

File tree

4 files changed

+33
-458
lines changed

4 files changed

+33
-458
lines changed

relay-server/src/services/processor.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -313,20 +313,21 @@ impl ProcessingGroup {
313313
))
314314
}
315315

316-
if project_info.has_feature(Feature::SpanV2ExperimentalProcessing) {
317-
let span_v2_items = envelope.take_items_by(|item| {
318-
matches!(
319-
item.integration(),
320-
Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
321-
) || ItemContainer::<SpanV2>::is_container(item)
322-
});
316+
let span_v2_items = envelope.take_items_by(|item| {
317+
let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
318+
let is_supported_integration = matches!(
319+
item.integration(),
320+
Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
321+
);
323322

324-
if !span_v2_items.is_empty() {
325-
grouped_envelopes.push((
326-
ProcessingGroup::SpanV2,
327-
Envelope::from_parts(headers.clone(), span_v2_items),
328-
))
329-
}
323+
ItemContainer::<SpanV2>::is_container(item) || (exp_feature && is_supported_integration)
324+
});
325+
326+
if !span_v2_items.is_empty() {
327+
grouped_envelopes.push((
328+
ProcessingGroup::SpanV2,
329+
Envelope::from_parts(headers.clone(), span_v2_items),
330+
))
330331
}
331332

332333
// Extract spans.
@@ -2122,7 +2123,6 @@ impl EnvelopeProcessorService {
21222123
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
21232124
let mut extracted_metrics = ProcessingExtractedMetrics::new();
21242125

2125-
span::expand_v2_spans(managed_envelope)?;
21262126
span::filter(managed_envelope, ctx.config, ctx.project_info);
21272127
span::convert_otel_traces_data(managed_envelope);
21282128

relay-server/src/services/processor/span.rs

Lines changed: 2 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,16 @@
33
use prost::Message;
44
use relay_dynamic_config::Feature;
55
use relay_event_normalization::span::tag_extraction;
6-
use relay_event_schema::protocol::{Event, Span, SpanV2};
6+
use relay_event_schema::protocol::{Event, Span};
77
use relay_protocol::Annotated;
88
use relay_quotas::DataCategory;
99
use relay_spans::otel_trace::TracesData;
1010

11-
use crate::envelope::{ContentType, Item, ItemContainer, ItemType};
11+
use crate::envelope::{ContentType, Item, ItemType};
1212
use crate::integrations::{Integration, OtelFormat, SpansIntegration};
1313
use crate::managed::{ItemAction, TypedEnvelope};
1414
use crate::services::outcome::{DiscardReason, Outcome};
1515
use crate::services::processor::{SpanGroup, should_filter};
16-
use crate::statsd::RelayTimers;
1716

1817
#[cfg(feature = "processing")]
1918
mod processing;
@@ -22,8 +21,6 @@ use crate::services::projects::project::ProjectInfo;
2221
pub use processing::*;
2322
use relay_config::Config;
2423

25-
use super::ProcessingError;
26-
2724
pub fn filter(
2825
managed_envelope: &mut TypedEnvelope<SpanGroup>,
2926
config: &Config,
@@ -48,70 +45,6 @@ pub fn filter(
4845
});
4946
}
5047

51-
/// Expands V2 spans to V1 spans.
52-
///
53-
/// This expands one item (contanining multiple V2 spans) into several
54-
/// (containing one V1 span each).
55-
pub fn expand_v2_spans(
56-
managed_envelope: &mut TypedEnvelope<SpanGroup>,
57-
) -> Result<(), ProcessingError> {
58-
let span_v2_items = managed_envelope
59-
.envelope_mut()
60-
.take_items_by(ItemContainer::<SpanV2>::is_container);
61-
62-
// V2 spans must always be sent as an `ItemContainer`, currently it is not allowed to
63-
// send multiple containers for V2 spans.
64-
//
65-
// This restriction may be lifted in the future, this is why this validation only happens
66-
// when processing is enabled, allowing it to be changed easily in the future.
67-
//
68-
// This limit mostly exists to incentivise SDKs to batch multiple spans into a single container,
69-
// technically it can be removed without issues.
70-
if span_v2_items.len() > 1 {
71-
return Err(ProcessingError::DuplicateItem(ItemType::Span));
72-
}
73-
74-
if span_v2_items.is_empty() {
75-
return Ok(());
76-
}
77-
78-
let now = std::time::Instant::now();
79-
80-
for span_v2_item in span_v2_items {
81-
let spans_v2 = match ItemContainer::parse(&span_v2_item) {
82-
Ok(spans_v2) => spans_v2,
83-
Err(err) => {
84-
relay_log::debug!("failed to parse V2 spans: {err}");
85-
track_invalid(
86-
managed_envelope,
87-
DiscardReason::InvalidSpan,
88-
span_v2_item.item_count().unwrap_or(1) as usize,
89-
);
90-
continue;
91-
}
92-
};
93-
94-
for span_v2 in spans_v2.into_items() {
95-
let span_v1 = span_v2.value.map_value(relay_spans::span_v2_to_span_v1);
96-
match span_v1.to_json() {
97-
Ok(payload) => {
98-
let mut new_item = Item::new(ItemType::Span);
99-
new_item.set_payload(ContentType::Json, payload);
100-
managed_envelope.envelope_mut().add_item(new_item);
101-
}
102-
Err(err) => {
103-
relay_log::debug!("failed to serialize span: {}", err);
104-
track_invalid(managed_envelope, DiscardReason::Internal, 1);
105-
}
106-
}
107-
}
108-
}
109-
110-
relay_statsd::metric!(timer(RelayTimers::SpanV2Expansion) = now.elapsed());
111-
112-
Ok(())
113-
}
114-
11548
pub fn convert_otel_traces_data(managed_envelope: &mut TypedEnvelope<SpanGroup>) {
11649
let envelope = managed_envelope.envelope_mut();
11750

relay-server/src/statsd.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -591,8 +591,6 @@ pub enum RelayTimers {
591591
BodyReadDuration,
592592
/// Timing in milliseconds to count spans in a serialized transaction payload.
593593
CheckNestedSpans,
594-
/// The time in milliseconds it takes to expand a Span V2 container into Spans V1.
595-
SpanV2Expansion,
596594
/// The time it needs to create a signature. Includes both the signature used for
597595
/// trusted relays and for register challenges.
598596
SignatureCreationDuration,
@@ -649,7 +647,6 @@ impl TimerMetric for RelayTimers {
649647
RelayTimers::BufferEnvelopeDecompression => "buffer.envelopes_decompression",
650648
RelayTimers::BodyReadDuration => "requests.body_read.duration",
651649
RelayTimers::CheckNestedSpans => "envelope.check_nested_spans",
652-
RelayTimers::SpanV2Expansion => "envelope.span_v2_expansion",
653650
RelayTimers::SignatureCreationDuration => "signature.create.duration",
654651
}
655652
}

0 commit comments

Comments
 (0)