
Commit 0e93566

feat(relay): Simplify Proxy Mode (#5165)
This is a big change; however, it should not break anything, and users that run Relay in Proxy mode should not experience any negative side effects. With the recent changes to the Relay modes (#5108, #5057, #5053) it became apparent that Proxy mode currently does more than strictly necessary: the logic is more complex than it needs to be and, as a result, less efficient than it could be. This PR rectifies that by making Proxy mode a 'pure' proxy, introducing the `ProxyProcessorService`, a simplified version of the `EnvelopeProcessorService`. There are also some changes to the `HealthCheckService`, `AutoscalingMetricService`, and `RelayStats` to make them compatible. Fixes: https://linear.app/getsentry/issue/INGEST-581/simplify-proxy-mode
1 parent 5fa7b8e commit 0e93566
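For orientation, here is a minimal, self-contained sketch of the "pure proxy" idea: in Proxy mode the processor no longer normalizes, aggregates, or rate limits, it only serializes the envelope and hands it to the upstream. All types below are simplified stand-ins for illustration, not Relay's actual `ProxyProcessorService` API.

```rust
/// Stand-in for an incoming envelope payload (not Relay's real `Envelope`).
struct Envelope {
    body: Vec<u8>,
}

/// Stand-in for the upstream connection.
struct Upstream;

impl Upstream {
    fn forward(&self, body: Vec<u8>) {
        // In Relay this would submit an HTTP request to the upstream Sentry/Relay.
        println!("forwarding {} bytes upstream", body.len());
    }
}

/// The essence of proxy mode after this change: no enrichment, no metrics
/// aggregation, no rate limiting -- just serialize and forward.
fn proxy_process(envelope: Envelope, upstream: &Upstream) {
    upstream.forward(envelope.body);
}

fn main() {
    let upstream = Upstream;
    proxy_process(Envelope { body: vec![0u8; 42] }, &upstream);
}
```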

File tree

11 files changed: +454 -71 lines changed


CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -2,6 +2,10 @@
 
 ## Unreleased
 
+**Breaking Changes**:
+
+- Simplify proxy mode to forward without processing. ([#5165](https://github.com/getsentry/relay/pull/5165))
+
 **Bug Fixes**:
 
 - Make referer optional in Vercel Log Drain Transform. ([#5273](https://github.com/getsentry/relay/pull/5273))

relay-server/src/endpoints/autoscaling.rs

Lines changed: 8 additions & 5 deletions
@@ -6,11 +6,14 @@ use std::fmt::Write;
 
 /// Returns internal metrics data for relay.
 pub async fn handle(state: ServiceState) -> (StatusCode, String) {
-    let data = match state
-        .autoscaling()
-        .send(AutoscalingMessageKind::Check)
-        .await
-    {
+    let Some(autoscaling) = state.autoscaling() else {
+        return (
+            StatusCode::NOT_FOUND,
+            "Autoscaling metrics not enabled".to_owned(),
+        );
+    };
+
+    let data = match autoscaling.send(AutoscalingMessageKind::Check).await {
         Ok(data) => data,
         Err(_) => {
             return (
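The endpoint now answers 404 when the autoscaling service was never started (Proxy mode). The snippet below is a stand-alone sketch of the same `let ... else` early-return shape, using plain status codes instead of Relay's `ServiceState` and `StatusCode`:

```rust
/// Stand-in for `state.autoscaling()`, which returns `None` when the
/// service was not started (proxy mode).
fn autoscaling_addr(enabled: bool) -> Option<&'static str> {
    enabled.then_some("autoscaling-addr")
}

fn handle(enabled: bool) -> (u16, String) {
    // Early return with 404 when the service is absent, mirroring the diff above.
    let Some(addr) = autoscaling_addr(enabled) else {
        return (404, "Autoscaling metrics not enabled".to_owned());
    };
    (200, format!("metrics from {addr}"))
}

fn main() {
    assert_eq!(handle(false).0, 404);
    assert_eq!(handle(true).0, 200);
}
```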

relay-server/src/service.rs

Lines changed: 78 additions & 48 deletions
@@ -20,6 +20,7 @@ use crate::services::processor::{
 };
 use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
 use crate::services::projects::source::ProjectSource;
+use crate::services::proxy_processor::{ProxyAddrs, ProxyProcessorService};
 use crate::services::relays::{RelayCache, RelayCacheService};
 use crate::services::stats::RelayStats;
 #[cfg(feature = "processing")]
@@ -72,7 +73,7 @@ pub struct Registry {
     pub upstream_relay: Addr<UpstreamRelay>,
     pub envelope_buffer: PartitionedEnvelopeBuffer,
     pub project_cache_handle: ProjectCacheHandle,
-    pub autoscaling: Addr<AutoscalingMetrics>,
+    pub autoscaling: Option<Addr<AutoscalingMetrics>>,
 }
 
 /// Constructs a Tokio [`relay_system::Runtime`] configured for running [services](relay_system::Service).
@@ -181,7 +182,11 @@ impl ServiceState {
 
         // Create an address for the `EnvelopeProcessor`, which can be injected into the
        // other services.
-        let (processor, processor_rx) = channel(EnvelopeProcessorService::name());
+        let (processor, processor_rx) = match config.relay_mode() {
+            relay_config::RelayMode::Proxy => channel(ProxyProcessorService::name()),
+            relay_config::RelayMode::Managed => channel(EnvelopeProcessorService::name()),
+        };
+
         let outcome_producer = services.start(OutcomeProducerService::create(
             config.clone(),
             upstream_relay.clone(),
@@ -209,16 +214,6 @@
         let project_cache_handle =
             ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);
 
-        let aggregator = RouterService::new(
-            handle.clone(),
-            config.default_aggregator_config().clone(),
-            config.secondary_aggregator_configs().clone(),
-            Some(processor.clone().recipient()),
-            project_cache_handle.clone(),
-        );
-        let aggregator_handle = aggregator.handle();
-        let aggregator = services.start(aggregator);
-
         let metric_outcomes = MetricOutcomes::new(outcome_aggregator.clone());
 
         #[cfg(feature = "processing")]
@@ -238,38 +233,11 @@
             })
             .transpose()?;
 
-        let cogs = CogsService::new(&config);
-        let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));
-
         #[cfg(feature = "processing")]
         let global_rate_limits = redis_clients
             .as_ref()
             .map(|p| services.start(GlobalRateLimitsService::new(p.quotas.clone())));
 
-        let processor_pool = create_processor_pool(&config)?;
-        services.start_with(
-            EnvelopeProcessorService::new(
-                processor_pool.clone(),
-                config.clone(),
-                global_config_handle,
-                project_cache_handle.clone(),
-                cogs,
-                #[cfg(feature = "processing")]
-                redis_clients.clone(),
-                processor::Addrs {
-                    outcome_aggregator: outcome_aggregator.clone(),
-                    upstream_relay: upstream_relay.clone(),
-                    #[cfg(feature = "processing")]
-                    store_forwarder: store.clone(),
-                    aggregator: aggregator.clone(),
-                    #[cfg(feature = "processing")]
-                    global_rate_limits,
-                },
-                metric_outcomes.clone(),
-            ),
-            processor_rx,
-        );
-
         let envelope_buffer = PartitionedEnvelopeBuffer::create(
             config.spool_partitions(),
             config.clone(),
@@ -281,6 +249,75 @@
             services,
         );
 
+        let (processor_pool, aggregator_handle, autoscaling) = match config.relay_mode() {
+            relay_config::RelayMode::Proxy => {
+                services.start_with(
+                    ProxyProcessorService::new(
+                        config.clone(),
+                        project_cache_handle.clone(),
+                        ProxyAddrs {
+                            outcome_aggregator: outcome_aggregator.clone(),
+                            upstream_relay: upstream_relay.clone(),
+                        },
+                    ),
+                    processor_rx,
+                );
+                (None, None, None)
+            }
+            relay_config::RelayMode::Managed => {
+                let processor_pool = create_processor_pool(&config)?;
+
+                let aggregator = RouterService::new(
+                    handle.clone(),
+                    config.default_aggregator_config().clone(),
+                    config.secondary_aggregator_configs().clone(),
+                    Some(processor.clone().recipient()),
+                    project_cache_handle.clone(),
+                );
+                let aggregator_handle = aggregator.handle();
+                let aggregator = services.start(aggregator);
+
+                let cogs = CogsService::new(&config);
+                let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));
+
+                services.start_with(
+                    EnvelopeProcessorService::new(
+                        processor_pool.clone(),
+                        config.clone(),
+                        global_config_handle,
+                        project_cache_handle.clone(),
+                        cogs,
+                        #[cfg(feature = "processing")]
+                        redis_clients.clone(),
+                        processor::Addrs {
+                            outcome_aggregator: outcome_aggregator.clone(),
+                            upstream_relay: upstream_relay.clone(),
+                            #[cfg(feature = "processing")]
+                            store_forwarder: store.clone(),
+                            aggregator: aggregator.clone(),
+                            #[cfg(feature = "processing")]
+                            global_rate_limits,
+                        },
+                        metric_outcomes.clone(),
+                    ),
+                    processor_rx,
+                );
+
+                let autoscaling = services.start(AutoscalingMetricService::new(
+                    memory_stat.clone(),
+                    envelope_buffer.clone(),
+                    handle.clone(),
+                    processor_pool.clone(),
+                ));
+
+                (
+                    Some(processor_pool),
+                    Some(aggregator_handle),
+                    Some(autoscaling),
+                )
+            }
+        };
+
         let health_check = services.start(HealthCheckService::new(
             config.clone(),
             MemoryChecker::new(memory_stat.clone(), config.clone()),
@@ -289,13 +326,6 @@
             envelope_buffer.clone(),
         ));
 
-        let autoscaling = services.start(AutoscalingMetricService::new(
-            memory_stat.clone(),
-            envelope_buffer.clone(),
-            handle.clone(),
-            processor_pool.clone(),
-        ));
-
         services.start(RelayStats::new(
             config.clone(),
             handle.clone(),
@@ -348,8 +378,8 @@
         &self.inner.memory_checker
     }
 
-    pub fn autoscaling(&self) -> &Addr<AutoscalingMetrics> {
-        &self.inner.registry.autoscaling
+    pub fn autoscaling(&self) -> Option<&Addr<AutoscalingMetrics>> {
+        self.inner.registry.autoscaling.as_ref()
    }
 
    /// Returns the V2 envelope buffer, if present.
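The service wiring now branches once on `config.relay_mode()` and keeps `Option`al handles for everything that only exists in managed mode. A condensed, self-contained sketch of that shape (all types are illustrative stand-ins, not Relay's registry):

```rust
#[derive(Clone, Copy)]
enum RelayMode {
    Proxy,
    Managed,
}

// Stand-ins for the handles that only exist in managed mode.
struct ProcessorPool;
struct AggregatorHandle;
struct AutoscalingAddr;

fn wire_services(
    mode: RelayMode,
) -> (Option<ProcessorPool>, Option<AggregatorHandle>, Option<AutoscalingAddr>) {
    match mode {
        // Proxy mode starts only the lightweight forwarding service and
        // keeps no pool/aggregator/autoscaling handles around.
        RelayMode::Proxy => (None, None, None),
        // Managed mode spins up the full pipeline and returns its handles.
        RelayMode::Managed => (
            Some(ProcessorPool),
            Some(AggregatorHandle),
            Some(AutoscalingAddr),
        ),
    }
}

fn main() {
    let (pool, _, autoscaling) = wire_services(RelayMode::Proxy);
    assert!(pool.is_none() && autoscaling.is_none());
}
```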

relay-server/src/services/health_check.rs

Lines changed: 6 additions & 3 deletions
@@ -84,7 +84,7 @@ impl StatusUpdate {
 pub struct HealthCheckService {
     config: Arc<Config>,
     memory_checker: MemoryChecker,
-    aggregator: RouterHandle,
+    aggregator: Option<RouterHandle>,
     upstream_relay: Addr<UpstreamRelay>,
     envelope_buffer: PartitionedEnvelopeBuffer,
 }
@@ -94,7 +94,7 @@ impl HealthCheckService {
     pub fn new(
         config: Arc<Config>,
         memory_checker: MemoryChecker,
-        aggregator: RouterHandle,
+        aggregator: Option<RouterHandle>,
         upstream_relay: Addr<UpstreamRelay>,
         envelope_buffer: PartitionedEnvelopeBuffer,
     ) -> Self {
@@ -145,7 +145,10 @@ impl HealthCheckService {
     }
 
     async fn aggregator_probe(&self) -> Status {
-        Status::from(self.aggregator.can_accept_metrics())
+        self.aggregator
+            .as_ref()
+            .map(|agg| Status::from(agg.can_accept_metrics()))
+            .unwrap_or(Status::Healthy)
     }
 
     async fn spool_health_probe(&self) -> Status {
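Since Proxy mode has no metrics aggregator, the probe treats a missing aggregator as healthy. A small stand-alone illustration of that `map`/`unwrap_or` logic, with a simplified `Status` and a stub handle in place of Relay's `RouterHandle`:

```rust
#[derive(Debug, PartialEq)]
enum Status {
    Healthy,
    Unhealthy,
}

/// Stub standing in for the aggregator's `RouterHandle`.
struct RouterHandleStub {
    can_accept: bool,
}

fn aggregator_probe(aggregator: Option<&RouterHandleStub>) -> Status {
    aggregator
        .map(|agg| {
            if agg.can_accept {
                Status::Healthy
            } else {
                Status::Unhealthy
            }
        })
        // No aggregator (proxy mode): nothing to check, so report healthy.
        .unwrap_or(Status::Healthy)
}

fn main() {
    assert_eq!(aggregator_probe(None), Status::Healthy);
    assert_eq!(
        aggregator_probe(Some(&RouterHandleStub { can_accept: false })),
        Status::Unhealthy
    );
}
```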

relay-server/src/services/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ pub mod outcome;
 pub mod outcome_aggregator;
 pub mod processor;
 pub mod projects;
+pub mod proxy_processor;
 pub mod relays;
 pub mod server;
 pub mod stats;

relay-server/src/services/processor.rs

Lines changed: 5 additions & 5 deletions
@@ -3358,7 +3358,7 @@ impl RateLimiter {
     }
 }
 
-fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
+pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
     let envelope_body: Vec<u8> = match http_encoding {
         HttpEncoding::Identity => return Ok(body.clone()),
         HttpEncoding::Deflate => {
@@ -3392,10 +3392,10 @@ fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, st
 /// An upstream request that submits an envelope via HTTP.
 #[derive(Debug)]
 pub struct SendEnvelope {
-    envelope: TypedEnvelope<Processed>,
-    body: Bytes,
-    http_encoding: HttpEncoding,
-    project_cache: ProjectCacheHandle,
+    pub envelope: TypedEnvelope<Processed>,
+    pub body: Bytes,
+    pub http_encoding: HttpEncoding,
+    pub project_cache: ProjectCacheHandle,
 }
 
 impl UpstreamRequest for SendEnvelope {
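`encode_payload` and the `SendEnvelope` fields become `pub` so the new proxy path can build upstream requests without duplicating this logic. As a rough approximation of what such an encoder does, reconstructed only from the visible hunk using the `bytes` and `flate2` crates (the real function handles additional encodings):

```rust
use std::io::Write;

use bytes::Bytes;
use flate2::write::{GzEncoder, ZlibEncoder};
use flate2::Compression;

// Illustrative subset of the HTTP content encodings; not Relay's full enum.
enum HttpEncoding {
    Identity,
    Deflate,
    Gzip,
}

fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
    let envelope_body: Vec<u8> = match http_encoding {
        // Identity: no transformation, reuse the cheaply clonable `Bytes`.
        HttpEncoding::Identity => return Ok(body.clone()),
        HttpEncoding::Deflate => {
            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body)?;
            encoder.finish()?
        }
        HttpEncoding::Gzip => {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body)?;
            encoder.finish()?
        }
    };
    Ok(envelope_body.into())
}

fn main() -> Result<(), std::io::Error> {
    let body = Bytes::from_static(b"hello envelope");
    let compressed = encode_payload(&body, HttpEncoding::Deflate)?;
    println!("{} -> {} bytes", body.len(), compressed.len());
    Ok(())
}
```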
