Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions relay-server/src/managed/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ impl<G> TypedEnvelope<G> {
self.0.accept()
}

/// Returns the raw [`ManagedEnvelope`].
pub fn into_inner(self) -> ManagedEnvelope {
self.0
}

/// Creates a new typed envelope.
///
/// Note: this method is private to make sure that only `TryFrom` implementation is used, which
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/processing/check_ins/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Forward for CheckInsOutput {
ctx: processing::ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
let envelope = self.serialize_envelope(ctx)?;
let envelope = ManagedEnvelope::from(envelope).into_processed();
let envelope = ManagedEnvelope::from(envelope);

s.send_to_store(crate::services::store::StoreEnvelope { envelope });

Expand Down
18 changes: 1 addition & 17 deletions relay-server/src/processing/errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,23 +344,7 @@ impl Forward for ErrorOutput {
ctx: processing::ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
let envelope = self.serialize_envelope(ctx)?;
let envelope = ManagedEnvelope::from(envelope).into_processed();

let has_attachments = envelope
.envelope()
.items()
.any(|item| item.ty() == &ItemType::Attachment);
let use_objectstore = || {
let options = &ctx.global_config.options;
crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep()
};

if has_attachments && use_objectstore() {
s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope });
} else {
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
}

s.send_envelope(ManagedEnvelope::from(envelope));
Ok(())
}
}
47 changes: 44 additions & 3 deletions relay-server/src/processing/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use relay_dynamic_config::{RetentionConfig, RetentionsConfig};
use relay_system::{Addr, FromMessage};

use crate::Envelope;
#[cfg(feature = "processing")]
use crate::managed::ManagedEnvelope;
use crate::managed::{Managed, Rejected};
#[cfg(feature = "processing")]
use crate::services::objectstore::Objectstore;
Expand All @@ -18,12 +20,21 @@ use crate::services::store::Store;
pub struct StoreHandle<'a> {
store: &'a Addr<Store>,
objectstore: Option<&'a Addr<Objectstore>>,
global_config: &'a GlobalConfig,
}

#[cfg(feature = "processing")]
impl<'a> StoreHandle<'a> {
pub fn new(store: &'a Addr<Store>, objectstore: Option<&'a Addr<Objectstore>>) -> Self {
Self { store, objectstore }
pub fn new(
store: &'a Addr<Store>,
objectstore: Option<&'a Addr<Objectstore>>,
global_config: &'a GlobalConfig,
) -> Self {
Self {
store,
objectstore,
global_config,
}
}

/// Sends a message to the [`Store`] service.
Expand All @@ -45,6 +56,36 @@ impl<'a> StoreHandle<'a> {
relay_log::error!("Objectstore service not configured. Dropping message.");
}
}

/// Dispatches an envelopes to either the [`Objectstore`] or [`Store`] service.
pub fn send_envelope(&self, envelope: ManagedEnvelope) {
use crate::services::store::StoreEnvelope;

let Some(objectstore) = self.objectstore else {
self.store.send(StoreEnvelope { envelope });
return;
};

let has_attachments = envelope
.envelope()
.items()
.any(|item| item.ty() == &crate::envelope::ItemType::Attachment);

let use_objectstore = || {
crate::utils::sample(
self.global_config
.options
.objectstore_attachments_sample_rate,
)
.is_keep()
};

if has_attachments && use_objectstore() {
objectstore.send(StoreEnvelope { envelope })
} else {
self.store.send(StoreEnvelope { envelope });
}
}
}

/// A processor output which can be forwarded to a different destination.
Expand Down Expand Up @@ -73,7 +114,7 @@ pub struct ForwardContext<'a> {
/// The Relay configuration.
pub config: &'a Config,
/// A view of the currently active global configuration.
#[cfg_attr(not(feature = "processing"), expect(unused))]
#[expect(unused, reason = "not yet used")]
pub global_config: &'a GlobalConfig,
/// Project configuration associated with the unit of work.
pub project_info: &'a ProjectInfo,
Expand Down
22 changes: 1 addition & 21 deletions relay-server/src/processing/transactions/types/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use relay_protocol::Annotated;
use relay_quotas::DataCategory;

