Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions relay-dynamic-config/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub struct ProjectConfig {
/// Retention settings for different products.
#[serde(default, skip_serializing_if = "RetentionsConfig::is_empty")]
pub retentions: RetentionsConfig,
#[serde(default, skip_serializing_if = "ItemConfigs::is_empty")]
pub item_configs: ItemConfigs,
/// Usage quotas for this project.
#[serde(skip_serializing_if = "Vec::is_empty")]
pub quotas: Vec<Quota>,
Expand Down Expand Up @@ -159,6 +161,7 @@ impl Default for ProjectConfig {
event_retention: None,
downsampled_event_retention: None,
retentions: Default::default(),
item_configs: Default::default(),
quotas: Vec::new(),
sampling: None,
measurements: None,
Expand Down Expand Up @@ -277,6 +280,44 @@ impl RetentionsConfig {
}
}

/// Configuration scoped to a single item type (e.g. logs or spans).
///
/// All fields are optional; a fully unset config is considered empty and
/// is skipped during serialization (see [`ItemConfig::is_empty`]).
#[derive(Debug, Copy, Clone, Serialize, Deserialize, Default)]
pub struct ItemConfig {
    /// Retention settings for this item type, if configured.
    ///
    /// `None` falls back to other retention sources — presumably the
    /// project-level `retentions` / event retention; confirm against callers.
    pub retention: Option<RetentionConfig>,
}

impl ItemConfig {
    /// Returns `true` if the config contains no settings at all.
    ///
    /// The struct pattern deliberately lists every field (no `..`) so that
    /// adding a field to [`ItemConfig`] forces this check to be revisited.
    fn is_empty(&self) -> bool {
        matches!(self, Self { retention: None })
    }
}

/// Per-item-type configuration for a project.
///
/// Each field holds the [`ItemConfig`] for one item type. Field names are
/// serialized in camelCase (e.g. `trace_metric` -> `traceMetric`), and
/// empty entries are omitted from the serialized output.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct ItemConfigs {
    /// Configuration for log items.
    #[serde(default, skip_serializing_if = "ItemConfig::is_empty")]
    pub log: ItemConfig,
    /// Configuration for span items.
    #[serde(default, skip_serializing_if = "ItemConfig::is_empty")]
    pub span: ItemConfig,
    /// Configuration for trace metric items.
    #[serde(default, skip_serializing_if = "ItemConfig::is_empty")]
    pub trace_metric: ItemConfig,
    /// Configuration for trace attachment items.
    #[serde(default, skip_serializing_if = "ItemConfig::is_empty")]
    pub trace_attachment: ItemConfig,
}

impl ItemConfigs {
fn is_empty(&self) -> bool {
let Self {
log,
span,
trace_metric,
trace_attachment,
} = self;

log.is_empty() && span.is_empty() && trace_metric.is_empty() && trace_attachment.is_empty()
}
}

/// Returns `true` if the given flag is `false`.
///
/// Takes a reference so it can serve as a serde `skip_serializing_if`
/// predicate for boolean fields — presumably its only use; verify callers.
fn is_false(value: &bool) -> bool {
    // `Not` is implemented for `&bool`, so no explicit deref is required.
    !value
}
Expand Down
30 changes: 17 additions & 13 deletions relay-server/src/processing/forward.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use relay_config::Config;
use relay_dynamic_config::GlobalConfig;
#[cfg(feature = "processing")]
use relay_dynamic_config::{RetentionConfig, RetentionsConfig};
use relay_dynamic_config::RetentionConfig;
#[cfg(feature = "processing")]
use relay_system::{Addr, FromMessage};

Expand Down Expand Up @@ -82,20 +82,24 @@ pub struct ForwardContext<'a> {
pub project_info: &'a ProjectInfo,
}

