Skip to content

Commit a782318

Browse files
authored
feat(spanv2): Process standalone spans with v2 pipeline (#5290)
There are still a few differences between the pipelines (see differences in the added integration tests). In follow-ups I will be step by step take care of them.
1 parent b97e972 commit a782318

File tree

4 files changed

+605
-9
lines changed

4 files changed

+605
-9
lines changed

relay-server/src/processing/spans/mod.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ impl processing::Processor for SpansProcessor {
120120
.take_items_by(ItemContainer::<SpanV2>::is_container)
121121
.into_vec();
122122

123+
let legacy = envelope
124+
.envelope_mut()
125+
.take_items_by(|item| matches!(item.ty(), ItemType::Span))
126+
.into_vec();
127+
123128
let integrations = envelope
124129
.envelope_mut()
125130
.take_items_by(|item| matches!(item.integration(), Some(Integration::Spans(_))))
@@ -128,6 +133,7 @@ impl processing::Processor for SpansProcessor {
128133
let work = SerializedSpans {
129134
headers,
130135
spans,
136+
legacy,
131137
integrations,
132138
};
133139
Some(Managed::from_envelope(envelope, work))
@@ -215,11 +221,12 @@ pub struct SerializedSpans {
215221
/// Original envelope headers.
216222
headers: EnvelopeHeaders,
217223

218-
/// A list of spans waiting to be processed.
219-
///
220-
/// All items contained here must be spans.
224+
/// A list of span 'v2' item containers.
221225
spans: Vec<Item>,
222226

227+
/// A list of legacy span 'v1' items.
228+
legacy: Vec<Item>,
229+
223230
/// Spans which Relay received from arbitrary integrations.
224231
integrations: Vec<Item>,
225232
}
@@ -235,7 +242,10 @@ impl SerializedSpans {
235242

236243
impl Counted for SerializedSpans {
237244
fn quantities(&self) -> Quantities {
238-
let quantity = (outcome_count(&self.spans) + outcome_count(&self.integrations)) as usize;
245+
let quantity = (outcome_count(&self.spans)
246+
+ outcome_count(&self.legacy)
247+
+ outcome_count(&self.integrations)) as usize;
248+
239249
smallvec::smallvec![
240250
(DataCategory::Span, quantity),
241251
(DataCategory::SpanIndexed, quantity),

relay-server/src/processing/spans/process.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ use relay_event_normalization::{
22
GeoIpLookup, RequiredMode, SchemaProcessor, TimestampProcessor, TrimmingProcessor, eap,
33
};
44
use relay_event_schema::processor::{ProcessingState, ValueType, process_value};
5-
use relay_event_schema::protocol::SpanV2;
5+
use relay_event_schema::protocol::{Span, SpanV2};
66
use relay_protocol::Annotated;
77

8-
use crate::envelope::{ContainerItems, Item, ItemContainer};
8+
use crate::envelope::{ContainerItems, Item, ItemContainer, WithHeader};
99
use crate::extractors::RequestMeta;
1010
use crate::managed::Managed;
1111
use crate::processing::Context;
@@ -20,11 +20,18 @@ pub fn expand(spans: Managed<SampledSpans>) -> Managed<ExpandedSpans> {
2020
let mut all_spans = Vec::new();
2121

2222
for item in &spans.inner.spans {
23-
let expanded = expand_span(item);
23+
let expanded = expand_span_container(item);
2424
let expanded = records.or_default(expanded, item);
2525
all_spans.extend(expanded);
2626
}
2727

28+
for item in &spans.inner.legacy {
29+
match expand_legacy_span(item) {
30+
Ok(span) => all_spans.push(span),
31+
Err(err) => drop(records.reject_err(err, item)),
32+
}
33+
}
34+
2835
spans::integrations::expand_into(&mut all_spans, records, spans.inner.integrations);
2936

3037
ExpandedSpans {
@@ -35,7 +42,7 @@ pub fn expand(spans: Managed<SampledSpans>) -> Managed<ExpandedSpans> {
3542
})
3643
}
3744

38-
fn expand_span(item: &Item) -> Result<ContainerItems<SpanV2>> {
45+
fn expand_span_container(item: &Item) -> Result<ContainerItems<SpanV2>> {
3946
let spans = ItemContainer::parse(item)
4047
.map_err(|err| {
4148
relay_log::debug!("failed to parse span container: {err}");
@@ -46,6 +53,19 @@ fn expand_span(item: &Item) -> Result<ContainerItems<SpanV2>> {
4653
Ok(spans)
4754
}
4855

56+
fn expand_legacy_span(item: &Item) -> Result<WithHeader<SpanV2>> {
57+
let payload = item.payload();
58+
59+
let span = Annotated::<Span>::from_json_bytes(&payload)
60+
.map_err(|err| {
61+
relay_log::debug!("failed to parse span: {err}");
62+
Error::Invalid(DiscardReason::InvalidJson)
63+
})?
64+
.map_value(relay_spans::span_v1_to_span_v2);
65+
66+
Ok(WithHeader::new(span))
67+
}
68+
4969
/// Normalizes individual spans.
5070
pub fn normalize(spans: &mut Managed<ExpandedSpans>, geo_lookup: &GeoIpLookup) {
5171
spans.retain_with_context(

relay-server/src/services/processor.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,11 @@ impl ProcessingGroup {
319319
item.integration(),
320320
Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
321321
);
322+
let is_span = matches!(item.ty(), &ItemType::Span);
322323

323-
ItemContainer::<SpanV2>::is_container(item) || (exp_feature && is_supported_integration)
324+
ItemContainer::<SpanV2>::is_container(item)
325+
|| (exp_feature && is_span)
326+
|| (exp_feature && is_supported_integration)
324327
});
325328

326329
if !span_v2_items.is_empty() {

0 commit comments

Comments
 (0)