Skip to content

Commit b2088b7

Browse files
authored
ref(spanv2): Disallow multiple span containers (#5247)
Multiple span containers are already not allowed in the other span processing path; this change makes the behavior consistent with other items, like logs.
1 parent 89a07f2 commit b2088b7

File tree

5 files changed

+101
-6
lines changed

5 files changed

+101
-6
lines changed

relay-server/src/processing/logs/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ impl processing::Processor for LogsProcessor {
137137
mut logs: Managed<Self::UnitOfWork>,
138138
ctx: Context<'_>,
139139
) -> Result<Output<Self::Output>, Rejected<Error>> {
140-
validate::container(&logs)?;
140+
validate::container(&logs).reject(&logs)?;
141141

142142
if ctx.is_proxy() {
143143
// If running in proxy mode, just apply cached rate limits and forward without

relay-server/src/processing/logs/validate.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use crate::processing::logs::{Error, SerializedLogs};
2-
use crate::processing::{Managed, Rejected};
1+
use crate::processing::Managed;
2+
use crate::processing::logs::{Error, Result, SerializedLogs};
33

44
/// Validates that there is only a single log container processed at a time.
55
///
@@ -10,10 +10,10 @@ use crate::processing::{Managed, Rejected};
1010
///
1111
/// This limit mostly exists to incentivise SDKs to batch multiple logs into a single container,
1212
/// technically it can be removed without issues.
13-
pub fn container(logs: &Managed<SerializedLogs>) -> Result<(), Rejected<Error>> {
13+
pub fn container(logs: &Managed<SerializedLogs>) -> Result<()> {
1414
// It's fine if there was no log container, as we still accept OTel logs.
1515
if logs.logs.len() > 1 {
16-
return Err(logs.reject_err(Error::DuplicateContainer));
16+
return Err(Error::DuplicateContainer);
1717
}
1818

1919
Ok(())

relay-server/src/processing/spans/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,15 @@ mod integrations;
2222
mod process;
2323
#[cfg(feature = "processing")]
2424
mod store;
25+
mod validate;
2526

2627
type Result<T, E = Error> = std::result::Result<T, E>;
2728

2829
#[derive(Debug, thiserror::Error)]
2930
pub enum Error {
31+
/// Multiple item containers for spans in a single envelope are not allowed.
32+
#[error("duplicate span container")]
33+
DuplicateContainer,
3034
/// Standalone spans filtered because of a missing feature flag.
3135
#[error("spans feature flag missing")]
3236
FilterFeatureFlag,
@@ -49,6 +53,7 @@ impl OutcomeError for Error {
4953

5054
fn consume(self) -> (Option<Outcome>, Self::Error) {
5155
let outcome = match &self {
56+
Self::DuplicateContainer => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
5257
Self::FilterFeatureFlag => None,
5358
Self::Filtered(f) => Some(Outcome::Filtered(f.clone())),
5459
Self::RateLimited(limits) => {
@@ -97,7 +102,7 @@ impl processing::Processor for SpansProcessor {
97102

98103
let spans = envelope
99104
.envelope_mut()
100-
.take_items_by(|item| matches!(*item.ty(), ItemType::Span))
105+
.take_items_by(ItemContainer::<SpanV2>::is_container)
101106
.into_vec();
102107

103108
let integrations = envelope
@@ -119,6 +124,7 @@ impl processing::Processor for SpansProcessor {
119124
ctx: Context<'_>,
120125
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
121126
filter::feature_flag(ctx).reject(&spans)?;
127+
validate::container(&spans).reject(&spans)?;
122128

123129
if ctx.is_proxy() {
124130
// If running in proxy mode, just apply cached rate limits and forward without
relay-server/src/processing/spans/validate.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use crate::managed::Managed;
2+
use crate::processing::spans::{Error, Result, SerializedSpans};
3+
4+
/// Validates that there is only a single spans container processed at a time.
5+
///
6+
/// Currently it is only allowed to send a single span container in an envelope.
7+
/// Since all spans of the same envelope must belong to the same trace (due to the dynamic sampling
8+
/// context on the envelope), they also should be collapsed by SDKs into a single container.
9+
///
10+
/// Once we lift the requirement of a single trace per envelope, we may want to also consider
11+
/// lifting this restriction.
12+
pub fn container(spans: &Managed<SerializedSpans>) -> Result<()> {
13+
// It's fine if there was no container, as we still accept OTel spans.
14+
if spans.spans.len() > 1 {
15+
return Err(Error::DuplicateContainer);
16+
}
17+
18+
Ok(())
19+
}

tests/integration/test_spansv2.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,3 +532,73 @@ def test_spanv2_inbound_filters(
532532
]
533533

534534
assert mini_sentry.captured_events.empty()
535+
536+
537+
def test_spans_v2_multiple_containers_not_allowed(
538+
mini_sentry,
539+
relay,
540+
):
541+
project_id = 42
542+
project_config = mini_sentry.add_full_project_config(project_id)
543+
project_config["config"]["features"] = [
544+
"organizations:standalone-span-ingestion",
545+
"projects:span-v2-experimental-processing",
546+
]
547+
548+
relay = relay(mini_sentry, options=TEST_CONFIG)
549+
start = datetime.now(timezone.utc)
550+
envelope = Envelope()
551+
552+
payload = {
553+
"start_timestamp": start.timestamp(),
554+
"end_timestamp": start.timestamp() + 0.500,
555+
"trace_id": "5b8efff798038103d269b633813fc60c",
556+
"span_id": "eee19b7ec3c1b175",
557+
"name": "some op",
558+
"is_remote": False,
559+
"status": "ok",
560+
}
561+
envelope.add_item(
562+
Item(
563+
type="span",
564+
payload=PayloadRef(json={"items": [payload]}),
565+
content_type="application/vnd.sentry.items.span.v2+json",
566+
headers={"item_count": 1},
567+
)
568+
)
569+
envelope.add_item(
570+
Item(
571+
type="span",
572+
payload=PayloadRef(json={"items": [payload, payload]}),
573+
content_type="application/vnd.sentry.items.span.v2+json",
574+
headers={"item_count": 2},
575+
)
576+
)
577+
578+
relay.send_envelope(project_id, envelope)
579+
580+
assert mini_sentry.get_outcomes(2) == [
581+
{
582+
"category": DataCategory.SPAN.value,
583+
"timestamp": time_within_delta(),
584+
"key_id": 123,
585+
"org_id": 1,
586+
"outcome": 3, # Invalid
587+
"project_id": 42,
588+
"quantity": 3,
589+
"reason": "duplicate_item",
590+
},
591+
{
592+
"category": DataCategory.SPAN_INDEXED.value,
593+
"timestamp": time_within_delta(),
594+
"key_id": 123,
595+
"org_id": 1,
596+
"outcome": 3, # Invalid
597+
"project_id": 42,
598+
"quantity": 3,
599+
"reason": "duplicate_item",
600+
},
601+
]
602+
603+
assert mini_sentry.captured_events.empty()
604+
assert mini_sentry.captured_outcomes.empty()

0 commit comments

Comments (0)