Skip to content

Commit ac46fb7

Browse files
committed
move util fn to store handle
1 parent aa77b04 commit ac46fb7

File tree

9 files changed

+67
-57
lines changed

9 files changed

+67
-57
lines changed

relay-server/src/managed/envelope.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ impl<G> TypedEnvelope<G> {
9999
self.0.accept()
100100
}
101101

102+
/// Returns the raw [`ManagedEnvelope`].
103+
pub fn into_inner(self) -> ManagedEnvelope {
104+
self.0
105+
}
106+
102107
/// Creates a new typed envelope.
103108
///
104109
/// Note: this method is private to make sure that only `TryFrom` implementation is used, which

relay-server/src/processing/check_ins/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ impl Forward for CheckInsOutput {
116116
ctx: processing::ForwardContext<'_>,
117117
) -> Result<(), Rejected<()>> {
118118
let envelope = self.serialize_envelope(ctx)?;
119-
let envelope = ManagedEnvelope::from(envelope).into_processed();
119+
let envelope = ManagedEnvelope::from(envelope);
120120

121121
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
122122

relay-server/src/processing/errors/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,7 @@ impl Forward for ErrorOutput {
344344
ctx: processing::ForwardContext<'_>,
345345
) -> Result<(), Rejected<()>> {
346346
let envelope = self.serialize_envelope(ctx)?;
347-
let envelope = ManagedEnvelope::from(envelope).into_processed();
348-
processing::utils::store::forward_envelope(envelope, s, ctx.global_config);
347+
s.send_envelope(ManagedEnvelope::from(envelope));
349348
Ok(())
350349
}
351350
}

relay-server/src/processing/forward.rs

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use relay_dynamic_config::{RetentionConfig, RetentionsConfig};
55
use relay_system::{Addr, FromMessage};
66

77
use crate::Envelope;
8+
#[cfg(feature = "processing")]
9+
use crate::managed::ManagedEnvelope;
810
use crate::managed::{Managed, Rejected};
911
#[cfg(feature = "processing")]
1012
use crate::services::objectstore::Objectstore;
@@ -18,12 +20,21 @@ use crate::services::store::Store;
1820
pub struct StoreHandle<'a> {
1921
store: &'a Addr<Store>,
2022
objectstore: Option<&'a Addr<Objectstore>>,
23+
global_config: &'a GlobalConfig,
2124
}
2225

2326
#[cfg(feature = "processing")]
2427
impl<'a> StoreHandle<'a> {
25-
pub fn new(store: &'a Addr<Store>, objectstore: Option<&'a Addr<Objectstore>>) -> Self {
26-
Self { store, objectstore }
28+
pub fn new(
29+
store: &'a Addr<Store>,
30+
objectstore: Option<&'a Addr<Objectstore>>,
31+
global_config: &'a GlobalConfig,
32+
) -> Self {
33+
Self {
34+
store,
35+
objectstore,
36+
global_config,
37+
}
2738
}
2839

2940
/// Sends a message to the [`Store`] service.
@@ -45,6 +56,36 @@ impl<'a> StoreHandle<'a> {
4556
relay_log::error!("Objectstore service not configured. Dropping message.");
4657
}
4758
}
59+
60+
/// Dispatches an envelopes to either the [`Objectstore`] or [`Store`] service.
61+
pub fn send_envelope(&self, envelope: ManagedEnvelope) {
62+
use crate::services::store::StoreEnvelope;
63+
64+
let Some(objectstore) = self.objectstore else {
65+
self.store.send(StoreEnvelope { envelope });
66+
return;
67+
};
68+
69+
let has_attachments = envelope
70+
.envelope()
71+
.items()
72+
.any(|item| item.ty() == &crate::envelope::ItemType::Attachment);
73+
74+
let use_objectstore = || {
75+
crate::utils::sample(
76+
self.global_config
77+
.options
78+
.objectstore_attachments_sample_rate,
79+
)
80+
.is_keep()
81+
};
82+
83+
if has_attachments && use_objectstore() {
84+
objectstore.send(StoreEnvelope { envelope })
85+
} else {
86+
self.store.send(StoreEnvelope { envelope });
87+
}
88+
}
4889
}
4990

5091
/// A processor output which can be forwarded to a different destination.
@@ -73,7 +114,7 @@ pub struct ForwardContext<'a> {
73114
/// The Relay configuration.
74115
pub config: &'a Config,
75116
/// A view of the currently active global configuration.
76-
#[cfg_attr(not(feature = "processing"), expect(unused))]
117+
#[expect(unused, reason = "not yet used")]
77118
pub global_config: &'a GlobalConfig,
78119
/// Project configuration associated with the unit of work.
79120
pub project_info: &'a ProjectInfo,

relay-server/src/processing/transactions/types/output.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ use relay_quotas::DataCategory;
66

77
use crate::Envelope;
88
use crate::managed::{Managed, ManagedResult, Rejected};
9+
#[cfg(feature = "processing")]
10+
use crate::processing::StoreHandle;
911
use crate::processing::spans::Indexed;
1012
use crate::processing::transactions::types::{ExpandedTransaction, StandaloneProfile};
11-
#[cfg(feature = "processing")]
12-
use crate::processing::{self, StoreHandle};
1313
use crate::processing::{Forward, ForwardContext};
1414
use crate::services::outcome::{DiscardReason, Outcome};
1515

@@ -71,8 +71,7 @@ impl Forward for TransactionOutput {
7171
ctx: ForwardContext<'_>,
7272
) -> Result<(), Rejected<()>> {
7373
let envelope = self.serialize_envelope(ctx)?;
74-
let envelope = crate::managed::ManagedEnvelope::from(envelope).into_processed();
75-
processing::utils::store::forward_envelope(envelope, s, ctx.global_config);
74+
s.send_envelope(crate::managed::ManagedEnvelope::from(envelope));
7675
Ok(())
7776
}
7877
}