#[cfg(feature = "processing")]
impl ForwardContext<'_> {
/// Returns the [`Retention`] for a specific type/product.
pub fn retention<F>(&self, f: F) -> Retention
where
F: FnOnce(&RetentionsConfig) -> Option<&RetentionConfig>,
{
if let Some(retention) = f(&self.project_info.config.retentions) {
return Retention::from(*retention);
/// Extracts a [`Retention`] for an `item` type from a [`ForwardContext`].
///
/// Resolution order: the `item_configs` entry of the contained project
/// config takes precedence, then the matching `retentions` entry, and
/// finally [`ForwardContext::event_retention()`] as the default.
macro_rules! retention {
    ($ctx:expr, $item:ident) => {
        match $ctx.project_info.config.item_configs.$item.retention {
            Some(retention) => crate::processing::forward::Retention::from(retention),
            None => match $ctx.project_info.config.retentions.$item {
                Some(retention) => crate::processing::forward::Retention::from(retention),
                None => $ctx.event_retention(),
            },
        }
    };
}

self.event_retention()
}

#[cfg(feature = "processing")]
impl ForwardContext<'_> {
/// Returns the event [`Retention`].
///
/// This retention is also often used for older products and can be considered a default
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/processing/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl Forward for LogOutput {
let ctx = store::Context {
scoping: logs.scoping(),
received_at: logs.received_at(),
retention: ctx.retention(|r| r.log.as_ref()),
retention: retention!(ctx, log),
};

for log in logs.split(|logs| logs.logs) {
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::metrics_extraction::transactions::ExtractedMetrics;
use crate::services::projects::project::ProjectInfo;

mod common;
#[macro_use]
mod forward;
mod limits;

Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/processing/spans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl Forward for SpanOutput {

let ctx = store::Context {
server_sample_rate: spans.server_sample_rate,
retention: ctx.retention(|r| r.span.as_ref()),
retention: retention!(ctx, span),
};

let spans_and_attachments = spans.split(|spans| spans.into_parts());
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/processing/trace_attachments/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Forward for Managed<ExpandedAttachments> {
s: StoreHandle<'_>,
ctx: ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
let retention = ctx.retention(|r| r.trace_attachment.as_ref());
let retention = retention!(ctx, trace_attachment);
let server_sample_rate = self.server_sample_rate;
for attachment in self.split(|work| work.attachments) {
if let Ok(message) = store::convert(attachment, retention, server_sample_rate) {
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/processing/trace_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl Forward for TraceMetricOutput {
let ctx = store::Context {
scoping: metrics.scoping(),
received_at: metrics.received_at(),
retention: ctx.retention(|r| r.trace_metric.as_ref()),
retention: retention!(ctx, trace_metric),
};

for metric in metrics.split(|metrics| metrics.metrics) {
Expand Down
52 changes: 44 additions & 8 deletions tests/integration/test_otlp_logs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pytest

from datetime import datetime, timezone, timedelta
from unittest import mock

Expand All @@ -13,8 +15,14 @@
}


@pytest.mark.parametrize("retention_config_field", ("retentions", "item_configs"))
def test_otlp_logs_conversion(
mini_sentry, relay, relay_with_processing, outcomes_consumer, items_consumer
mini_sentry,
relay,
relay_with_processing,
outcomes_consumer,
items_consumer,
retention_config_field,
):
"""Test OTLP logs conversion including basic and complex attributes."""
items_consumer = items_consumer()
Expand All @@ -25,9 +33,20 @@ def test_otlp_logs_conversion(
"organizations:ourlogs-ingestion",
"organizations:relay-otel-logs-endpoint",
]
project_config["config"]["retentions"] = {
"log": {"standard": 30, "downsampled": 13 * 30},
}

if retention_config_field == "retentions":
project_config["config"]["retentions"] = {
"log": {"standard": 30, "downsampled": 13 * 30},
}
elif retention_config_field == "item_configs":
project_config["config"]["itemConfigs"] = {
"log": {"retention": {"standard": 30, "downsampled": 13 * 30}}
}
# Put a bogus value in `"retentions"`. The one in `"item_configs"` should
# take precedence.
project_config["config"]["retentions"] = {
"log": {"standard": 1, "downsampled": 1},
}

relay = relay(relay_with_processing(options=TEST_CONFIG), options=TEST_CONFIG)

Expand Down Expand Up @@ -182,8 +201,14 @@ def test_otlp_logs_conversion(
]


@pytest.mark.parametrize("retention_config_field", ("retentions", "item_configs"))
def test_otlp_logs_multiple_records(
mini_sentry, relay, relay_with_processing, outcomes_consumer, items_consumer
mini_sentry,
relay,
relay_with_processing,
outcomes_consumer,
items_consumer,
retention_config_field,
):
"""Test multiple log records in a single payload."""
items_consumer = items_consumer()
Expand All @@ -194,9 +219,20 @@ def test_otlp_logs_multiple_records(
"organizations:ourlogs-ingestion",
"organizations:relay-otel-logs-endpoint",
]
project_config["config"]["retentions"] = {
"log": {"standard": 30, "downsampled": 13 * 30},
}

if retention_config_field == "retentions":
project_config["config"]["retentions"] = {
"log": {"standard": 30, "downsampled": 13 * 30},
}
elif retention_config_field == "item_configs":
project_config["config"]["itemConfigs"] = {
"log": {"retention": {"standard": 30, "downsampled": 13 * 30}}
}
# Put a bogus value in `"retentions"`. The one in `"item_configs"` should
# take precedence.
project_config["config"]["retentions"] = {
"log": {"standard": 1, "downsampled": 1},
}

relay = relay(relay_with_processing(options=TEST_CONFIG), options=TEST_CONFIG)

Expand Down
38 changes: 32 additions & 6 deletions tests/integration/test_ourlogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ def timestamps(ts: datetime):
}


@pytest.mark.parametrize("retention_config_field", ("retentions", "item_configs"))
def test_ourlog_multiple_containers_not_allowed(
mini_sentry,
relay,
relay_with_processing,
items_consumer,
outcomes_consumer,
retention_config_field,
):
items_consumer = items_consumer()
outcomes_consumer = outcomes_consumer()
Expand All @@ -68,9 +70,20 @@ def test_ourlog_multiple_containers_not_allowed(
project_config["config"]["features"] = [
"organizations:ourlogs-ingestion",
]
project_config["config"]["retentions"] = {
"log": {"standard": 30, "downsampled": 13 * 30},
}

if retention_config_field == "retentions":
project_config["config"]["retentions"] = {
"log": {"standard": 30, "downsampled": 13 * 30},
}
elif retention_config_field == "item_configs":
project_config["config"]["itemConfigs"] = {
"log": {"retention": {"standard": 30, "downsampled": 13 * 30}}
}
# Put a bogus value in `"retentions"`. The one in `"item_configs"` should
# take precedence.
project_config["config"]["retentions"] = {
"log": {"standard": 1, "downsampled": 1},
}

relay = relay(relay_with_processing(options=TEST_CONFIG), options=TEST_CONFIG)
start = datetime.now(timezone.utc)
Expand Down Expand Up @@ -639,18 +652,31 @@ def test_ourlog_extraction_with_sentry_logs_with_missing_fields(
}


@pytest.mark.parametrize("retention_config_field", ("retentions", "item_configs"))
def test_ourlog_extraction_is_disabled_without_feature(
mini_sentry,
relay_with_processing,
items_consumer,
retention_config_field,
):
items_consumer = items_consumer()
relay = relay_with_processing(options=TEST_CONFIG)
project_id = 42
project_config = mini_sentry.add_full_project_config(project_id)
project_config["config"]["retentions"] = {
"log": {"standard": 30, "downsampled": 13 * 30},
}

if retention_config_field == "retentions":
project_config["config"]["retentions"] = {
"log": {"standard": 30, "downsampled": 13 * 30},
}
elif retention_config_field == "item_configs":
project_config["config"]["itemConfigs"] = {
"log": {"retention": {"standard": 30, "downsampled": 13 * 30}}
}
# Put a bogus value in `"retentions"`. The one in `"item_configs"` should
# take precedence.
project_config["config"]["retentions"] = {
"log": {"standard": 1, "downsampled": 1},
}

project_config["config"]["features"] = []

Expand Down
46 changes: 40 additions & 6 deletions tests/integration/test_spansv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ def envelope_with_spans(*payloads: dict, trace_info=None) -> Envelope:
return envelope


@pytest.mark.parametrize("retention_config_field", ("retentions", "item_configs"))
def test_spansv2_basic(
mini_sentry,
relay,
relay_with_processing,
spans_consumer,
metrics_consumer,
retention_config_field,
):
"""
A basic test making sure spans can be ingested and have basic normalizations applied.
Expand All @@ -54,10 +56,27 @@ def test_spansv2_basic(
"organizations:standalone-span-ingestion",
"projects:span-v2-experimental-processing",
],
"retentions": {"span": {"standard": 42, "downsampled": 1337}},
}
)

if retention_config_field == "retentions":
project_config["config"].update(
{
"retentions": {"span": {"standard": 42, "downsampled": 1337}},
}
)
elif retention_config_field == "item_configs":
project_config["config"].update(
{
"itemConfigs": {
"span": {"retention": {"standard": 42, "downsampled": 1337}}
},
# Put a bogus value in `"retentions"`. The one in `"item_configs"` should
# take precedence.
"retentions": {"span": {"standard": 1, "downsampled": 1}},
}
)

relay = relay(relay_with_processing(options=TEST_CONFIG), options=TEST_CONFIG)

ts = datetime.now(timezone.utc)
Expand Down Expand Up @@ -1126,11 +1145,9 @@ def test_spanv2_meta_pii_scrubbing_complex_attribute(mini_sentry, relay):
}


@pytest.mark.parametrize("retention_config_field", ("retentions", "item_configs"))
def test_spansv2_attribute_normalization(
mini_sentry,
relay,
relay_with_processing,
spans_consumer,
mini_sentry, relay, relay_with_processing, spans_consumer, retention_config_field
):
"""
A test making sure spans undergo attribute normalization after ingestion.
Expand All @@ -1145,10 +1162,27 @@ def test_spansv2_attribute_normalization(
"organizations:standalone-span-ingestion",
"projects:span-v2-experimental-processing",
],
"retentions": {"span": {"standard": 42, "downsampled": 1337}},
}
)

if retention_config_field == "retentions":
project_config["config"].update(
{
"retentions": {"span": {"standard": 42, "downsampled": 1337}},
}
)
elif retention_config_field == "item_configs":
project_config["config"].update(
{
"itemConfigs": {
"span": {"retention": {"standard": 42, "downsampled": 1337}}
},
# Put a bogus value in `"retentions"`. The one in `"item_configs"` should
# take precedence.
"retentions": {"span": {"standard": 1, "downsampled": 1}},
}
)

relay = relay(relay_with_processing(options=TEST_CONFIG), options=TEST_CONFIG)

ts = datetime.now(timezone.utc)
Expand Down
Loading
Loading