Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

- Fix parsing of data categories/quotas when using an aliased data category name. ([#5435](https://github.com/getsentry/relay/pull/5435))

**Features**:

- Add functionality to process and store span attachments. ([#5423](https://github.com/getsentry/relay/pull/5423), [#5363](https://github.com/getsentry/relay/pull/5363))

**Internal**:

- Revise trace metric and log size limits. ([#5440](https://github.com/getsentry/relay/pull/5440))
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5090,9 +5090,9 @@ dependencies = [

[[package]]
name = "sentry_protos"
version = "0.4.2"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ef5b7751756e49938d79bdb5e0e881f71ffca7bf207aa7866f837d8a965753d"
checksum = "cf33a607845b7433ca41476e79004c1e2eaebf682f4c1d8e2145a288914ce8d8"
dependencies = [
"prost",
"prost-types",
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ sentry-release-parser = { version = "1.4.0", default-features = false, features
"semver-1",
] }
sentry-types = "0.41.0"
sentry_protos = "0.4.2"
sentry_protos = "0.4.8"
serde = { version = "=1.0.228", features = ["derive", "rc"] }
serde-transcode = "1.1.1"
serde-vars = "0.2.0"
Expand Down
6 changes: 5 additions & 1 deletion relay-event-schema/src/protocol/attachment_v2.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use relay_protocol::{Annotated, Empty, FromValue, IntoValue, Object, Value};

use crate::processor::ProcessValue;
use crate::protocol::{Attributes, Timestamp};
use crate::protocol::{Attributes, Timestamp, TraceId};

use uuid::Uuid;

/// Metadata for a span attachment.
#[derive(Clone, Debug, Default, PartialEq, Empty, FromValue, IntoValue, ProcessValue)]
pub struct AttachmentV2Meta {
/// The ID of the trace that the attachment belongs to.
#[metastructure(required = true, nonempty = true, trim = false)]
pub trace_id: Annotated<TraceId>,

/// Unique identifier for this attachment.
#[metastructure(required = true, nonempty = true, trim = false)]
pub attachment_id: Annotated<Uuid>,
Expand Down
9 changes: 9 additions & 0 deletions relay-server/src/envelope/item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,15 @@ pub enum ParentId {
SpanId(Option<SpanId>),
}

impl ParentId {
/// Converts the ID to a span ID (if applicable).
pub fn as_span_id(&self) -> Option<SpanId> {
match self {
ParentId::SpanId(span_id) => *span_id,
}
}
}

#[cfg(test)]
mod tests {
use crate::integrations::OtelFormat;
Expand Down
14 changes: 14 additions & 0 deletions relay-server/src/managed/counted.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::BTreeMap;

use itertools::Either;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either seems to get imported from a couple of different crates, not sure if this is the canonical one.

use relay_event_schema::protocol::{
OurLog, SessionAggregateItem, SessionAggregates, SessionUpdate, Span, SpanV2, TraceMetric,
};
Expand Down Expand Up @@ -40,6 +41,19 @@ impl<T: Counted> Counted for Option<T> {
}
}

impl<L, R> Counted for Either<L, R>
where
L: Counted,
R: Counted,
{
fn quantities(&self) -> Quantities {
match self {
Either::Left(value) => value.quantities(),
Either::Right(value) => value.quantities(),
}
}
}

impl Counted for Item {
fn quantities(&self) -> Quantities {
self.quantities()
Expand Down
12 changes: 12 additions & 0 deletions relay-server/src/managed/managed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use chrono::{DateTime, Utc};
use itertools::Either;
use relay_event_schema::protocol::EventId;
use relay_quotas::{DataCategory, Scoping};
use relay_system::Addr;
Expand Down Expand Up @@ -593,6 +594,17 @@ impl<T: Counted> Managed<Option<T>> {
}
}

impl<L: Counted, R: Counted> Managed<Either<L, R>> {
/// Turns a managed [`Either`] into an [`Either`] of [`Managed`].
pub fn transpose(self) -> Either<Managed<L>, Managed<R>> {
let (either, meta) = self.destructure();
match either {
Either::Left(value) => Either::Left(Managed::from_parts(value, meta)),
Either::Right(value) => Either::Right(Managed::from_parts(value, meta)),
}
}
}

impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
fn from(value: Managed<Box<Envelope>>) -> Self {
let (value, meta) = value.destructure();
Expand Down
1 change: 0 additions & 1 deletion relay-server/src/processing/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ impl<'a> StoreHandle<'a> {
}

/// Sends a message to the [`Upload`] service.
#[expect(unused)]
pub fn upload<M>(&self, message: M)
where
Upload: FromMessage<M>,
Expand Down
14 changes: 3 additions & 11 deletions relay-server/src/processing/logs/store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use prost_types::Timestamp;
use relay_event_schema::protocol::{Attributes, OurLog, OurLogLevel, SpanId};
use relay_protocol::{Annotated, IntoValue, Value};
use relay_quotas::Scoping;
Expand All @@ -10,7 +9,7 @@ use uuid::Uuid;

use crate::envelope::WithHeader;
use crate::processing::logs::{Error, Result};
use crate::processing::utils::store::{AttributeMeta, extract_meta_attributes};
use crate::processing::utils::store::{AttributeMeta, extract_meta_attributes, proto_timestamp};
use crate::processing::{Counted, Retention};
use crate::services::outcome::DiscardReason;
use crate::services::store::StoreTraceItem;
Expand Down Expand Up @@ -66,10 +65,10 @@ pub fn convert(log: WithHeader<OurLog>, ctx: &Context) -> Result<StoreTraceItem>
item_type: TraceItemType::Log.into(),
organization_id: ctx.scoping.organization_id.value(),
project_id: ctx.scoping.project_id.value(),
received: Some(ts(ctx.received_at)),
received: Some(proto_timestamp(ctx.received_at)),
retention_days: ctx.retention.standard.into(),
downsampled_retention_days: ctx.retention.downsampled.into(),
timestamp: Some(ts(timestamp.0)),
timestamp: Some(proto_timestamp(timestamp.0)),
trace_id: required!(log.trace_id).to_string(),
item_id: Uuid::new_v7(timestamp.into()).as_bytes().to_vec(),
attributes: attributes(meta, attrs, fields),
Expand All @@ -83,13 +82,6 @@ pub fn convert(log: WithHeader<OurLog>, ctx: &Context) -> Result<StoreTraceItem>
})
}

fn ts(dt: DateTime<Utc>) -> Timestamp {
Timestamp {
seconds: dt.timestamp(),
nanos: i32::try_from(dt.timestamp_subsec_nanos()).unwrap_or(0),
}
}

/// Fields on the log message which are stored as fields.
struct FieldAttributes {
/// The log level.
Expand Down
81 changes: 49 additions & 32 deletions relay-server/src/processing/spans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bytes::Bytes;
use either::Either;
use relay_event_normalization::GeoIpLookup;
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{AttachmentV2Meta, SpanId, SpanV2};
use relay_event_schema::protocol::{AttachmentV2Meta, SpanV2};
use relay_pii::PiiConfigError;
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, RateLimits};
Expand Down Expand Up @@ -242,21 +242,24 @@ impl Forward for SpanOutput {
retention: ctx.retention(|r| r.span.as_ref()),
};

// Explicitly drop standalone attachments before splitting
// They are not stored for now.
// This must be fixed before enabling the feature flag.
let spans = spans.map(|mut inner, record_keeper| {
if !inner.stand_alone_attachments.is_empty() {
let standalone = std::mem::take(&mut inner.stand_alone_attachments);
record_keeper.reject_err(Outcome::Invalid(DiscardReason::Internal), standalone);
let spans_and_attachments = spans.split(|spans| spans.into_parts());
for either in spans_and_attachments {
match either.transpose() {
Either::Left(span) => {
if let Ok(span) = span.try_map(|span, _| store::convert(span, &ctx)) {
s.store(span);
}
}
Either::Right(attachment) => {
if let Ok(attachment) = store::attachment::convert(
attachment,
ctx.retention,
ctx.server_sample_rate,
) {
s.upload(attachment);
}
}
}
inner
});

for span in spans.split(|spans| spans.into_indexed_spans()) {
if let Ok(span) = span.try_map(|span, _| store::convert(span, &ctx)) {
s.store(span)
};
}

Ok(())
Expand Down Expand Up @@ -359,8 +362,7 @@ impl<C> ExpandedSpans<C> {

for ExpandedSpan { span, attachments } in self.spans {
for attachment in attachments {
let span_id = span.value().and_then(|s| s.span_id.value().copied());
items.push(attachment_to_item(attachment, span_id)?);
items.push(attachment_to_item(attachment)?);
}

spans_without_attachments.push(span);
Expand All @@ -373,7 +375,7 @@ impl<C> ExpandedSpans<C> {
}

for attachment in self.stand_alone_attachments {
items.push(attachment_to_item(attachment, None)?);
items.push(attachment_to_item(attachment)?);
}

Ok(Envelope::from_parts(self.headers, Items::from_vec(items)))
Expand Down Expand Up @@ -403,11 +405,12 @@ impl<C> ExpandedSpans<C> {
}
}

fn attachment_to_item(
attachment: ExpandedAttachment,
span_id: Option<SpanId>,
) -> Result<Item, ContainerWriteError> {
let ExpandedAttachment { meta, body } = attachment;
fn attachment_to_item(attachment: ExpandedAttachment) -> Result<Item, ContainerWriteError> {
let ExpandedAttachment {
parent_id,
meta,
body,
} = attachment;

let meta_json = meta.to_json()?;
let meta_bytes = meta_json.into_bytes();
Expand All @@ -420,7 +423,7 @@ fn attachment_to_item(
let mut item = Item::new(ItemType::Attachment);
item.set_payload(ContentType::AttachmentV2, payload.freeze());
item.set_meta_length(meta_length as u32);
item.set_parent_id(ParentId::SpanId(span_id));
item.set_parent_id(parent_id);

Ok(item)
}
Expand Down Expand Up @@ -450,8 +453,22 @@ impl ExpandedSpans<TotalAndIndexed> {

impl ExpandedSpans<Indexed> {
#[cfg(feature = "processing")]
fn into_indexed_spans(self) -> impl Iterator<Item = IndexedSpan> {
self.spans.into_iter().map(IndexedSpan)
fn into_parts(self) -> impl Iterator<Item = Either<IndexedSpanOnly, ExpandedAttachment>> {
let Self {
headers: _,
server_sample_rate: _,
spans,
stand_alone_attachments,
category: _,
} = self;
spans
.into_iter()
.flat_map(|span| {
let ExpandedSpan { span, attachments } = span;
std::iter::once(Either::Left(IndexedSpanOnly(span)))
.chain(attachments.into_iter().map(Either::Right))
})
.chain(stand_alone_attachments.into_iter().map(Either::Right))
}
}

Expand Down Expand Up @@ -610,23 +627,23 @@ impl Managed<ExpandedSpans<TotalAndIndexed>> {
}
}

/// A Span which only represents the indexed category.
#[cfg(feature = "processing")]
#[derive(Debug)]
struct IndexedSpan(ExpandedSpan);
struct IndexedSpanOnly(WithHeader<SpanV2>);

#[cfg(feature = "processing")]
impl Counted for IndexedSpan {
impl Counted for IndexedSpanOnly {
fn quantities(&self) -> Quantities {
let mut quantities = smallvec::smallvec![(DataCategory::SpanIndexed, 1)];
quantities.extend(self.0.attachments.quantities());
quantities
smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
}
}

/// A validated and parsed span attachment.
#[derive(Debug)]
pub struct ExpandedAttachment {
/// The ID of the log / span / metric that owns the span.
pub parent_id: ParentId,

/// The parsed metadata from the attachment.
pub meta: Annotated<AttachmentV2Meta>,

Expand Down
1 change: 1 addition & 0 deletions relay-server/src/processing/spans/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ fn parse_and_validate_span_attachment(item: &Item) -> Result<(Option<SpanId>, Ex
Ok((
associated_span_id,
ExpandedAttachment {
parent_id: ParentId::SpanId(associated_span_id),
meta,
body: Bytes::copy_from_slice(body),
},
Expand Down
25 changes: 15 additions & 10 deletions relay-server/src/processing/spans/store.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::ops::Deref;

use relay_event_schema::protocol::SpanV2;
use relay_protocol::{Annotated, FiniteF64};
use relay_event_schema::protocol::Attributes;
use relay_protocol::Annotated;
use relay_protocol::FiniteF64;

use crate::processing::Retention;
use crate::processing::spans::{Error, IndexedSpan, Result};
use crate::processing::spans::{Error, IndexedSpanOnly, Result};
use crate::services::outcome::DiscardReason;
use crate::services::store::StoreSpanV2;

pub mod attachment;

macro_rules! required {
($value:expr) => {{
match $value {
Expand All @@ -31,14 +34,13 @@ pub struct Context {
pub retention: Retention,
}

/// Converts a processed [`SpanV2`] into a [Kafka](crate::services::store::Store) compatible format.
pub fn convert(span: IndexedSpan, ctx: &Context) -> Result<Box<StoreSpanV2>> {
// TODO: We are not doing anything with the attachment here.
let mut span = required!(span.0.span.value);
/// Converts a processed [`SpanV2`](super::SpanV2) into a [Kafka](crate::services::store::Store) compatible format.
pub fn convert(span: IndexedSpanOnly, ctx: &Context) -> Result<Box<StoreSpanV2>> {
let mut span = required!(span.0.value);

let routing_key = span.trace_id.value().map(|v| *v.deref());

inject_server_sample_rate(&mut span, ctx.server_sample_rate);
inject_server_sample_rate(&mut span.attributes, ctx.server_sample_rate);

Ok(Box::new(StoreSpanV2 {
routing_key,
Expand All @@ -49,11 +51,14 @@ pub fn convert(span: IndexedSpan, ctx: &Context) -> Result<Box<StoreSpanV2>> {
}

/// Injects a server sample rate into a span.
fn inject_server_sample_rate(span: &mut SpanV2, server_sample_rate: Option<f64>) {
fn inject_server_sample_rate(
attributes: &mut Annotated<Attributes>,
server_sample_rate: Option<f64>,
) {
let Some(server_sample_rate) = server_sample_rate.and_then(FiniteF64::new) else {
return;
};

let attributes = span.attributes.get_or_insert_with(Default::default);
let attributes = attributes.get_or_insert_with(Default::default);
attributes.insert("sentry.server_sample_rate", server_sample_rate.to_f64());
}
Loading
Loading