Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

- Fix parsing of data categories/quotas when using an aliased data category name. ([#5435](https://github.com/getsentry/relay/pull/5435))

**Features**:

- Add functionality to process and store span attachments. ([#5423](https://github.com/getsentry/relay/pull/5423), [#5363](https://github.com/getsentry/relay/pull/5363))

## 25.11.1

**Breaking Changes**:
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5088,9 +5088,9 @@ dependencies = [

[[package]]
name = "sentry_protos"
version = "0.4.2"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ef5b7751756e49938d79bdb5e0e881f71ffca7bf207aa7866f837d8a965753d"
checksum = "cf33a607845b7433ca41476e79004c1e2eaebf682f4c1d8e2145a288914ce8d8"
dependencies = [
"prost",
"prost-types",
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ sentry-release-parser = { version = "1.4.0", default-features = false, features
"semver-1",
] }
sentry-types = "0.41.0"
sentry_protos = "0.4.2"
sentry_protos = "0.4.8"
serde = { version = "=1.0.228", features = ["derive", "rc"] }
serde-transcode = "1.1.1"
serde-vars = "0.2.0"
Expand Down
6 changes: 5 additions & 1 deletion relay-event-schema/src/protocol/attachment_v2.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use relay_protocol::{Annotated, Empty, FromValue, IntoValue, Object, Value};

use crate::processor::ProcessValue;
use crate::protocol::{Attributes, Timestamp};
use crate::protocol::{Attributes, Timestamp, TraceId};

use uuid::Uuid;

/// Metadata for a span attachment.
#[derive(Clone, Debug, Default, PartialEq, Empty, FromValue, IntoValue, ProcessValue)]
pub struct AttachmentV2Meta {
/// The ID of the trace that the attachment belongs to.
#[metastructure(required = true, nonempty = true, trim = false)]
pub trace_id: Annotated<TraceId>,

/// Unique identifier for this attachment.
#[metastructure(required = true, nonempty = true, trim = false)]
pub attachment_id: Annotated<Uuid>,
Expand Down
9 changes: 9 additions & 0 deletions relay-server/src/envelope/item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,15 @@ pub enum ParentId {
SpanId(Option<SpanId>),
}

impl ParentId {
/// Converts the ID to a span ID (if applicable).
pub fn as_span_id(&self) -> Option<SpanId> {
match self {
ParentId::SpanId(span_id) => *span_id,
}
}
}

#[cfg(test)]
mod tests {
use crate::integrations::OtelFormat;
Expand Down
14 changes: 14 additions & 0 deletions relay-server/src/managed/counted.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::BTreeMap;

use itertools::Either;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either seems to get imported from a couple of different crates, not sure if this is the canonical one.

use relay_event_schema::protocol::{
OurLog, SessionAggregateItem, SessionAggregates, SessionUpdate, Span, SpanV2, TraceMetric,
};
Expand Down Expand Up @@ -40,6 +41,19 @@ impl<T: Counted> Counted for Option<T> {
}
}

impl<L, R> Counted for Either<L, R>
where
L: Counted,
R: Counted,
{
fn quantities(&self) -> Quantities {
match self {
Either::Left(value) => value.quantities(),
Either::Right(value) => value.quantities(),
}
}
}

impl Counted for Item {
fn quantities(&self) -> Quantities {
self.quantities()
Expand Down
12 changes: 12 additions & 0 deletions relay-server/src/managed/managed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use chrono::{DateTime, Utc};
use itertools::Either;
use relay_event_schema::protocol::EventId;
use relay_quotas::{DataCategory, Scoping};
use relay_system::Addr;
Expand Down Expand Up @@ -593,6 +594,17 @@ impl<T: Counted> Managed<Option<T>> {
}
}

impl<L: Counted, R: Counted> Managed<Either<L, R>> {
/// Turns a managed option into an optional [`Managed`].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment still refers to Option.

pub fn transpose(self) -> Either<Managed<L>, Managed<R>> {
let (either, meta) = self.destructure();
match either {
Either::Left(value) => Either::Left(Managed::from_parts(value, meta)),
Either::Right(value) => Either::Right(Managed::from_parts(value, meta)),
}
}
}

impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
fn from(value: Managed<Box<Envelope>>) -> Self {
let (value, meta) = value.destructure();
Expand Down
1 change: 0 additions & 1 deletion relay-server/src/processing/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ impl<'a> StoreHandle<'a> {
}

/// Sends a message to the [`Upload`] service.
#[expect(unused)]
pub fn upload<M>(&self, message: M)
where
Upload: FromMessage<M>,
Expand Down
14 changes: 3 additions & 11 deletions relay-server/src/processing/logs/store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use prost_types::Timestamp;
use relay_event_schema::protocol::{Attributes, OurLog, OurLogLevel, SpanId};
use relay_protocol::{Annotated, IntoValue, Value};
use relay_quotas::Scoping;
Expand All @@ -10,7 +9,7 @@ use uuid::Uuid;

use crate::envelope::WithHeader;
use crate::processing::logs::{Error, Result};
use crate::processing::utils::store::{AttributeMeta, extract_meta_attributes};
use crate::processing::utils::store::{AttributeMeta, extract_meta_attributes, proto_timestamp};
use crate::processing::{Counted, Retention};
use crate::services::outcome::DiscardReason;
use crate::services::store::StoreTraceItem;
Expand Down Expand Up @@ -66,10 +65,10 @@ pub fn convert(log: WithHeader<OurLog>, ctx: &Context) -> Result<StoreTraceItem>
item_type: TraceItemType::Log.into(),
organization_id: ctx.scoping.organization_id.value(),
project_id: ctx.scoping.project_id.value(),
received: Some(ts(ctx.received_at)),
received: Some(proto_timestamp(ctx.received_at)),
retention_days: ctx.retention.standard.into(),
downsampled_retention_days: ctx.retention.downsampled.into(),
timestamp: Some(ts(timestamp.0)),
timestamp: Some(proto_timestamp(timestamp.0)),
trace_id: required!(log.trace_id).to_string(),
item_id: Uuid::new_v7(timestamp.into()).as_bytes().to_vec(),
attributes: attributes(meta, attrs, fields),
Expand All @@ -83,13 +82,6 @@ pub fn convert(log: WithHeader<OurLog>, ctx: &Context) -> Result<StoreTraceItem>
})
}

fn ts(dt: DateTime<Utc>) -> Timestamp {
Timestamp {
seconds: dt.timestamp(),
nanos: i32::try_from(dt.timestamp_subsec_nanos()).unwrap_or(0),
}
}

/// Fields on the log message which are stored as fields.
struct FieldAttributes {
/// The log level.
Expand Down
81 changes: 49 additions & 32 deletions relay-server/src/processing/spans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bytes::Bytes;
use either::Either;
use relay_event_normalization::GeoIpLookup;
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{AttachmentV2Meta, SpanId, SpanV2};
use relay_event_schema::protocol::{AttachmentV2Meta, SpanV2};
use relay_pii::PiiConfigError;
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, RateLimits};
Expand Down Expand Up @@ -242,21 +242,24 @@ impl Forward for SpanOutput {
retention: ctx.retention(|r| r.span.as_ref()),
};

// Explicitly drop standalone attachments before splitting
// They are not stored for now.
// This must be fixed before enabling the feature flag.
let spans = spans.map(|mut inner, record_keeper| {
if !inner.stand_alone_attachments.is_empty() {
let standalone = std::mem::take(&mut inner.stand_alone_attachments);
record_keeper.reject_err(Outcome::Invalid(DiscardReason::Internal), standalone);
let spans_and_attachments = spans.split(|spans| spans.into_parts());
for either in spans_and_attachments {
match either.transpose() {
Either::Left(span) => {
if let Ok(span) = span.try_map(|span, _| store::convert(span, &ctx)) {
s.store(span);
}
}
Either::Right(attachment) => {
if let Ok(attachment) = store::attachment::convert(
attachment,
ctx.retention,
ctx.server_sample_rate,
) {
s.upload(attachment);
}
}
}
inner
});

for span in spans.split(|spans| spans.into_indexed_spans()) {
if let Ok(span) = span.try_map(|span, _| store::convert(span, &ctx)) {
s.store(span)
};
}

Ok(())
Expand Down Expand Up @@ -359,8 +362,7 @@ impl<C> ExpandedSpans<C> {

for ExpandedSpan { span, attachments } in self.spans {
for attachment in attachments {
let span_id = span.value().and_then(|s| s.span_id.value().copied());
items.push(attachment_to_item(attachment, span_id)?);
items.push(attachment_to_item(attachment)?);
}

spans_without_attachments.push(span);
Expand All @@ -373,7 +375,7 @@ impl<C> ExpandedSpans<C> {
}

for attachment in self.stand_alone_attachments {
items.push(attachment_to_item(attachment, None)?);
items.push(attachment_to_item(attachment)?);
}

Ok(Envelope::from_parts(self.headers, Items::from_vec(items)))
Expand Down Expand Up @@ -403,11 +405,12 @@ impl<C> ExpandedSpans<C> {
}
}

fn attachment_to_item(
attachment: ExpandedAttachment,
span_id: Option<SpanId>,
) -> Result<Item, ContainerWriteError> {
let ExpandedAttachment { meta, body } = attachment;
fn attachment_to_item(attachment: ExpandedAttachment) -> Result<Item, ContainerWriteError> {
let ExpandedAttachment {
parent_id,
meta,
body,
} = attachment;

let meta_json = meta.to_json()?;
let meta_bytes = meta_json.into_bytes();
Expand All @@ -420,7 +423,7 @@ fn attachment_to_item(
let mut item = Item::new(ItemType::Attachment);
item.set_payload(ContentType::AttachmentV2, payload.freeze());
item.set_meta_length(meta_length as u32);
item.set_parent_id(ParentId::SpanId(span_id));
item.set_parent_id(parent_id);

Ok(item)
}
Expand Down Expand Up @@ -450,8 +453,22 @@ impl ExpandedSpans<TotalAndIndexed> {

impl ExpandedSpans<Indexed> {
#[cfg(feature = "processing")]
fn into_indexed_spans(self) -> impl Iterator<Item = IndexedSpan> {
self.spans.into_iter().map(IndexedSpan)
fn into_parts(self) -> impl Iterator<Item = Either<IndexedSpanOnly, ExpandedAttachment>> {
let Self {
headers: _,
server_sample_rate: _,
spans,
stand_alone_attachments,
category: _,
} = self;
spans
.into_iter()
.flat_map(|span| {
let ExpandedSpan { span, attachments } = span;
std::iter::once(Either::Left(IndexedSpanOnly(span)))
.chain(attachments.into_iter().map(Either::Right))
})
.chain(stand_alone_attachments.into_iter().map(Either::Right))
}
}

Expand Down Expand Up @@ -610,23 +627,23 @@ impl Managed<ExpandedSpans<TotalAndIndexed>> {
}
}

/// A Span which only represents the indexed category.
#[cfg(feature = "processing")]
#[derive(Debug)]
struct IndexedSpan(ExpandedSpan);
struct IndexedSpanOnly(WithHeader<SpanV2>);

#[cfg(feature = "processing")]
impl Counted for IndexedSpan {
impl Counted for IndexedSpanOnly {
fn quantities(&self) -> Quantities {
let mut quantities = smallvec::smallvec![(DataCategory::SpanIndexed, 1)];
quantities.extend(self.0.attachments.quantities());
quantities
smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
}
}

/// A validated and parsed span attachment.
#[derive(Debug)]
pub struct ExpandedAttachment {
/// The ID of the log / span / metric that owns the span.
pub parent_id: ParentId,

/// The parsed metadata from the attachment.
pub meta: Annotated<AttachmentV2Meta>,

Expand Down
1 change: 1 addition & 0 deletions relay-server/src/processing/spans/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ fn parse_and_validate_span_attachment(item: &Item) -> Result<(Option<SpanId>, Ex
Ok((
associated_span_id,
ExpandedAttachment {
parent_id: ParentId::SpanId(associated_span_id),
meta,
body: Bytes::copy_from_slice(body),
},
Expand Down
25 changes: 15 additions & 10 deletions relay-server/src/processing/spans/store.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::ops::Deref;

use relay_event_schema::protocol::SpanV2;
use relay_protocol::{Annotated, FiniteF64};
use relay_event_schema::protocol::Attributes;
use relay_protocol::Annotated;
use relay_protocol::FiniteF64;

use crate::processing::Retention;
use crate::processing::spans::{Error, IndexedSpan, Result};
use crate::processing::spans::{Error, IndexedSpanOnly, Result};
use crate::services::outcome::DiscardReason;
use crate::services::store::StoreSpanV2;

pub mod attachment;

macro_rules! required {
($value:expr) => {{
match $value {
Expand All @@ -31,14 +34,13 @@ pub struct Context {
pub retention: Retention,
}

/// Converts a processed [`SpanV2`] into a [Kafka](crate::services::store::Store) compatible format.
pub fn convert(span: IndexedSpan, ctx: &Context) -> Result<Box<StoreSpanV2>> {
// TODO: We are not doing anything with the attachment here.
let mut span = required!(span.0.span.value);
/// Converts a processed [`SpanV2`](super::SpanV2) into a [Kafka](crate::services::store::Store) compatible format.
pub fn convert(span: IndexedSpanOnly, ctx: &Context) -> Result<Box<StoreSpanV2>> {
let mut span = required!(span.0.value);

let routing_key = span.trace_id.value().map(|v| *v.deref());

inject_server_sample_rate(&mut span, ctx.server_sample_rate);
inject_server_sample_rate(&mut span.attributes, ctx.server_sample_rate);

Ok(Box::new(StoreSpanV2 {
routing_key,
Expand All @@ -49,11 +51,14 @@ pub fn convert(span: IndexedSpan, ctx: &Context) -> Result<Box<StoreSpanV2>> {
}

/// Injects a server sample rate into a span.
fn inject_server_sample_rate(span: &mut SpanV2, server_sample_rate: Option<f64>) {
fn inject_server_sample_rate(
attributes: &mut Annotated<Attributes>,
server_sample_rate: Option<f64>,
) {
let Some(server_sample_rate) = server_sample_rate.and_then(FiniteF64::new) else {
return;
};

let attributes = span.attributes.get_or_insert_with(Default::default);
let attributes = attributes.get_or_insert_with(Default::default);
attributes.insert("sentry.server_sample_rate", server_sample_rate.to_f64());
}
Loading
Loading