Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions relay-server/src/managed/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ impl<G> TypedEnvelope<G> {
self.0.accept()
}

/// Returns the raw [`ManagedEnvelope`].
pub fn into_inner(self) -> ManagedEnvelope {
self.0
}

/// Creates a new typed envelope.
///
/// Note: this method is private to make sure that only `TryFrom` implementation is used, which
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/processing/check_ins/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Forward for CheckInsOutput {
ctx: processing::ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
let envelope = self.serialize_envelope(ctx)?;
let envelope = ManagedEnvelope::from(envelope).into_processed();
let envelope = ManagedEnvelope::from(envelope);

s.send_to_store(crate::services::store::StoreEnvelope { envelope });

Expand Down
18 changes: 1 addition & 17 deletions relay-server/src/processing/errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,23 +344,7 @@ impl Forward for ErrorOutput {
ctx: processing::ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
let envelope = self.serialize_envelope(ctx)?;
let envelope = ManagedEnvelope::from(envelope).into_processed();

let has_attachments = envelope
.envelope()
.items()
.any(|item| item.ty() == &ItemType::Attachment);
let use_objectstore = || {
let options = &ctx.global_config.options;
crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep()
};

if has_attachments && use_objectstore() {
s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope });
} else {
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
}

s.send_envelope(ManagedEnvelope::from(envelope));
Ok(())
}
}
45 changes: 43 additions & 2 deletions relay-server/src/processing/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use relay_dynamic_config::{RetentionConfig, RetentionsConfig};
use relay_system::{Addr, FromMessage};

use crate::Envelope;
#[cfg(feature = "processing")]
use crate::managed::ManagedEnvelope;
use crate::managed::{Managed, Rejected};
#[cfg(feature = "processing")]
use crate::services::objectstore::Objectstore;
Expand All @@ -18,12 +20,21 @@ use crate::services::store::Store;
pub struct StoreHandle<'a> {
store: &'a Addr<Store>,
objectstore: Option<&'a Addr<Objectstore>>,
global_config: &'a GlobalConfig,
}

#[cfg(feature = "processing")]
impl<'a> StoreHandle<'a> {
pub fn new(store: &'a Addr<Store>, objectstore: Option<&'a Addr<Objectstore>>) -> Self {
Self { store, objectstore }
pub fn new(
store: &'a Addr<Store>,
objectstore: Option<&'a Addr<Objectstore>>,
global_config: &'a GlobalConfig,
) -> Self {
Self {
store,
objectstore,
global_config,
}
}

/// Sends a message to the [`Store`] service.
Expand All @@ -45,6 +56,36 @@ impl<'a> StoreHandle<'a> {
relay_log::error!("Objectstore service not configured. Dropping message.");
}
}

/// Dispatches an envelopes to either the [`Objectstore`] or [`Store`] service.
pub fn send_envelope(&self, envelope: ManagedEnvelope) {
use crate::services::store::StoreEnvelope;

let Some(objectstore) = self.objectstore else {
self.store.send(StoreEnvelope { envelope });
return;
};

let has_attachments = envelope
.envelope()
.items()
.any(|item| item.ty() == &crate::envelope::ItemType::Attachment);

let use_objectstore = || {
crate::utils::sample(
self.global_config
.options
.objectstore_attachments_sample_rate,
)
.is_keep()
};

if has_attachments && use_objectstore() {
objectstore.send(StoreEnvelope { envelope })
} else {
self.store.send(StoreEnvelope { envelope });
}
}
}

/// A processor output which can be forwarded to a different destination.
Expand Down
22 changes: 1 addition & 21 deletions relay-server/src/processing/transactions/types/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use relay_protocol::Annotated;
use relay_quotas::DataCategory;

