From 02e2506d3019e176ca910c508e4000d2a970164f Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 3 Dec 2025 12:43:44 +0100 Subject: [PATCH 1/5] rebase --- CHANGELOG.md | 4 + Cargo.lock | 4 +- Cargo.toml | 2 +- .../src/protocol/attachment_v2.rs | 6 +- relay-server/src/envelope/item.rs | 9 + relay-server/src/managed/counted.rs | 14 ++ relay-server/src/managed/managed.rs | 12 ++ relay-server/src/processing/forward.rs | 1 - relay-server/src/processing/logs/store.rs | 14 +- relay-server/src/processing/spans/mod.rs | 81 +++++--- relay-server/src/processing/spans/process.rs | 1 + relay-server/src/processing/spans/store.rs | 25 ++- .../src/processing/trace_metrics/store.rs | 16 +- relay-server/src/processing/utils/store.rs | 19 ++ relay-server/src/services/upload.rs | 193 +++++++++++++++--- tests/integration/conftest.py | 1 + tests/integration/fixtures/processing.py | 13 +- tests/integration/test_attachments.py | 13 +- tests/integration/test_attachmentsv2.py | 159 ++++++++++++++- tests/integration/test_minidump.py | 8 +- 20 files changed, 483 insertions(+), 112 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 79ae34ce006..b7e3c1c2bda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ - Fix parsing of data categories/quotas when using an aliased data category name. ([#5435](https://github.com/getsentry/relay/pull/5435)) +**Features**: + +- Add functionality to process and store span attachments. 
([#5423](https://github.com/getsentry/relay/pull/5423), [#5363](https://github.com/getsentry/relay/pull/5363)) + ## 25.11.1 **Breaking Changes**: diff --git a/Cargo.lock b/Cargo.lock index 509c9c20cf3..ac1e37731b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5088,9 +5088,9 @@ dependencies = [ [[package]] name = "sentry_protos" -version = "0.4.2" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef5b7751756e49938d79bdb5e0e881f71ffca7bf207aa7866f837d8a965753d" +checksum = "cf33a607845b7433ca41476e79004c1e2eaebf682f4c1d8e2145a288914ce8d8" dependencies = [ "prost", "prost-types", diff --git a/Cargo.toml b/Cargo.toml index 89932e7ece2..925b50cd672 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -184,7 +184,7 @@ sentry-release-parser = { version = "1.4.0", default-features = false, features "semver-1", ] } sentry-types = "0.41.0" -sentry_protos = "0.4.2" +sentry_protos = "0.4.8" serde = { version = "=1.0.228", features = ["derive", "rc"] } serde-transcode = "1.1.1" serde-vars = "0.2.0" diff --git a/relay-event-schema/src/protocol/attachment_v2.rs b/relay-event-schema/src/protocol/attachment_v2.rs index 04a9448bf2e..d0d3fac29cb 100644 --- a/relay-event-schema/src/protocol/attachment_v2.rs +++ b/relay-event-schema/src/protocol/attachment_v2.rs @@ -1,13 +1,17 @@ use relay_protocol::{Annotated, Empty, FromValue, IntoValue, Object, Value}; use crate::processor::ProcessValue; -use crate::protocol::{Attributes, Timestamp}; +use crate::protocol::{Attributes, Timestamp, TraceId}; use uuid::Uuid; /// Metadata for a span attachment. #[derive(Clone, Debug, Default, PartialEq, Empty, FromValue, IntoValue, ProcessValue)] pub struct AttachmentV2Meta { + /// The ID of the trace that the attachment belongs to. + #[metastructure(required = true, nonempty = true, trim = false)] + pub trace_id: Annotated, + /// Unique identifier for this attachment. 
#[metastructure(required = true, nonempty = true, trim = false)] pub attachment_id: Annotated, diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs index a9a1bf0708b..19f742a087c 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -1067,6 +1067,15 @@ pub enum ParentId { SpanId(Option), } +impl ParentId { + /// Converts the ID to a span ID (if applicable). + pub fn as_span_id(&self) -> Option { + match self { + ParentId::SpanId(span_id) => *span_id, + } + } +} + #[cfg(test)] mod tests { use crate::integrations::OtelFormat; diff --git a/relay-server/src/managed/counted.rs b/relay-server/src/managed/counted.rs index 6f8fdd45faa..b4eeca98b87 100644 --- a/relay-server/src/managed/counted.rs +++ b/relay-server/src/managed/counted.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; +use itertools::Either; use relay_event_schema::protocol::{ OurLog, SessionAggregateItem, SessionAggregates, SessionUpdate, Span, SpanV2, TraceMetric, }; @@ -40,6 +41,19 @@ impl Counted for Option { } } +impl Counted for Either +where + L: Counted, + R: Counted, +{ + fn quantities(&self) -> Quantities { + match self { + Either::Left(value) => value.quantities(), + Either::Right(value) => value.quantities(), + } + } +} + impl Counted for Item { fn quantities(&self) -> Quantities { self.quantities() diff --git a/relay-server/src/managed/managed.rs b/relay-server/src/managed/managed.rs index e1de5bcaf7a..d177c753f47 100644 --- a/relay-server/src/managed/managed.rs +++ b/relay-server/src/managed/managed.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use chrono::{DateTime, Utc}; +use itertools::Either; use relay_event_schema::protocol::EventId; use relay_quotas::{DataCategory, Scoping}; use relay_system::Addr; @@ -593,6 +594,17 @@ impl Managed> { } } +impl Managed> { + /// Turns a managed [`Either`] into an [`Either`] of [`Managed`] values. 
+ pub fn transpose(self) -> Either, Managed> { + let (either, meta) = self.destructure(); + match either { + Either::Left(value) => Either::Left(Managed::from_parts(value, meta)), + Either::Right(value) => Either::Right(Managed::from_parts(value, meta)), + } + } +} + impl From>> for ManagedEnvelope { fn from(value: Managed>) -> Self { let (value, meta) = value.destructure(); diff --git a/relay-server/src/processing/forward.rs b/relay-server/src/processing/forward.rs index 74fb6ebca0e..790fefb1c1c 100644 --- a/relay-server/src/processing/forward.rs +++ b/relay-server/src/processing/forward.rs @@ -36,7 +36,6 @@ impl<'a> StoreHandle<'a> { } /// Sends a message to the [`Upload`] service. - #[expect(unused)] pub fn upload(&self, message: M) where Upload: FromMessage, diff --git a/relay-server/src/processing/logs/store.rs b/relay-server/src/processing/logs/store.rs index edd75413055..d6f68ebf7aa 100644 --- a/relay-server/src/processing/logs/store.rs +++ b/relay-server/src/processing/logs/store.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use chrono::{DateTime, Utc}; -use prost_types::Timestamp; use relay_event_schema::protocol::{Attributes, OurLog, OurLogLevel, SpanId}; use relay_protocol::{Annotated, IntoValue, Value}; use relay_quotas::Scoping; @@ -10,7 +9,7 @@ use uuid::Uuid; use crate::envelope::WithHeader; use crate::processing::logs::{Error, Result}; -use crate::processing::utils::store::{AttributeMeta, extract_meta_attributes}; +use crate::processing::utils::store::{AttributeMeta, extract_meta_attributes, proto_timestamp}; use crate::processing::{Counted, Retention}; use crate::services::outcome::DiscardReason; use crate::services::store::StoreTraceItem; @@ -66,10 +65,10 @@ pub fn convert(log: WithHeader, ctx: &Context) -> Result item_type: TraceItemType::Log.into(), organization_id: ctx.scoping.organization_id.value(), project_id: ctx.scoping.project_id.value(), - received: Some(ts(ctx.received_at)), + received: Some(proto_timestamp(ctx.received_at)), 
retention_days: ctx.retention.standard.into(), downsampled_retention_days: ctx.retention.downsampled.into(), - timestamp: Some(ts(timestamp.0)), + timestamp: Some(proto_timestamp(timestamp.0)), trace_id: required!(log.trace_id).to_string(), item_id: Uuid::new_v7(timestamp.into()).as_bytes().to_vec(), attributes: attributes(meta, attrs, fields), @@ -83,13 +82,6 @@ pub fn convert(log: WithHeader, ctx: &Context) -> Result }) } -fn ts(dt: DateTime) -> Timestamp { - Timestamp { - seconds: dt.timestamp(), - nanos: i32::try_from(dt.timestamp_subsec_nanos()).unwrap_or(0), - } -} - /// Fields on the log message which are stored as fields. struct FieldAttributes { /// The log level. diff --git a/relay-server/src/processing/spans/mod.rs b/relay-server/src/processing/spans/mod.rs index d4cca99fe8f..9480ddd36c0 100644 --- a/relay-server/src/processing/spans/mod.rs +++ b/relay-server/src/processing/spans/mod.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use either::Either; use relay_event_normalization::GeoIpLookup; use relay_event_schema::processor::ProcessingAction; -use relay_event_schema::protocol::{AttachmentV2Meta, SpanId, SpanV2}; +use relay_event_schema::protocol::{AttachmentV2Meta, SpanV2}; use relay_pii::PiiConfigError; use relay_protocol::Annotated; use relay_quotas::{DataCategory, RateLimits}; @@ -242,21 +242,24 @@ impl Forward for SpanOutput { retention: ctx.retention(|r| r.span.as_ref()), }; - // Explicitly drop standalone attachments before splitting - // They are not stored for now. - // This must be fixed before enabling the feature flag. 
- let spans = spans.map(|mut inner, record_keeper| { - if !inner.stand_alone_attachments.is_empty() { - let standalone = std::mem::take(&mut inner.stand_alone_attachments); - record_keeper.reject_err(Outcome::Invalid(DiscardReason::Internal), standalone); + let spans_and_attachments = spans.split(|spans| spans.into_parts()); + for either in spans_and_attachments { + match either.transpose() { + Either::Left(span) => { + if let Ok(span) = span.try_map(|span, _| store::convert(span, &ctx)) { + s.store(span); + } + } + Either::Right(attachment) => { + if let Ok(attachment) = store::attachment::convert( + attachment, + ctx.retention, + ctx.server_sample_rate, + ) { + s.upload(attachment); + } + } } - inner - }); - - for span in spans.split(|spans| spans.into_indexed_spans()) { - if let Ok(span) = span.try_map(|span, _| store::convert(span, &ctx)) { - s.store(span) - }; } Ok(()) @@ -359,8 +362,7 @@ impl ExpandedSpans { for ExpandedSpan { span, attachments } in self.spans { for attachment in attachments { - let span_id = span.value().and_then(|s| s.span_id.value().copied()); - items.push(attachment_to_item(attachment, span_id)?); + items.push(attachment_to_item(attachment)?); } spans_without_attachments.push(span); @@ -373,7 +375,7 @@ impl ExpandedSpans { } for attachment in self.stand_alone_attachments { - items.push(attachment_to_item(attachment, None)?); + items.push(attachment_to_item(attachment)?); } Ok(Envelope::from_parts(self.headers, Items::from_vec(items))) @@ -403,11 +405,12 @@ impl ExpandedSpans { } } -fn attachment_to_item( - attachment: ExpandedAttachment, - span_id: Option, -) -> Result { - let ExpandedAttachment { meta, body } = attachment; +fn attachment_to_item(attachment: ExpandedAttachment) -> Result { + let ExpandedAttachment { + parent_id, + meta, + body, + } = attachment; let meta_json = meta.to_json()?; let meta_bytes = meta_json.into_bytes(); @@ -420,7 +423,7 @@ fn attachment_to_item( let mut item = Item::new(ItemType::Attachment); 
item.set_payload(ContentType::AttachmentV2, payload.freeze()); item.set_meta_length(meta_length as u32); - item.set_parent_id(ParentId::SpanId(span_id)); + item.set_parent_id(parent_id); Ok(item) } @@ -450,8 +453,22 @@ impl ExpandedSpans { impl ExpandedSpans { #[cfg(feature = "processing")] - fn into_indexed_spans(self) -> impl Iterator { - self.spans.into_iter().map(IndexedSpan) + fn into_parts(self) -> impl Iterator> { + let Self { + headers: _, + server_sample_rate: _, + spans, + stand_alone_attachments, + category: _, + } = self; + spans + .into_iter() + .flat_map(|span| { + let ExpandedSpan { span, attachments } = span; + std::iter::once(Either::Left(IndexedSpanOnly(span))) + .chain(attachments.into_iter().map(Either::Right)) + }) + .chain(stand_alone_attachments.into_iter().map(Either::Right)) } } @@ -610,23 +627,23 @@ impl Managed> { } } -/// A Span which only represents the indexed category. #[cfg(feature = "processing")] #[derive(Debug)] -struct IndexedSpan(ExpandedSpan); +struct IndexedSpanOnly(WithHeader); #[cfg(feature = "processing")] -impl Counted for IndexedSpan { +impl Counted for IndexedSpanOnly { fn quantities(&self) -> Quantities { - let mut quantities = smallvec::smallvec![(DataCategory::SpanIndexed, 1)]; - quantities.extend(self.0.attachments.quantities()); - quantities + smallvec::smallvec![(DataCategory::SpanIndexed, 1)] } } /// A validated and parsed span attachment. #[derive(Debug)] pub struct ExpandedAttachment { + /// The ID of the log / span / metric that owns the attachment. + pub parent_id: ParentId, + /// The parsed metadata from the attachment. 
pub meta: Annotated, diff --git a/relay-server/src/processing/spans/process.rs b/relay-server/src/processing/spans/process.rs index 66b6f0eab7a..ef7c2e46f25 100644 --- a/relay-server/src/processing/spans/process.rs +++ b/relay-server/src/processing/spans/process.rs @@ -144,6 +144,7 @@ fn parse_and_validate_span_attachment(item: &Item) -> Result<(Option, Ex Ok(( associated_span_id, ExpandedAttachment { + parent_id: ParentId::SpanId(associated_span_id), meta, body: Bytes::copy_from_slice(body), }, diff --git a/relay-server/src/processing/spans/store.rs b/relay-server/src/processing/spans/store.rs index 1e70b59ef7a..a1647893ef2 100644 --- a/relay-server/src/processing/spans/store.rs +++ b/relay-server/src/processing/spans/store.rs @@ -1,13 +1,16 @@ use std::ops::Deref; -use relay_event_schema::protocol::SpanV2; -use relay_protocol::{Annotated, FiniteF64}; +use relay_event_schema::protocol::Attributes; +use relay_protocol::Annotated; +use relay_protocol::FiniteF64; use crate::processing::Retention; -use crate::processing::spans::{Error, IndexedSpan, Result}; +use crate::processing::spans::{Error, IndexedSpanOnly, Result}; use crate::services::outcome::DiscardReason; use crate::services::store::StoreSpanV2; +pub mod attachment; + macro_rules! required { ($value:expr) => {{ match $value { @@ -31,14 +34,13 @@ pub struct Context { pub retention: Retention, } -/// Converts a processed [`SpanV2`] into a [Kafka](crate::services::store::Store) compatible format. -pub fn convert(span: IndexedSpan, ctx: &Context) -> Result> { - // TODO: We are not doing anything with the attachment here. - let mut span = required!(span.0.span.value); +/// Converts a processed [`SpanV2`](super::SpanV2) into a [Kafka](crate::services::store::Store) compatible format. 
+pub fn convert(span: IndexedSpanOnly, ctx: &Context) -> Result> { + let mut span = required!(span.0.value); let routing_key = span.trace_id.value().map(|v| *v.deref()); - inject_server_sample_rate(&mut span, ctx.server_sample_rate); + inject_server_sample_rate(&mut span.attributes, ctx.server_sample_rate); Ok(Box::new(StoreSpanV2 { routing_key, @@ -49,11 +51,14 @@ pub fn convert(span: IndexedSpan, ctx: &Context) -> Result> { } /// Injects a server sample rate into a span. -fn inject_server_sample_rate(span: &mut SpanV2, server_sample_rate: Option) { +fn inject_server_sample_rate( + attributes: &mut Annotated, + server_sample_rate: Option, +) { let Some(server_sample_rate) = server_sample_rate.and_then(FiniteF64::new) else { return; }; - let attributes = span.attributes.get_or_insert_with(Default::default); + let attributes = attributes.get_or_insert_with(Default::default); attributes.insert("sentry.server_sample_rate", server_sample_rate.to_f64()); } diff --git a/relay-server/src/processing/trace_metrics/store.rs b/relay-server/src/processing/trace_metrics/store.rs index 93d69b8e05c..1f6631d3002 100644 --- a/relay-server/src/processing/trace_metrics/store.rs +++ b/relay-server/src/processing/trace_metrics/store.rs @@ -3,7 +3,6 @@ use std::collections::HashMap; use chrono::{DateTime, Utc}; use prost_types::Timestamp; use relay_base_schema::metrics::MetricUnit; -use relay_conventions::CLIENT_SAMPLE_RATE; use relay_event_schema::protocol::{Attributes, MetricType, SpanId, TraceMetric}; use relay_protocol::{Annotated, IntoValue, Value}; use relay_quotas::Scoping; @@ -12,7 +11,9 @@ use uuid::Uuid; use crate::envelope::WithHeader; use crate::processing::trace_metrics::{Error, Result}; -use crate::processing::utils::store::{AttributeMeta, extract_meta_attributes}; +use crate::processing::utils::store::{ + AttributeMeta, extract_client_sample_rate, extract_meta_attributes, +}; use crate::processing::{Counted, Retention}; use crate::services::outcome::DiscardReason; use 
crate::services::store::StoreTraceItem; @@ -107,14 +108,6 @@ fn extract_numeric_value(value: Value) -> Result { } } -fn extract_client_sample_rate(attributes: &Attributes) -> Option { - attributes - .get_value(CLIENT_SAMPLE_RATE) - .and_then(|value| value.as_f64()) - .filter(|v| *v > 0.0) - .filter(|v| *v <= 1.0) -} - fn attributes( meta: HashMap, attributes: Attributes, @@ -249,6 +242,9 @@ mod tests { use relay_event_schema::protocol::{Attribute, AttributeType, AttributeValue}; use relay_protocol::FromValue; use relay_protocol::Object; + use relay_quotas::Scoping; + + use crate::processing::Retention; use super::*; diff --git a/relay-server/src/processing/utils/store.rs b/relay-server/src/processing/utils/store.rs index 21b90566819..15da16ad26f 100644 --- a/relay-server/src/processing/utils/store.rs +++ b/relay-server/src/processing/utils/store.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use chrono::Utc; +use relay_conventions::CLIENT_SAMPLE_RATE; use relay_event_schema::protocol::Attributes; use relay_protocol::{Annotated, IntoValue, MetaTree}; @@ -120,3 +122,20 @@ fn size_of_meta_tree(meta: &MetaTree) -> usize { size } + +/// Converts a [`chrono::DateTime`] into a [`prost_types::Timestamp`] +pub fn proto_timestamp(dt: chrono::DateTime) -> prost_types::Timestamp { + prost_types::Timestamp { + seconds: dt.timestamp(), + nanos: i32::try_from(dt.timestamp_subsec_nanos()).unwrap_or(0), + } +} + +/// Extracts the client sample rate from trace attributes. 
+pub fn extract_client_sample_rate(attributes: &Attributes) -> Option { + attributes + .get_value(CLIENT_SAMPLE_RATE) + .and_then(|value| value.as_f64()) + .filter(|v| *v > 0.0) + .filter(|v| *v <= 1.0) +} diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index 87784745a52..1d29b75da4e 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -2,19 +2,26 @@ use std::sync::Arc; use std::time::Duration; +use bytes::Bytes; use futures::future::BoxFuture; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use objectstore_client::{Client, ExpirationPolicy, Session, Usecase}; use relay_config::UploadServiceConfig; +use relay_quotas::DataCategory; use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service}; +use sentry_protos::snuba::v1::TraceItem; +use smallvec::smallvec; +use uuid::Uuid; use crate::constants::DEFAULT_ATTACHMENT_RETENTION; use crate::envelope::{Item, ItemType}; -use crate::managed::{OutcomeError, TypedEnvelope}; +use crate::managed::{ + Counted, Managed, ManagedResult, OutcomeError, Quantities, Rejected, TypedEnvelope, +}; use crate::services::outcome::DiscardReason; use crate::services::processor::Processed; -use crate::services::store::{Store, StoreEnvelope}; +use crate::services::store::{Store, StoreEnvelope, StoreTraceItem}; use crate::statsd::{RelayCounters, RelayGauges}; use super::outcome::Outcome; @@ -22,6 +29,16 @@ use super::outcome::Outcome; /// Messages that the upload service can handle. 
pub enum Upload { Envelope(StoreEnvelope), + Attachment(Managed), +} + +impl Upload { + fn ty(&self) -> &str { + match self { + Upload::Envelope(_) => "envelope", + Upload::Attachment(_) => "attachment_v2", + } + } } impl Interface for Upload {} @@ -34,18 +51,51 @@ impl FromMessage for Upload { } } +impl FromMessage> for Upload { + type Response = NoResponse; + + fn from_message(message: Managed, _sender: ()) -> Self { + Self::Attachment(message) + } +} + +/// An attachment that is ready for upload / EAP storage. +pub struct StoreAttachment { + /// The body to be uploaded to objectstore. + pub body: Bytes, + /// The trace item to be published via Kafka. + pub trace_item: TraceItem, +} + +impl Counted for StoreAttachment { + fn quantities(&self) -> Quantities { + smallvec![ + (DataCategory::AttachmentItem, 1), + (DataCategory::Attachment, self.body.len()), + ] + } +} + /// Errors that can occur when trying to upload an attachment. -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum Error { + #[error("timeout")] Timeout, - UploadFailed, + #[error("load shed")] + LoadShed, + #[error("upload failed: {0}")] + UploadFailed(#[from] objectstore_client::Error), + #[error("UUID conversion failed: {0}")] + Uuid(#[from] uuid::Error), } impl Error { fn as_str(&self) -> &'static str { match self { Error::Timeout => "timeout", - Error::UploadFailed => "upload_failed", + Error::LoadShed => "load-shed", + Error::UploadFailed(_) => "upload_failed", + Error::Uuid(_) => "uuid", } } } @@ -83,7 +133,9 @@ impl UploadService { }; let objectstore_client = Client::builder(objectstore_url).build()?; - let attachments_usecase = Usecase::new("attachments") + let event_attachments = Usecase::new("attachments") + .with_expiration_policy(ExpirationPolicy::TimeToLive(DEFAULT_ATTACHMENT_RETENTION)); + let trace_attachments = Usecase::new("trace_attachments") .with_expiration_policy(ExpirationPolicy::TimeToLive(DEFAULT_ATTACHMENT_RETENTION)); let inner = UploadServiceInner { @@ -92,7 
+144,8 @@ impl UploadService { store, objectstore_client, - attachments_usecase, + event_attachments, + trace_attachments, }; Ok(Some(Self { @@ -103,30 +156,28 @@ impl UploadService { } fn handle_message(&self, message: Upload) { - let Upload::Envelope(StoreEnvelope { envelope }) = message; if self.pending_requests.len() >= self.max_concurrent_requests { // Load shed to prevent backlogging in the service queue and affecting other parts of Relay. // We might want to have a less aggressive mechanism in the future. - - let attachment_count = envelope - .envelope() - .items() - .filter(|item| *item.ty() == ItemType::Attachment) - .count(); relay_statsd::metric!( - counter(RelayCounters::AttachmentUpload) += attachment_count as u64, + counter(RelayCounters::AttachmentUpload) += 1, result = "load_shed", - type = "envelope", + type = message.ty(), ); - - // for now, this will just forward to the store endpoint without uploading attachments. - self.inner.store.send(StoreEnvelope { envelope }); + match message { + Upload::Envelope(envelope) => { + // Event attachments can still go the old route. 
+ self.inner.store.send(envelope); + } + Upload::Attachment(managed) => { + let _ = managed.reject_err(Error::LoadShed); + } + } return; } - let inner = self.inner.clone(); - let future = async move { inner.handle_envelope(envelope).await }; - self.pending_requests.push(future.boxed()); + self.pending_requests + .push(async move { inner.handle_message(message).await }.boxed()); } } @@ -160,10 +211,20 @@ struct UploadServiceInner { store: Addr, objectstore_client: Client, - attachments_usecase: Usecase, + event_attachments: Usecase, + trace_attachments: Usecase, } impl UploadServiceInner { + async fn handle_message(&self, message: Upload) { + match message { + Upload::Envelope(StoreEnvelope { envelope }) => { + self.handle_envelope(envelope).await; + } + Upload::Attachment(attachment) => self.handle_attachment(attachment).await, + } + } + /// Uploads all attachments belonging to the given envelope. /// /// This mutates the attachment items in-place, setting their `stored_key` field to the key @@ -171,7 +232,7 @@ impl UploadServiceInner { async fn handle_envelope(&self, mut envelope: TypedEnvelope) -> () { let scoping = envelope.scoping(); let session = self - .attachments_usecase + .event_attachments .for_project(scoping.organization_id.value(), scoping.project_id.value()) .session(&self.objectstore_client); @@ -190,7 +251,7 @@ impl UploadServiceInner { } Ok(session) => { for attachment in attachments { - let result = self.handle_attachment(&session, attachment).await; + let result = self.handle_envelope_attachment(&session, attachment).await; relay_statsd::metric!( counter(RelayCounters::AttachmentUpload) += 1, result = match result { @@ -207,7 +268,85 @@ impl UploadServiceInner { self.store.send(StoreEnvelope { envelope }); } - async fn handle_attachment( + async fn handle_attachment(&self, managed: Managed) { + let result = self.do_handle_store_attachment(managed).await; + + relay_statsd::metric!( + counter(RelayCounters::AttachmentUpload) += 1, + result = match 
result { + Ok(()) => "success", + Err(e) => e.into_inner().as_str(), + }, + type = "attachment_v2", + ); + } + + async fn do_handle_store_attachment( + &self, + managed: Managed, + ) -> Result<(), Rejected> { + let scoping = managed.scoping(); + let session = self + .trace_attachments + .for_project(scoping.organization_id.value(), scoping.project_id.value()) + .session(&self.objectstore_client) + .map_err(Error::UploadFailed) + .reject(&managed)?; + + let quantities = managed.quantities(); + let body = Bytes::clone(&managed.body); + + // Make sure that the attachment can be converted into a trace item: + let trace_item = managed.try_map(|attachment, _record_keeper| { + let StoreAttachment { + trace_item, + body: _, + } = attachment; + Ok::<_, Error>(StoreTraceItem { + trace_item, + quantities, + }) + })?; + + // Upload the attachment: + if !body.is_empty() { + relay_log::trace!("Starting attachment upload"); + let key = Uuid::from_slice(&trace_item.trace_item.item_id) + .map_err(Error::from) + .reject(&trace_item)? + .to_string(); + + #[cfg(debug_assertions)] + let original_key = key.clone(); + + let future = async { + let result = session + .put(body.clone()) + .key(key) + .send() + .await + .map_err(Error::UploadFailed) + .reject(&trace_item)?; + Ok(result.key) + }; + + let stored_key = tokio::time::timeout(self.timeout, future) + .await + .map_err(|_elapsed| Error::Timeout) + .reject(&trace_item)??; + + #[cfg(debug_assertions)] + debug_assert_eq!(stored_key, original_key); + relay_log::trace!("Finished attachment upload"); + } + + // Only after successful upload forward the attachment to the store. + self.store.send(trace_item); + + Ok(()) + } + + async fn handle_envelope_attachment( &self, session: &Session, attachment: &mut Item, @@ -225,7 +364,7 @@ impl UploadServiceInner { let stored_key = tokio::time::timeout(self.timeout, future) .await .map_err(|_elapsed| Error::Timeout)? 
- .map_err(|_error: objectstore_client::Error| Error::UploadFailed)?; + .map_err(Error::UploadFailed)?; attachment.set_stored_key(stored_key); relay_log::trace!("Finished attachment upload"); diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index ca639def9b0..40ba8b36e21 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -44,6 +44,7 @@ items_consumer, profiles_consumer, feedback_consumer, + objectstore, ) diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py index 52104b81c40..c3501577449 100644 --- a/tests/integration/fixtures/processing.py +++ b/tests/integration/fixtures/processing.py @@ -4,6 +4,7 @@ import msgpack import uuid +from objectstore_client import Client, Usecase import pytest import os import confluent_kafka as kafka @@ -128,6 +129,16 @@ def relay_with_playstation(mini_sentry, relay): return relay +@pytest.fixture +def objectstore(): + def inner(usecase: str, project_id: int): + return Client("http://127.0.0.1:8888/").session( + Usecase(usecase), org=1, project=project_id + ) + + return inner + + def kafka_producer(options): # look for the servers (it is the only config we are interested in) servers = [ @@ -302,7 +313,7 @@ def assert_rate_limited( key_id=None, categories=None, quantity=None, - timeout=1, + timeout=None, ignore_other=False, ): expected_categories = ( diff --git a/tests/integration/test_attachments.py b/tests/integration/test_attachments.py index be5a632a8fd..5d26211db43 100644 --- a/tests/integration/test_attachments.py +++ b/tests/integration/test_attachments.py @@ -4,7 +4,6 @@ from requests.exceptions import HTTPError from sentry_sdk.envelope import Envelope, Item, PayloadRef -from objectstore_client import Client, Usecase from .test_store import make_transaction @@ -118,7 +117,11 @@ def test_mixed_attachments_with_processing( def test_attachments_with_objectstore( - mini_sentry, relay_with_processing, attachments_consumer, 
outcomes_consumer + mini_sentry, + relay_with_processing, + attachments_consumer, + outcomes_consumer, + objectstore, ): project_id = 42 event_id = "515539018c9b4260a6f999572f1661ee" @@ -148,10 +151,8 @@ def test_attachments_with_objectstore( attachment = attachments_consumer.get_individual_attachment() objectstore_key = attachment["attachment"].pop("stored_id") - objectstore_session = Client("http://127.0.0.1:8888/").session( - Usecase("attachments"), org=1, project=project_id - ) - assert objectstore_session.get(objectstore_key).payload.read() == chunked_contents + objectstore = objectstore("attachments", project_id) + assert objectstore.get(objectstore_key).payload.read() == chunked_contents assert attachment["attachment"].pop("id") assert attachment == { diff --git a/tests/integration/test_attachmentsv2.py b/tests/integration/test_attachmentsv2.py index be406b06bf8..a924f7c1ee3 100644 --- a/tests/integration/test_attachmentsv2.py +++ b/tests/integration/test_attachmentsv2.py @@ -1,3 +1,4 @@ +import base64 from datetime import datetime, timezone from sentry_sdk.envelope import Envelope, Item, PayloadRef from sentry_relay.consts import DataCategory @@ -21,6 +22,7 @@ def create_attachment_metadata(): return { + "trace_id": uuid.uuid4().hex, "attachment_id": str(uuid.uuid4()), "timestamp": 1760520026.781239, "filename": "myfile.txt", @@ -89,13 +91,78 @@ def test_standalone_attachment_forwarding(mini_sentry, relay): assert attachment_item.headers == headers +def test_standalone_attachment_store( + mini_sentry, relay_with_processing, items_consumer, objectstore +): + items_consumer = items_consumer() + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:standalone-span-ingestion", + "projects:span-v2-experimental-processing", + "projects:span-v2-attachment-processing", + ] + relay = relay_with_processing( + {"processing": {"upload": {"objectstore_url": 
"http://127.0.0.1:8888/"}}} + ) + + attachment_metadata = create_attachment_metadata() + attachment_body = b"This is some mock attachment content" + metadata_bytes = json.dumps(attachment_metadata, separators=(",", ":")).encode( + "utf-8" + ) + combined_payload = metadata_bytes + attachment_body + + envelope = create_attachment_envelope(project_config) + headers = { + "content_type": "application/vnd.sentry.attachment.v2", + "meta_length": len(metadata_bytes), + "span_id": None, + "length": len(combined_payload), + "type": "attachment", + } + + attachment_item = Item(payload=PayloadRef(bytes=combined_payload), headers=headers) + envelope.add_item(attachment_item) + relay.send_envelope(project_id, envelope) + + produced_item = items_consumer.get_item() + expected_item_id = base64.b64encode( + uuid.UUID(hex=attachment_metadata["attachment_id"]).bytes + ).decode("utf-8") + assert produced_item == { + "attributes": { + "file.name": {"stringValue": "myfile.txt"}, + "foo": {"stringValue": "bar"}, + "sentry.content-type": {"stringValue": "text/plain"}, + }, + "clientSampleRate": 1.0, + "downsampledRetentionDays": 90, + "itemId": expected_item_id, + "itemType": 10, + "organizationId": "1", + "projectId": "42", + "received": mock.ANY, + "retentionDays": 90, + "serverSampleRate": 1.0, + "timestamp": mock.ANY, + "traceId": attachment_metadata["trace_id"].replace("-", ""), + } + + objectstore = objectstore(usecase="trace_attachments", project_id=project_id) + assert ( + objectstore.get(attachment_metadata["attachment_id"]).payload.read() + == attachment_body + ) + + @pytest.mark.parametrize( "invalid_headers,quantity", [ # Invalid since there is no span with that id in the envelope, also the quantity here is # lower since only the body is already counted at this point and not the meta. 
pytest.param({"span_id": "ABCDFDEAD5F74052"}, 36, id="invalid_span_id"), - pytest.param({"meta_length": None}, 227, id="missing_meta_length"), + pytest.param({"meta_length": None}, 273, id="missing_meta_length"), pytest.param({"meta_length": 999}, 1, id="meta_length_exceeds_payload"), ], ) @@ -225,13 +292,95 @@ def test_attachment_with_matching_span(mini_sentry, relay): assert attachment.payload.bytes == combined_payload assert attachment.headers == { "type": "attachment", - "length": 214, + "length": 260, "content_type": "application/vnd.sentry.attachment.v2", - "meta_length": 191, + "meta_length": 237, "span_id": span_id, } +def test_attachment_with_matching_span_store( + mini_sentry, relay_with_processing, items_consumer, objectstore +): + items_consumer = items_consumer() + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:standalone-span-ingestion", + "projects:span-v2-experimental-processing", + "projects:span-v2-attachment-processing", + ] + relay = relay_with_processing( + {"processing": {"upload": {"objectstore_url": "http://127.0.0.1:8888/"}}} + ) + + ts = datetime.now(timezone.utc) + span_id = "eee19b7ec3c1b174" + trace_id = "5b8efff798038103d269b633813fc60c" + envelope = envelope_with_spans( + { + "start_timestamp": ts.timestamp(), + "end_timestamp": ts.timestamp() + 0.5, + "trace_id": trace_id, + "span_id": span_id, + "is_segment": True, + "name": "test span", + "status": "ok", + }, + trace_info={ + "trace_id": trace_id, + "public_key": project_config["publicKeys"][0]["publicKey"], + }, + ) + + metadata = create_attachment_metadata() + body = b"span attachment content" + metadata_bytes = json.dumps(metadata, separators=(",", ":")).encode("utf-8") + combined_payload = metadata_bytes + body + + envelope.add_item( + Item( + payload=PayloadRef(bytes=combined_payload), + type="attachment", + headers={ + "content_type": "application/vnd.sentry.attachment.v2", + 
"meta_length": len(metadata_bytes), + "span_id": span_id, + "length": len(combined_payload), + }, + ) + ) + + relay.send_envelope(project_id, envelope) + + attachment_item = items_consumer.get_item() + expected_item_id = base64.b64encode( + uuid.UUID(hex=metadata["attachment_id"]).bytes + ).decode("utf-8") + assert attachment_item == { + "attributes": { + "file.name": {"stringValue": "myfile.txt"}, + "foo": {"stringValue": "bar"}, + "sentry.content-type": {"stringValue": "text/plain"}, + "sentry.span_id": {"stringValue": span_id}, + }, + "clientSampleRate": 1.0, + "downsampledRetentionDays": 90, + "itemId": expected_item_id, + "itemType": 10, + "organizationId": "1", + "projectId": "42", + "received": mock.ANY, + "retentionDays": 90, + "serverSampleRate": 1.0, + "timestamp": mock.ANY, + "traceId": metadata["trace_id"], + } + + objectstore = objectstore(usecase="trace_attachments", project_id=project_id) + assert objectstore.get(metadata["attachment_id"]).payload.read() == body + + def test_two_attachments_mapping_to_same_span(mini_sentry, relay): project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) @@ -316,9 +465,9 @@ def test_two_attachments_mapping_to_same_span(mini_sentry, relay): assert item.payload.bytes == combined_payload assert item.headers == { "type": "attachment", - "length": 214, + "length": 260, "content_type": "application/vnd.sentry.attachment.v2", - "meta_length": 191, + "meta_length": 237, "span_id": span_id, } diff --git a/tests/integration/test_minidump.py b/tests/integration/test_minidump.py index 30a8f332740..eb052e576c5 100644 --- a/tests/integration/test_minidump.py +++ b/tests/integration/test_minidump.py @@ -3,7 +3,6 @@ import msgpack import pytest -from objectstore_client import Client, Usecase from requests import HTTPError from uuid import UUID @@ -381,6 +380,7 @@ def test_minidump_with_processing( rate_limit, minidump_filename, use_objectstore, + objectstore, ): dmp_path = 
os.path.join(os.path.dirname(__file__), "fixtures/native/minidump.dmp") with open(dmp_path, "rb") as f: @@ -481,10 +481,8 @@ def test_minidump_with_processing( (attachment,) = message["attachments"] objectstore_key = attachment.pop("stored_id") - objectstore_session = Client("http://127.0.0.1:8888/").session( - Usecase("attachments"), org=1, project=project_id - ) - assert objectstore_session.get(objectstore_key).payload.read() == content + objectstore = objectstore("attachments", project_id) + assert objectstore.get(objectstore_key).payload.read() == content assert attachment.pop("id") assert attachment == { From d5cf19a1a8c1f5c99b947ec3aca26c0dfa240421 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 3 Dec 2025 12:46:16 +0100 Subject: [PATCH 2/5] add file --- .../src/processing/spans/store/attachment.rs | 184 ++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 relay-server/src/processing/spans/store/attachment.rs diff --git a/relay-server/src/processing/spans/store/attachment.rs b/relay-server/src/processing/spans/store/attachment.rs new file mode 100644 index 00000000000..74593e95d2c --- /dev/null +++ b/relay-server/src/processing/spans/store/attachment.rs @@ -0,0 +1,184 @@ +use std::collections::HashMap; + +use chrono::{DateTime, Utc}; +use relay_event_schema::protocol::{AttachmentV2Meta, Attributes, SpanId}; +use relay_protocol::{Annotated, IntoValue, Value}; +use relay_quotas::Scoping; +use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType, any_value}; + +use crate::managed::{Managed, Rejected}; +use crate::processing::Retention; +use crate::processing::spans::{ExpandedAttachment, Result}; +use crate::processing::utils::store::{ + AttributeMeta, extract_client_sample_rate, extract_meta_attributes, proto_timestamp, +}; +use crate::services::outcome::{DiscardReason, Outcome}; +use crate::services::upload::StoreAttachment; + +/// Converts an expanded attachment to a storable unit. 
+pub fn convert( + attachment: Managed, + retention: Retention, + server_sample_rate: Option, +) -> Result, Rejected<()>> { + let scoping = attachment.scoping(); + let received_at = attachment.received_at(); + attachment.try_map(|attachment, _record_keeper| { + let ExpandedAttachment { + parent_id, + meta, + body, + } = attachment; + let ctx = Context { + span_id: parent_id.as_span_id(), + received_at, + scoping, + retention, + server_sample_rate, + }; + let trace_item = attachment_to_trace_item(meta, ctx) + .ok_or(Outcome::Invalid(DiscardReason::InvalidSpanAttachment))?; + + Ok::<_, Outcome>(StoreAttachment { trace_item, body }) + }) +} + +/// Context for converting an attachment to a trace item. +#[derive(Debug, Clone, Copy)] +struct Context { + /// Received time. + received_at: DateTime, + /// Item scoping. + scoping: Scoping, + /// Item retention. + retention: Retention, + /// Server-side sample rate. + server_sample_rate: Option, + /// The ID of the span that owns the attachment. + span_id: Option, +} + +fn attachment_to_trace_item(meta: Annotated, ctx: Context) -> Option { + let meta = meta.into_value()?; + let annotated_meta = extract_meta_attributes(&meta, &meta.attributes); + let AttachmentV2Meta { + trace_id, + attachment_id, + timestamp, + filename, + content_type, + attributes, + other: _, + } = meta; + + let fields = Fields { + content_type: content_type.into_value()?, + filename: filename.into_value(), + span_id: ctx.span_id, + }; + + let attributes = attributes.into_value().unwrap_or_default(); + + let client_sample_rate = extract_client_sample_rate(&attributes).unwrap_or(1.0); + + let trace_item = TraceItem { + organization_id: ctx.scoping.organization_id.value(), + project_id: ctx.scoping.project_id.value(), + trace_id: trace_id.into_value()?.to_string(), + item_id: attachment_id.into_value()?.into_bytes().to_vec(), + item_type: TraceItemType::Attachment.into(), + timestamp: Some(proto_timestamp(timestamp.into_value()?.0)), + attributes: 
convert_attributes(annotated_meta, attributes, fields), + client_sample_rate, + server_sample_rate: ctx.server_sample_rate.unwrap_or(1.0), + retention_days: ctx.retention.standard as u32, + received: Some(proto_timestamp(ctx.received_at)), + downsampled_retention_days: ctx.retention.downsampled as u32, + }; + Some(trace_item) +} + +struct Fields { + content_type: String, + filename: Option, + span_id: Option, +} + +// TODO: remove code-duplication between logs, trace metrics and attachments. +fn convert_attributes( + meta: HashMap, + attributes: Attributes, + fields: Fields, +) -> HashMap { + let mut result = meta; + result.reserve(attributes.0.len() + 5); + + for (name, attribute) in attributes { + let meta = AttributeMeta { + meta: IntoValue::extract_meta_tree(&attribute), + }; + if let Some(meta) = meta.to_any_value() { + result.insert(format!("sentry._meta.fields.attributes.{name}"), meta); + } + + let value = attribute + .into_value() + .and_then(|v| v.value.value.into_value()); + + let Some(value) = value else { + continue; + }; + + let Some(value) = (match value { + Value::Bool(v) => Some(any_value::Value::BoolValue(v)), + Value::I64(v) => Some(any_value::Value::IntValue(v)), + Value::U64(v) => i64::try_from(v).ok().map(any_value::Value::IntValue), + Value::F64(v) => Some(any_value::Value::DoubleValue(v)), + Value::String(v) => Some(any_value::Value::StringValue(v)), + Value::Array(_) | Value::Object(_) => { + debug_assert!(false, "unsupported attachment attribute value"); + None + } + }) else { + continue; + }; + + result.insert(name, AnyValue { value: Some(value) }); + } + + let Fields { + content_type, + filename, + span_id, + } = fields; + + result.insert( + "sentry.content-type".to_owned(), + AnyValue { + value: Some(any_value::Value::StringValue( + content_type.as_str().to_owned(), + )), + }, + ); + + // See https://opentelemetry.io/docs/specs/semconv/registry/attributes/file/#file-name. 
+ if let Some(filename) = filename { + result.insert( + "file.name".to_owned(), + AnyValue { + value: Some(any_value::Value::StringValue(filename)), + }, + ); + } + + if let Some(span_id) = span_id { + result.insert( + "sentry.span_id".to_owned(), + AnyValue { + value: Some(any_value::Value::StringValue(span_id.to_string())), + }, + ); + } + + result +} From c5db609be275de045433102122c8062a97e31408 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 4 Dec 2025 12:42:39 +0100 Subject: [PATCH 3/5] fix comment --- relay-server/src/managed/managed.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/managed/managed.rs b/relay-server/src/managed/managed.rs index d177c753f47..cdcec2305b2 100644 --- a/relay-server/src/managed/managed.rs +++ b/relay-server/src/managed/managed.rs @@ -595,7 +595,7 @@ impl Managed> { } impl Managed> { - /// Turns a managed option into an optional [`Managed`]. + /// Turns a managed [`Either`] into an [`Either`] of [`Managed`]. pub fn transpose(self) -> Either, Managed> { let (either, meta) = self.destructure(); match either { From 28070a75ad6f40aa1a8696d6b95270268182dd69 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 5 Dec 2025 07:46:41 +0100 Subject: [PATCH 4/5] fix: metric count --- relay-server/src/services/upload.rs | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index 1d29b75da4e..72b46b11bae 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -39,6 +39,17 @@ impl Upload { Upload::Attachment(_) => "attachment_v2", } } + + fn attachment_count(&self) -> usize { + match self { + Self::Envelope(StoreEnvelope { envelope }) => envelope + .envelope() + .items() + .filter(|item| *item.ty() == ItemType::Attachment) + .count(), + Self::Attachment(_) => 1, + } + } } impl Interface for Upload {} @@ -157,22 +168,23 @@ impl UploadService { fn 
handle_message(&self, message: Upload) { if self.pending_requests.len() >= self.max_concurrent_requests { - // Load shed to prevent backlogging in the service queue and affecting other parts of Relay. - // We might want to have a less aggressive mechanism in the future. relay_statsd::metric!( - counter(RelayCounters::AttachmentUpload) += 1, + counter(RelayCounters::AttachmentUpload) += message.attachment_count(), result = "load_shed", type = message.ty(), ); match message { Upload::Envelope(envelope) => { // Event attachments can still go the old route. + self.inner.store.send(envelope); } Upload::Attachment(managed) => { + // Load shed to prevent backlogging in the service queue and affecting other parts of Relay. + // TODO: After the experimental phase, implement backpressure instead. let _ = managed.reject_err(Error::LoadShed); } - } + }; return; } let inner = self.inner.clone(); From 0213d03cd131568dcb85fd1ed6bfc881b0877870 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 5 Dec 2025 08:03:06 +0100 Subject: [PATCH 5/5] fix --- relay-server/src/services/upload.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index 72b46b11bae..a69fcbb362a 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -169,7 +169,7 @@ impl UploadService { fn handle_message(&self, message: Upload) { if self.pending_requests.len() >= self.max_concurrent_requests { relay_statsd::metric!( - counter(RelayCounters::AttachmentUpload) += message.attachment_count(), + counter(RelayCounters::AttachmentUpload) += message.attachment_count() as u64, result = "load_shed", type = message.ty(), );