Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ RUN set -ex; \

FROM build_rust_snuba_deps AS build_rust_snuba
COPY ./rust_snuba/ ./rust_snuba/
COPY ./sentry-options/schemas/ ./sentry-options/schemas/
COPY --from=build_rust_snuba_deps /usr/src/snuba/rust_snuba/target/ ./rust_snuba/target/
COPY --from=build_rust_snuba_deps /root/.cargo/ /root/.cargo/
RUN set -ex; \
Expand Down Expand Up @@ -134,8 +135,7 @@ ENV LD_PRELOAD=/usr/src/snuba/libjemalloc.so.2 \
PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1

# set up sentry options schemas and default path
COPY sentry-options/schemas /etc/sentry-options/schemas
# set default path for sentry options values
ENV SENTRY_OPTIONS_DIR=/etc/sentry-options

USER snuba
Expand Down Expand Up @@ -167,7 +167,6 @@ FROM ghcr.io/getsentry/dhi/python:3.13-debian13 AS application-distroless

COPY --from=distroless_prep /.venv /.venv
COPY --from=distroless_prep /usr/src/snuba /usr/src/snuba
COPY --from=distroless_prep /etc/sentry-options /etc/sentry-options
COPY --from=distroless_prep /usr/lib/*/libjemalloc.so.2 /usr/lib/libjemalloc.so.2
COPY --from=distroless_prep /etc/passwd /etc/passwd
COPY --from=distroless_prep /etc/group /etc/group
Expand All @@ -192,7 +191,6 @@ FROM ghcr.io/getsentry/dhi/python:3.13-debian13-dev AS application-distroless-de

COPY --from=distroless_prep /.venv /.venv
COPY --from=distroless_prep /usr/src/snuba /usr/src/snuba
COPY --from=distroless_prep /etc/sentry-options /etc/sentry-options
COPY --from=distroless_prep /usr/lib/*/libjemalloc.so.2 /usr/lib/libjemalloc.so.2
COPY --from=distroless_prep /etc/passwd /etc/passwd
COPY --from=distroless_prep /etc/group /etc/group
Expand Down
2 changes: 2 additions & 0 deletions rust_snuba/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[env]
SENTRY_OPTIONS_DIR = { value = "../sentry-options", relative = true }
10 changes: 5 additions & 5 deletions rust_snuba/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust_snuba/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ sentry = { version = "0.41.0", default-features = false, features = [
"logs"
] }
sentry-kafka-schemas = "2.1.24"
sentry-options = "1.0.3"
sentry-options = "1.0.5"
sentry_protos = "0.7.0"
sentry_arroyo = { version = "2.37.0", features = ["ssl"] }
sentry_usage_accountant = { version = "0.1.2", features = ["kafka"] }
Expand Down
12 changes: 4 additions & 8 deletions rust_snuba/src/accepted_outcomes_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use sentry_arroyo::processing::strategies::{ProcessingStrategy, ProcessingStrate
use sentry_arroyo::processing::StreamProcessor;
use sentry_arroyo::types::{Partition, Topic};

use sentry_options::init_with_schemas;

use pyo3::prelude::*;

use crate::config;
Expand All @@ -33,7 +35,6 @@ pub struct AcceptedOutcomesStrategyFactory {
producer: Arc<KafkaProducer>,
concurrency: ConcurrencyConfig,
skip_produce: bool,
use_item_timestamp: bool,
}

impl ProcessingStrategyFactory<KafkaPayload> for AcceptedOutcomesStrategyFactory {
Expand All @@ -55,7 +56,6 @@ impl ProcessingStrategyFactory<KafkaPayload> for AcceptedOutcomesStrategyFactory
self.max_batch_size,
self.max_batch_time_ms,
self.bucket_interval,
self.use_item_timestamp,
))
}
}
Expand Down Expand Up @@ -120,6 +120,8 @@ pub fn accepted_outcomes_consumer_impl(
commit_frequency_sec: u64,
) -> usize {
setup_logging();
init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)])
.expect("failed to initialize sentry-options");

let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap();

Expand Down Expand Up @@ -204,11 +206,6 @@ pub fn accepted_outcomes_consumer_impl(
.flatten()
.unwrap_or("1".to_string())
== "1";
let use_item_timestamp = get_str_config("accepted_outcomes_use_item_timestamp")
.ok()
.flatten()
.unwrap_or("0".to_string())
== "1";

let factory = AcceptedOutcomesStrategyFactory {
bucket_interval,
Expand All @@ -219,7 +216,6 @@ pub fn accepted_outcomes_consumer_impl(
producer,
concurrency: ConcurrencyConfig::new(concurrency),
skip_produce,
use_item_timestamp,
};
let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy);

Expand Down
4 changes: 4 additions & 0 deletions rust_snuba/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use sentry_arroyo::types::Topic;
use pyo3::prelude::*;
use pyo3::types::PyBytes;

use sentry_options::init_with_schemas;

use crate::config;
use crate::factory_v2::ConsumerStrategyFactoryV2;
use crate::logging::{setup_logging, setup_sentry};
Expand Down Expand Up @@ -94,6 +96,8 @@ pub fn consumer_impl(
use_row_binary: bool,
) -> usize {
setup_logging();
init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)])
.expect("failed to initialize sentry-options");