use crate::Envelope;
#[cfg(feature = "processing")]
use crate::managed::ManagedEnvelope;
use crate::managed::{Managed, ManagedResult, Rejected};
#[cfg(feature = "processing")]
use crate::processing::StoreHandle;
Expand Down Expand Up @@ -72,26 +70,8 @@ impl Forward for TransactionOutput {
s: StoreHandle<'_>,
ctx: ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
use crate::envelope::ItemType;

let envelope = self.serialize_envelope(ctx)?;
let envelope = ManagedEnvelope::from(envelope).into_processed();

let has_attachments = envelope
.envelope()
.items()
.any(|item| item.ty() == &ItemType::Attachment);
let use_objectstore = || {
let options = &ctx.global_config.options;
crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep()
};

if has_attachments && use_objectstore() {
s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope });
} else {
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
}

s.send_envelope(crate::managed::ManagedEnvelope::from(envelope));
Ok(())
}
}
5 changes: 2 additions & 3 deletions relay-server/src/services/objectstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ use uuid::Uuid;
use crate::constants::DEFAULT_ATTACHMENT_RETENTION;
use crate::envelope::ItemType;
use crate::managed::{
Counted, Managed, ManagedResult, OutcomeError, Quantities, Rejected, TypedEnvelope,
Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected,
};
use crate::processing::utils::store::item_id_to_uuid;
use crate::services::outcome::DiscardReason;
use crate::services::processor::Processed;
use crate::services::store::{Store, StoreEnvelope, StoreTraceItem};
use crate::services::upload;
use crate::statsd::{RelayCounters, RelayTimers};
Expand Down Expand Up @@ -251,7 +250,7 @@ impl ObjectstoreServiceInner {
///
/// This mutates the attachment items in-place, setting their `stored_key` field to the key
/// in objectstore.
async fn handle_envelope(&self, mut envelope: TypedEnvelope<Processed>) -> () {
async fn handle_envelope(&self, mut envelope: ManagedEnvelope) -> () {
let scoping = envelope.scoping();
let session = self
.event_attachments
Expand Down
34 changes: 11 additions & 23 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ use relay_threading::AsyncPool;
#[cfg(feature = "processing")]
use {
crate::services::objectstore::Objectstore,
crate::services::store::{Store, StoreEnvelope},
crate::services::store::Store,
crate::utils::Enforcement,
itertools::Itertools,
relay_cardinality::{
Expand Down Expand Up @@ -2039,30 +2039,18 @@ impl EnvelopeProcessorService {
use crate::processing::StoreHandle;

let objectstore = self.inner.addrs.objectstore.as_ref();
let global_config = &self.inner.global_config.current();
let handle = StoreHandle::new(store_forwarder, objectstore, global_config);

match submit {
Submit::Envelope(envelope) => {
let envelope_has_attachments = envelope
.envelope()
.items()
.any(|item| *item.ty() == ItemType::Attachment);
// Whether Relay will store this attachment in objectstore or use kafka like before.
let use_objectstore = || {
let options = &self.inner.global_config.current().options;
utils::sample(options.objectstore_attachments_sample_rate).is_keep()
};

if let Some(objectstore) = &self.inner.addrs.objectstore
&& envelope_has_attachments
&& use_objectstore()
{
// the `ObjectstoreService` will upload all attachments, and then forward the envelope to the `StoreService`.
objectstore.send(StoreEnvelope { envelope })
} else {
store_forwarder.send(StoreEnvelope { envelope })
}
}
// Once check-ins and errors are fully moved to the new pipeline, this is only
// used for metrics forwarding.
//
// Metrics forwarding will n_never_ forward an envelope in processing, making
// this branch here unused.
Submit::Envelope(envelope) => handle.send_envelope(envelope.into_inner()),
Submit::Output { output, ctx } => output
.forward_store(StoreHandle::new(store_forwarder, objectstore), ctx)
.forward_store(handle, ctx)
.unwrap_or_else(|err| err.into_inner()),
}
return;
Expand Down
5 changes: 2 additions & 3 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;

use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, TypedEnvelope};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{self, FormDataIter};

Expand Down Expand Up @@ -94,7 +93,7 @@ impl Producer {
/// Publishes an [`Envelope`](crate::envelope::Envelope) to the Sentry core application through Kafka topics.
#[derive(Debug)]
pub struct StoreEnvelope {
pub envelope: TypedEnvelope<Processed>,
pub envelope: ManagedEnvelope,
}

/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.
Expand Down