use crate::Envelope;
#[cfg(feature = "processing")]
use crate::managed::ManagedEnvelope;
use crate::managed::{Managed, ManagedResult, Rejected};
#[cfg(feature = "processing")]
use crate::processing::StoreHandle;
Expand Down Expand Up @@ -72,26 +70,8 @@ impl Forward for TransactionOutput {
s: StoreHandle<'_>,
ctx: ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
use crate::envelope::ItemType;

let envelope = self.serialize_envelope(ctx)?;
let envelope = ManagedEnvelope::from(envelope).into_processed();

let has_attachments = envelope
.envelope()
.items()
.any(|item| item.ty() == &ItemType::Attachment);
let use_objectstore = || {
let options = &ctx.global_config.options;
crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep()
};

if has_attachments && use_objectstore() {
s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope });
} else {
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
}

s.send_envelope(crate::managed::ManagedEnvelope::from(envelope));
Ok(())
}
}
5 changes: 2 additions & 3 deletions relay-server/src/services/objectstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ use uuid::Uuid;
use crate::constants::DEFAULT_ATTACHMENT_RETENTION;
use crate::envelope::ItemType;
use crate::managed::{
Counted, Managed, ManagedResult, OutcomeError, Quantities, Rejected, TypedEnvelope,
Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected,
};
use crate::processing::utils::store::item_id_to_uuid;
use crate::services::outcome::DiscardReason;
use crate::services::processor::Processed;
use crate::services::store::{Store, StoreAttachment, StoreEnvelope, StoreTraceItem};
use crate::services::upload;
use crate::statsd::{RelayCounters, RelayTimers};
Expand Down Expand Up @@ -271,7 +270,7 @@ impl ObjectstoreServiceInner {
///
/// This mutates the attachment items in-place, setting their `stored_key` field to the key
/// in objectstore.
async fn handle_envelope(&self, mut envelope: TypedEnvelope<Processed>) -> () {
async fn handle_envelope(&self, mut envelope: ManagedEnvelope) -> () {
let scoping = envelope.scoping();
let session = self
.event_attachments
Expand Down
34 changes: 11 additions & 23 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ use relay_threading::AsyncPool;
#[cfg(feature = "processing")]
use {
crate::services::objectstore::Objectstore,
crate::services::store::{Store, StoreEnvelope},
crate::services::store::Store,
crate::utils::Enforcement,
itertools::Itertools,
relay_cardinality::{
Expand Down Expand Up @@ -2064,30 +2064,18 @@ impl EnvelopeProcessorService {
use crate::processing::StoreHandle;

let objectstore = self.inner.addrs.objectstore.as_ref();
let global_config = &self.inner.global_config.current();
let handle = StoreHandle::new(store_forwarder, objectstore, global_config);

match submit {
Submit::Envelope(envelope) => {
let envelope_has_attachments = envelope
.envelope()
.items()
.any(|item| *item.ty() == ItemType::Attachment);
// Whether Relay will store this attachment in objectstore or use kafka like before.
let use_objectstore = || {
let options = &self.inner.global_config.current().options;
utils::sample(options.objectstore_attachments_sample_rate).is_keep()
};

if let Some(objectstore) = &self.inner.addrs.objectstore
&& envelope_has_attachments
&& use_objectstore()
{
// the `ObjectstoreService` will upload all attachments, and then forward the envelope to the `StoreService`.
objectstore.send(StoreEnvelope { envelope })
} else {
store_forwarder.send(StoreEnvelope { envelope })
}
}
// Once check-ins and errors are fully moved to the new pipeline, this is only
// used for metrics forwarding.
//
// Metrics forwarding will n_never_ forward an envelope in processing, making
// this branch here unused.
Submit::Envelope(envelope) => handle.send_envelope(envelope.into_inner()),
Submit::Output { output, ctx } => output
.forward_store(StoreHandle::new(store_forwarder, objectstore), ctx)
.forward_store(handle, ctx)
.unwrap_or_else(|err| err.into_inner()),
}
return;
Expand Down
5 changes: 2 additions & 3 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;

use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, TypedEnvelope};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{self, FormDataIter};

Expand Down Expand Up @@ -94,7 +93,7 @@ impl Producer {
/// Publishes an [`Envelope`](crate::envelope::Envelope) to the Sentry core application through Kafka topics.
#[derive(Debug)]
pub struct StoreEnvelope {
pub envelope: TypedEnvelope<Processed>,
pub envelope: ManagedEnvelope,
}

/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.
Expand Down
Loading