let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap();
let max_batch_size = consumer_config.max_batch_size;
Expand Down
3 changes: 3 additions & 0 deletions rust_snuba/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub(crate) const SNUBA_SCHEMA: &str =
include_str!("../../sentry-options/schemas/snuba/schema.json");
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the schemas are now baked in during compile time.

this means during runtime, init_with_schemas should never fail when crate::SNUBA_SCHEMA is passed in (assuming the schema is not malformed, but this is caught in tests and our CI validation)

our only vulnerabilities now are options() and get()


mod accepted_outcomes_consumer;
mod config;
mod consumer;
Expand Down
112 changes: 102 additions & 10 deletions rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use sentry_arroyo::types::{InnerMessage, Message, Partition};
use sentry_arroyo::utils::timing::Deadline;
use sentry_protos::snuba::v1::TraceItem;

use sentry_options::options;

use crate::types::{AggregatedOutcomesBatch, BucketKey};

#[derive(Debug, Default)]
Expand Down Expand Up @@ -57,8 +59,7 @@ pub struct OutcomesAggregator<TNext> {
message_carried_over: Option<Message<AggregatedOutcomesBatch>>,
/// Commit request carried over from a poll where we had a message to retry.
commit_request_carried_over: Option<CommitRequest>,
/// Temporary option to change the timestamp source from
/// `received` to `timestamp` on the item event.
/// Cached value of the `consumer.use_item_timestamp` option, refreshed on each poll.
use_item_timestamp: bool,
}