relay-server/src/processing/utils/store.rs

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::collections::HashMap;
33

44
use chrono::Utc;
55
use relay_conventions::CLIENT_SAMPLE_RATE;
6-
use relay_dynamic_config::GlobalConfig;
76
use relay_event_schema::protocol::Attributes;
87
use relay_protocol::{Annotated, IntoValue, MetaTree, Value};
98

@@ -12,10 +11,7 @@ use sentry_protos::snuba::v1::{AnyValue, ArrayValue, CategoryCount, Outcomes, an
1211
use serde::Serialize;
1312
use uuid::Uuid;
1413

15-
use crate::envelope::ItemType;
16-
use crate::managed::{Quantities, TypedEnvelope};
17-
use crate::processing;
18-
use crate::services::processor::Processed;
14+
use crate::managed::Quantities;
1915

2016
/// Represents metadata extracted from Relay's annotated model.
2117
///
@@ -244,29 +240,3 @@ pub fn quantities_to_trace_item_outcomes(q: Quantities, scoping: Scoping) -> Out
244240
key_id: scoping.key_id.unwrap_or(0),
245241
}
246242
}
247-
248-
/// Helper which forwards envelopes to be stored.
249-
///
250-
/// This manages the dispatch logic between object store and Kafka. This is a temporary measure
251-
/// until objectstore is available and used in all environments, including Self Hosted.
252-
pub fn forward_envelope(
253-
envelope: TypedEnvelope<Processed>,
254-
s: processing::StoreHandle<'_>,
255-
global_config: &GlobalConfig,
256-
) {
257-
let has_attachments = envelope
258-
.envelope()
259-
.items()
260-
.any(|item| item.ty() == &ItemType::Attachment);
261-
262-
let use_objectstore = || {
263-
crate::utils::sample(global_config.options.objectstore_attachments_sample_rate).is_keep()
264-
};
265-
266-
let message = crate::services::store::StoreEnvelope { envelope };
267-
if has_attachments && use_objectstore() {
268-
s.send_to_objectstore(message);
269-
} else {
270-
s.send_to_store(message);
271-
}
272-
}

relay-server/src/services/objectstore.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@ use uuid::Uuid;
1717
use crate::constants::DEFAULT_ATTACHMENT_RETENTION;
1818
use crate::envelope::ItemType;
1919
use crate::managed::{
20-
Counted, Managed, ManagedResult, OutcomeError, Quantities, Rejected, TypedEnvelope,
20+
Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected,
2121
};
2222
use crate::processing::utils::store::item_id_to_uuid;
2323
use crate::services::outcome::DiscardReason;
24-
use crate::services::processor::Processed;
2524
use crate::services::store::{Store, StoreEnvelope, StoreTraceItem};
2625
use crate::services::upload;
2726
use crate::statsd::{RelayCounters, RelayTimers};
@@ -251,7 +250,7 @@ impl ObjectstoreServiceInner {
251250
///
252251
/// This mutates the attachment items in-place, setting their `stored_key` field to the key
253252
/// in objectstore.
254-
async fn handle_envelope(&self, mut envelope: TypedEnvelope<Processed>) -> () {
253+
async fn handle_envelope(&self, mut envelope: ManagedEnvelope) -> () {
255254
let scoping = envelope.scoping();
256255
let session = self
257256
.event_attachments

relay-server/src/services/processor.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2039,18 +2039,16 @@ impl EnvelopeProcessorService {
20392039
use crate::processing::StoreHandle;
20402040

20412041
let objectstore = self.inner.addrs.objectstore.as_ref();
2042-
let handle = StoreHandle::new(store_forwarder, objectstore);
2042+
let global_config = &self.inner.global_config.current();
2043+
let handle = StoreHandle::new(store_forwarder, objectstore, global_config);
20432044

20442045
match submit {
2045-
Submit::Envelope(envelope) => {
2046-
let global_config = &self.inner.global_config.current();
2047-
// Once check-ins and errors are fully moved to the new pipeline, this is only
2048-
// used for metrics forwarding.
2049-
//
2050-
// Metrics forwarding will n_never_ forward an envelope in processing, making
2051-
// this branch here unused.
2052-
processing::utils::store::forward_envelope(envelope, handle, global_config);
2053-
}
2046+
// Once check-ins and errors are fully moved to the new pipeline, this is only
2047+
// used for metrics forwarding.
2048+
//
2049+
// Metrics forwarding will n_never_ forward an envelope in processing, making
2050+
// this branch here unused.
2051+
Submit::Envelope(envelope) => handle.send_envelope(envelope.into_inner()),
20542052
Submit::Output { output, ctx } => output
20552053
.forward_store(handle, ctx)
20562054
.unwrap_or_else(|err| err.into_inner()),

relay-server/src/services/store.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,11 @@ use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
3434
use relay_threading::AsyncPool;
3535

3636
use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
37-
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, TypedEnvelope};
37+
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities};
3838
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
3939
use crate::service::ServiceError;
4040
use crate::services::global_config::GlobalConfigHandle;
4141
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
42-
use crate::services::processor::Processed;
4342
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
4443
use crate::utils::{self, FormDataIter};
4544

@@ -94,7 +93,7 @@ impl Producer {
9493
/// Publishes an [`Envelope`](crate::envelope::Envelope) to the Sentry core application through Kafka topics.
9594
#[derive(Debug)]
9695
pub struct StoreEnvelope {
97-
pub envelope: TypedEnvelope<Processed>,
96+
pub envelope: ManagedEnvelope,
9897
}
9998

10099
/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.

0 commit comments

Comments
 (0)