Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions relay-server/src/processing/errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,22 +345,7 @@ impl Forward for ErrorOutput {
) -> Result<(), Rejected<()>> {
let envelope = self.serialize_envelope(ctx)?;
let envelope = ManagedEnvelope::from(envelope).into_processed();

let has_attachments = envelope
.envelope()
.items()
.any(|item| item.ty() == &ItemType::Attachment);
let use_objectstore = || {
let options = &ctx.global_config.options;
crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep()
};

if has_attachments && use_objectstore() {
s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope });
} else {
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
}

processing::utils::store::forward_envelope(envelope, s, ctx.global_config);
Ok(())
}
}
27 changes: 4 additions & 23 deletions relay-server/src/processing/transactions/types/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ use relay_protocol::Annotated;
use relay_quotas::DataCategory;

use crate::Envelope;
#[cfg(feature = "processing")]
use crate::managed::ManagedEnvelope;
use crate::managed::{Managed, ManagedResult, Rejected};
#[cfg(feature = "processing")]
use crate::processing::StoreHandle;
use crate::processing::spans::Indexed;
use crate::processing::transactions::types::{ExpandedTransaction, StandaloneProfile};
#[cfg(feature = "processing")]
use crate::processing::{self, StoreHandle};
use crate::processing::{Forward, ForwardContext};
use crate::services::outcome::{DiscardReason, Outcome};

Expand Down Expand Up @@ -72,26 +70,9 @@ impl Forward for TransactionOutput {
s: StoreHandle<'_>,
ctx: ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
use crate::envelope::ItemType;

let envelope = self.serialize_envelope(ctx)?;
let envelope = ManagedEnvelope::from(envelope).into_processed();

let has_attachments = envelope
.envelope()
.items()
.any(|item| item.ty() == &ItemType::Attachment);
let use_objectstore = || {
let options = &ctx.global_config.options;
crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep()
};

if has_attachments && use_objectstore() {
s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope });
} else {
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
}

let envelope = crate::managed::ManagedEnvelope::from(envelope).into_processed();
processing::utils::store::forward_envelope(envelope, s, ctx.global_config);
Ok(())
}
}
32 changes: 31 additions & 1 deletion relay-server/src/processing/utils/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashMap;

use chrono::Utc;
use relay_conventions::CLIENT_SAMPLE_RATE;
use relay_dynamic_config::GlobalConfig;
use relay_event_schema::protocol::Attributes;
use relay_protocol::{Annotated, IntoValue, MetaTree, Value};

Expand All @@ -11,7 +12,10 @@ use sentry_protos::snuba::v1::{AnyValue, ArrayValue, CategoryCount, Outcomes, an
use serde::Serialize;
use uuid::Uuid;

use crate::managed::Quantities;
use crate::envelope::ItemType;
use crate::managed::{Quantities, TypedEnvelope};
use crate::processing;
use crate::services::processor::Processed;

/// Represents metadata extracted from Relay's annotated model.
///
Expand Down Expand Up @@ -240,3 +244,29 @@ pub fn quantities_to_trace_item_outcomes(q: Quantities, scoping: Scoping) -> Out
key_id: scoping.key_id.unwrap_or(0),
}
}

/// Helper which forwards envelopes to be stored.
///
/// This manages the dispatch logic between object store and Kafka. This is a temporary measure
/// until objectstore is available and used in all environments, including Self Hosted.
pub fn forward_envelope(
envelope: TypedEnvelope<Processed>,
s: processing::StoreHandle<'_>,
global_config: &GlobalConfig,
) {
let has_attachments = envelope
.envelope()
.items()
.any(|item| item.ty() == &ItemType::Attachment);

let use_objectstore = || {
crate::utils::sample(global_config.options.objectstore_attachments_sample_rate).is_keep()
};

let message = crate::services::store::StoreEnvelope { envelope };
if has_attachments && use_objectstore() {
s.send_to_objectstore(message);
} else {
s.send_to_store(message);
}
}
32 changes: 11 additions & 21 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ use relay_threading::AsyncPool;
#[cfg(feature = "processing")]
use {
crate::services::objectstore::Objectstore,
crate::services::store::{Store, StoreEnvelope},
crate::services::store::Store,
crate::utils::Enforcement,
itertools::Itertools,
relay_cardinality::{
Expand Down Expand Up @@ -2039,30 +2039,20 @@ impl EnvelopeProcessorService {
use crate::processing::StoreHandle;

let objectstore = self.inner.addrs.objectstore.as_ref();
let handle = StoreHandle::new(store_forwarder, objectstore);

match submit {
Submit::Envelope(envelope) => {
let envelope_has_attachments = envelope
.envelope()
.items()
.any(|item| *item.ty() == ItemType::Attachment);
// Whether Relay will store this attachment in objectstore or use kafka like before.
let use_objectstore = || {
let options = &self.inner.global_config.current().options;
utils::sample(options.objectstore_attachments_sample_rate).is_keep()
};

if let Some(objectstore) = &self.inner.addrs.objectstore
&& envelope_has_attachments
&& use_objectstore()
{
// the `ObjectstoreService` will upload all attachments, and then forward the envelope to the `StoreService`.
objectstore.send(StoreEnvelope { envelope })
} else {
store_forwarder.send(StoreEnvelope { envelope })
}
let global_config = &self.inner.global_config.current();
// Once check-ins and errors are fully moved to the new pipeline, this is only
// used for metrics forwarding.
//
// Metrics forwarding will n_never_ forward an envelope in processing, making
// this branch here unused.
processing::utils::store::forward_envelope(envelope, handle, global_config);
}
Submit::Output { output, ctx } => output
.forward_store(StoreHandle::new(store_forwarder, objectstore), ctx)
.forward_store(handle, ctx)
.unwrap_or_else(|err| err.into_inner()),
}
return;
Expand Down
Loading