Expand All @@ -68,7 +69,6 @@ impl<TNext> OutcomesAggregator<TNext> {
max_batch_size: usize,
max_batch_time_ms: Duration,
bucket_interval: u64,
use_item_timestamp: bool,
) -> Self {
Self {
next_step,
Expand All @@ -80,7 +80,7 @@ impl<TNext> OutcomesAggregator<TNext> {
latest_offsets: HashMap::new(),
message_carried_over: None,
commit_request_carried_over: None,
use_item_timestamp,
use_item_timestamp: false,
}
}

Expand Down Expand Up @@ -129,6 +129,12 @@ impl<TNext: ProcessingStrategy<AggregatedOutcomesBatch>> ProcessingStrategy<Kafk
for OutcomesAggregator<TNext>
{
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kenzoengineer so poll can be called like thousands of time a second i don't think we need to be checked every poll, I think we could add like a last_options_check time or something and check and refresh the value like every 5 seconds or 10 seconds?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for posterity: this has been benchmarked and the normal options().get() call takes ~150ns, while a cached call with a 5 second TTL takes ~40ns

while theres a dramatic decrease, the numbers are still in the nanoseconds. We can keep it the naive way for now and in a future PR we can make this optimization if necessary

self.use_item_timestamp = options("snuba")
.ok()
.and_then(|o| o.get("consumer.use_item_timestamp").ok())
.and_then(|v| v.as_bool())
.unwrap_or(false);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option read on every poll causes unnecessary I/O

Medium Severity

options("snuba").get("consumer.use_item_timestamp") is called on every poll() invocation. Since poll() can be called thousands of times per second, this causes excessive reads from the sentry-options backend (filesystem/ConfigMap). The sentry-options library's own quickstart example demonstrates reading values in a loop with a 3-second sleep, suggesting per-call reads are not meant for hot paths. A time-based cache (e.g., refreshing every 5–10 seconds) would maintain dynamic reloadability while avoiding the overhead, as the PR reviewer also suggested.

Fix in Cursor Fix in Web

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is incorrect. we only check memory.


let commit_request = self.next_step.poll()?;
self.commit_request_carried_over =
merge_commit_request(self.commit_request_carried_over.take(), commit_request);
Expand Down Expand Up @@ -208,6 +214,7 @@ impl<TNext: ProcessingStrategy<AggregatedOutcomesBatch>> ProcessingStrategy<Kafk
.map(|t| t.seconds as u64)
.unwrap_or(0)
};

let org_id = trace_item.organization_id;
let project_id = trace_item.project_id;

Expand Down Expand Up @@ -263,7 +270,10 @@ mod tests {
use prost::Message as ProstMessage;
use prost_types::Timestamp;
use sentry_arroyo::types::{Partition, Topic};
use sentry_options::init_with_schemas;
use sentry_options::testing::override_options;
use sentry_protos::snuba::v1::{CategoryCount, Outcomes};
use serde_json::json;

struct Noop {
last_message: Option<Message<AggregatedOutcomesBatch>>,
Expand Down Expand Up @@ -323,7 +333,6 @@ mod tests {
500,
Duration::from_millis(5_000),
60,
false,
);

let topic = Topic::new("accepted-outcomes");
Expand Down Expand Up @@ -371,7 +380,6 @@ mod tests {
500,
Duration::from_millis(2_000),
60,
false,
);

let topic = Topic::new("snuba-items");
Expand Down Expand Up @@ -455,12 +463,12 @@ mod tests {

#[test]
fn poll_flushes_when_max_batch_size_reached() {
init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap();
let mut aggregator = OutcomesAggregator::new(
Noop { last_message: None },
1,
Duration::from_millis(30_000),
60,
false,
);

let partition = Partition::new(Topic::new("accepted-outcomes"), 0);
Expand All @@ -481,6 +489,7 @@ mod tests {

#[test]
fn submit_returns_backpressure_when_message_carried_over() {
init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap();
struct RejectOnce {
rejected: bool,
}
Expand Down Expand Up @@ -513,7 +522,6 @@ mod tests {
1, // flush after 1 bucket
Duration::from_millis(30_000),
60,
false,
);

let partition = Partition::new(Topic::new("test"), 0);
Expand Down Expand Up @@ -549,6 +557,7 @@ mod tests {

#[test]
fn join_honors_timeout_when_message_stays_carried_over() {
init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap();
struct AlwaysReject;
impl ProcessingStrategy<AggregatedOutcomesBatch> for AlwaysReject {
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
Expand All @@ -570,7 +579,7 @@ mod tests {
}

let mut aggregator =
OutcomesAggregator::new(AlwaysReject, 1, Duration::from_millis(30_000), 60, false);
OutcomesAggregator::new(AlwaysReject, 1, Duration::from_millis(30_000), 60);
let partition = Partition::new(Topic::new("test"), 0);
let payload = make_payload(6_000, 1, 2, 3, &[(4, 1)]);

Expand All @@ -590,12 +599,14 @@ mod tests {

#[test]
fn submit_uses_item_timestamp_when_enabled() {
init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap();
let _guard =
override_options(&[("snuba", "consumer.use_item_timestamp", json!(true))]).unwrap();
let mut aggregator = OutcomesAggregator::new(
Noop { last_message: None },
500,
Duration::from_millis(2_000),
60,
true,
);

let topic = Topic::new("snuba-items");
Expand Down Expand Up @@ -625,6 +636,9 @@ mod tests {
trace_item.encode(&mut buf).unwrap();
let payload = KafkaPayload::new(None, None, Some(buf));

// we need to poll first in order to get the new value (true)
aggregator.poll().unwrap();

aggregator
.submit(Message::new_broker_message(
payload,
Expand All @@ -646,4 +660,82 @@ mod tests {
Some(1)
);
}

#[test]
fn poll_updates_use_item_timestamp_dynamically() {
init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap();
let mut aggregator = OutcomesAggregator::new(
Noop { last_message: None },
500,
Duration::from_millis(30_000),
60,
);

let partition = Partition::new(Topic::new("snuba-items"), 0);

let mut buf = Vec::new();
TraceItem {
organization_id: 1,
project_id: 2,
received: Some(Timestamp {
seconds: 1_700_000_000,
nanos: 0,
}),
timestamp: Some(Timestamp {
seconds: 1_700_000_060,
nanos: 0,
}),
outcomes: Some(Outcomes {
key_id: 3,
category_count: vec![CategoryCount {
data_category: 4,
quantity: 1,
}],
}),
..Default::default()
}
.encode(&mut buf)
.unwrap();
let payload = KafkaPayload::new(None, None, Some(buf));

let bucket_quantity = |aggregator: &OutcomesAggregator<Noop>, offset: u64| {
let key = BucketKey {
time_offset: offset,
org_id: 1,
project_id: 2,
key_id: 3,
category: 4,
};
aggregator.batch.buckets.get(&key).map(|s| s.quantity)
};

let mut offset = 0;
let mut do_submit = |aggregator: &mut OutcomesAggregator<Noop>| {
aggregator.poll().unwrap();
aggregator
.submit(Message::new_broker_message(
payload.clone(),
partition,
offset,
Utc::now(),
))
.unwrap();
offset += 1;
};

// Enable item timestamp
let guard =
override_options(&[("snuba", "consumer.use_item_timestamp", json!(true))]).unwrap();
do_submit(&mut aggregator);
assert_eq!(bucket_quantity(&aggregator, 28_333_334), Some(1));
assert_eq!(bucket_quantity(&aggregator, 28_333_333), None);

// Disable item timestamp
drop(guard);
let _guard =
override_options(&[("snuba", "consumer.use_item_timestamp", json!(false))]).unwrap();
do_submit(&mut aggregator);
assert_eq!(bucket_quantity(&aggregator, 28_333_333), Some(1));
assert_eq!(bucket_quantity(&aggregator, 28_333_334), Some(1)); // still present from first submit
}
}
Loading