diff --git a/Dockerfile b/Dockerfile index a6dc631cd4..21e49d39c5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -88,6 +88,7 @@ RUN set -ex; \ FROM build_rust_snuba_deps AS build_rust_snuba COPY ./rust_snuba/ ./rust_snuba/ +COPY ./sentry-options/schemas/ ./sentry-options/schemas/ COPY --from=build_rust_snuba_deps /usr/src/snuba/rust_snuba/target/ ./rust_snuba/target/ COPY --from=build_rust_snuba_deps /root/.cargo/ /root/.cargo/ RUN set -ex; \ @@ -134,8 +135,7 @@ ENV LD_PRELOAD=/usr/src/snuba/libjemalloc.so.2 \ PYTHONUNBUFFERED=1 \ PYTHONDONTWRITEBYTECODE=1 -# set up sentry options schemas and default path -COPY sentry-options/schemas /etc/sentry-options/schemas +# set default path for sentry options values ENV SENTRY_OPTIONS_DIR=/etc/sentry-options USER snuba @@ -167,7 +167,6 @@ FROM ghcr.io/getsentry/dhi/python:3.13-debian13 AS application-distroless COPY --from=distroless_prep /.venv /.venv COPY --from=distroless_prep /usr/src/snuba /usr/src/snuba -COPY --from=distroless_prep /etc/sentry-options /etc/sentry-options COPY --from=distroless_prep /usr/lib/*/libjemalloc.so.2 /usr/lib/libjemalloc.so.2 COPY --from=distroless_prep /etc/passwd /etc/passwd COPY --from=distroless_prep /etc/group /etc/group @@ -192,7 +191,6 @@ FROM ghcr.io/getsentry/dhi/python:3.13-debian13-dev AS application-distroless-de COPY --from=distroless_prep /.venv /.venv COPY --from=distroless_prep /usr/src/snuba /usr/src/snuba -COPY --from=distroless_prep /etc/sentry-options /etc/sentry-options COPY --from=distroless_prep /usr/lib/*/libjemalloc.so.2 /usr/lib/libjemalloc.so.2 COPY --from=distroless_prep /etc/passwd /etc/passwd COPY --from=distroless_prep /etc/group /etc/group diff --git a/rust_snuba/.cargo/config.toml b/rust_snuba/.cargo/config.toml new file mode 100644 index 0000000000..5eec8a12c5 --- /dev/null +++ b/rust_snuba/.cargo/config.toml @@ -0,0 +1,2 @@ +[env] +SENTRY_OPTIONS_DIR = { value = "../sentry-options", relative = true } diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock index 9b0af87497..3a59b394a1 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -1961,7 +1961,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.6", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -4249,9 +4249,9 @@ dependencies = [ [[package]] name = "sentry-options" -version = "1.0.3" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9599ed03f9ca504ee87808e955e05bf42901bb2e1242476c6c5d6dd20a2134b" +checksum = "a8cc0c862d4de3c89fe7c3f59077bed42325a6e0538798f34670c01a5e9d6449" dependencies = [ "num", "sentry-options-validation", @@ -4262,9 +4262,9 @@ dependencies = [ [[package]] name = "sentry-options-validation" -version = "1.0.3" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "825f906edf04e9dee86e14bd5192d8b53faf8cd49fcef1e804c082833a16ebca" +checksum = "29c18b044fe4ca547a943dea746b8f07fd58b1042a3145b7e62fb94d1ab2acb6" dependencies = [ "anyhow", "chrono", diff --git a/rust_snuba/Cargo.toml b/rust_snuba/Cargo.toml index e70153fb01..fb945ccbf9 100644 --- a/rust_snuba/Cargo.toml +++ b/rust_snuba/Cargo.toml @@ -54,7 +54,7 @@ sentry = { version = "0.41.0", default-features = false, features = [ "logs" ] } sentry-kafka-schemas = "2.1.24" -sentry-options = "1.0.3" +sentry-options = "1.0.5" sentry_protos = "0.7.0" sentry_arroyo = { version = "2.37.0", features = ["ssl"] } sentry_usage_accountant = { version = "0.1.2", features = ["kafka"] } diff --git a/rust_snuba/src/accepted_outcomes_consumer.rs b/rust_snuba/src/accepted_outcomes_consumer.rs index b0dae3b800..6281598955 100644 --- a/rust_snuba/src/accepted_outcomes_consumer.rs +++ b/rust_snuba/src/accepted_outcomes_consumer.rs @@ -12,6 +12,8 @@ use sentry_arroyo::processing::strategies::{ProcessingStrategy, ProcessingStrate use sentry_arroyo::processing::StreamProcessor; use sentry_arroyo::types::{Partition, Topic}; +use sentry_options::init_with_schemas; + use pyo3::prelude::*; use crate::config; @@ -33,7 +35,6 @@ pub struct AcceptedOutcomesStrategyFactory { producer: Arc, concurrency: ConcurrencyConfig, skip_produce: bool, - use_item_timestamp: bool, } impl ProcessingStrategyFactory for AcceptedOutcomesStrategyFactory { @@ -55,7 +56,6 @@ impl ProcessingStrategyFactory for AcceptedOutcomesStrategyFactory self.max_batch_size, self.max_batch_time_ms, self.bucket_interval, - self.use_item_timestamp, )) } } @@ -120,6 +120,8 @@ pub fn accepted_outcomes_consumer_impl( commit_frequency_sec: u64, ) -> usize { setup_logging(); + init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]) + .expect("failed to initialize sentry-options"); let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap(); @@ -204,11 +206,6 @@ pub fn accepted_outcomes_consumer_impl( .flatten() .unwrap_or("1".to_string()) == "1"; - let use_item_timestamp = get_str_config("accepted_outcomes_use_item_timestamp") - .ok() - .flatten() - .unwrap_or("0".to_string()) - == "1"; let factory = AcceptedOutcomesStrategyFactory { bucket_interval, @@ -219,7 +216,6 @@ pub fn accepted_outcomes_consumer_impl( producer, concurrency: ConcurrencyConfig::new(concurrency), skip_produce, - use_item_timestamp, }; let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy); diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index 36bb858ffb..2bda56f926 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -16,6 +16,8 @@ use sentry_arroyo::types::Topic; use pyo3::prelude::*; use pyo3::types::PyBytes; +use sentry_options::init_with_schemas; + use crate::config; use crate::factory_v2::ConsumerStrategyFactoryV2; use crate::logging::{setup_logging, setup_sentry}; @@ -94,6 +96,8 @@ pub fn consumer_impl( use_row_binary: bool, ) -> usize { setup_logging(); + init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]) + .expect("failed to initialize sentry-options"); let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap(); let max_batch_size = consumer_config.max_batch_size; diff --git a/rust_snuba/src/lib.rs b/rust_snuba/src/lib.rs index a89553a7fb..31b5c1264e 100644 --- a/rust_snuba/src/lib.rs +++ b/rust_snuba/src/lib.rs @@ -1,3 +1,6 @@ +pub(crate) const SNUBA_SCHEMA: &str = + include_str!("../../sentry-options/schemas/snuba/schema.json"); + mod accepted_outcomes_consumer; mod config; mod consumer; diff --git a/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs b/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs index eb21684fd7..8d1661b7c3 100644 --- a/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs +++ b/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs @@ -11,6 +11,8 @@ use sentry_arroyo::types::{InnerMessage, Message, Partition}; use sentry_arroyo::utils::timing::Deadline; use sentry_protos::snuba::v1::TraceItem; +use sentry_options::options; + use crate::types::{AggregatedOutcomesBatch, BucketKey}; #[derive(Debug, Default)] @@ -57,8 +59,7 @@ pub struct OutcomesAggregator { message_carried_over: Option>, /// Commit request carried over from a poll where we had a message to retry. commit_request_carried_over: Option, - /// Temporary option to change the timestamp source from - /// `received` to `timestamp` on the item event. + /// Cached value of the `consumer.use_item_timestamp` option, refreshed on each poll. use_item_timestamp: bool, } @@ -68,7 +69,6 @@ impl OutcomesAggregator { max_batch_size: usize, max_batch_time_ms: Duration, bucket_interval: u64, - use_item_timestamp: bool, ) -> Self { Self { next_step, @@ -80,7 +80,7 @@ impl OutcomesAggregator { latest_offsets: HashMap::new(), message_carried_over: None, commit_request_carried_over: None, - use_item_timestamp, + use_item_timestamp: false, } } @@ -129,6 +129,12 @@ impl> ProcessingStrategy { fn poll(&mut self) -> Result, StrategyError> { + self.use_item_timestamp = options("snuba") + .ok() + .and_then(|o| o.get("consumer.use_item_timestamp").ok()) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + let commit_request = self.next_step.poll()?; self.commit_request_carried_over = merge_commit_request(self.commit_request_carried_over.take(), commit_request); @@ -208,6 +214,7 @@ impl> ProcessingStrategy>, @@ -323,7 +333,6 @@ mod tests { 500, Duration::from_millis(5_000), 60, - false, ); let topic = Topic::new("accepted-outcomes"); @@ -371,7 +380,6 @@ mod tests { 500, Duration::from_millis(2_000), 60, - false, ); let topic = Topic::new("snuba-items"); @@ -455,12 +463,12 @@ mod tests { #[test] fn poll_flushes_when_max_batch_size_reached() { + init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap(); let mut aggregator = OutcomesAggregator::new( Noop { last_message: None }, 1, Duration::from_millis(30_000), 60, - false, ); let partition = Partition::new(Topic::new("accepted-outcomes"), 0); @@ -481,6 +489,7 @@ mod tests { #[test] fn submit_returns_backpressure_when_message_carried_over() { + init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap(); struct RejectOnce { rejected: bool, } @@ -513,7 +522,6 @@ mod tests { 1, // flush after 1 bucket Duration::from_millis(30_000), 60, - false, ); let partition = Partition::new(Topic::new("test"), 0); @@ -549,6 +557,7 @@ mod tests { #[test] fn join_honors_timeout_when_message_stays_carried_over() { + init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap(); struct AlwaysReject; impl ProcessingStrategy for AlwaysReject { fn poll(&mut self) -> Result, StrategyError> { @@ -570,7 +579,7 @@ mod tests { } let mut aggregator = - OutcomesAggregator::new(AlwaysReject, 1, Duration::from_millis(30_000), 60, false); + OutcomesAggregator::new(AlwaysReject, 1, Duration::from_millis(30_000), 60); let partition = Partition::new(Topic::new("test"), 0); let payload = make_payload(6_000, 1, 2, 3, &[(4, 1)]); @@ -590,12 +599,14 @@ mod tests { #[test] fn submit_uses_item_timestamp_when_enabled() { + init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap(); + let _guard = + override_options(&[("snuba", "consumer.use_item_timestamp", json!(true))]).unwrap(); let mut aggregator = OutcomesAggregator::new( Noop { last_message: None }, 500, Duration::from_millis(2_000), 60, - true, ); let topic = Topic::new("snuba-items"); @@ -625,6 +636,9 @@ mod tests { trace_item.encode(&mut buf).unwrap(); let payload = KafkaPayload::new(None, None, Some(buf)); + // we need to poll first in order to get the new value (true) + aggregator.poll().unwrap(); + aggregator .submit(Message::new_broker_message( payload, @@ -646,4 +660,82 @@ mod tests { Some(1) ); } + + #[test] + fn poll_updates_use_item_timestamp_dynamically() { + init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap(); + let mut aggregator = OutcomesAggregator::new( + Noop { last_message: None }, + 500, + Duration::from_millis(30_000), + 60, + ); + + let partition = Partition::new(Topic::new("snuba-items"), 0); + + let mut buf = Vec::new(); + TraceItem { + organization_id: 1, + project_id: 2, + received: Some(Timestamp { + seconds: 1_700_000_000, + nanos: 0, + }), + timestamp: Some(Timestamp { + seconds: 1_700_000_060, + nanos: 0, + }), + outcomes: Some(Outcomes { + key_id: 3, + category_count: vec![CategoryCount { + data_category: 4, + quantity: 1, + }], + }), + ..Default::default() + } + .encode(&mut buf) + .unwrap(); + let payload = KafkaPayload::new(None, None, Some(buf)); + + let bucket_quantity = |aggregator: &OutcomesAggregator, offset: u64| { + let key = BucketKey { + time_offset: offset, + org_id: 1, + project_id: 2, + key_id: 3, + category: 4, + }; + aggregator.batch.buckets.get(&key).map(|s| s.quantity) + }; + + let mut offset = 0; + let mut do_submit = |aggregator: &mut OutcomesAggregator| { + aggregator.poll().unwrap(); + aggregator + .submit(Message::new_broker_message( + payload.clone(), + partition, + offset, + Utc::now(), + )) + .unwrap(); + offset += 1; + }; + + // Enable item timestamp + let guard = + override_options(&[("snuba", "consumer.use_item_timestamp", json!(true))]).unwrap(); + do_submit(&mut aggregator); + assert_eq!(bucket_quantity(&aggregator, 28_333_334), Some(1)); + assert_eq!(bucket_quantity(&aggregator, 28_333_333), None); + + // Disable item timestamp + drop(guard); + let _guard = + override_options(&[("snuba", "consumer.use_item_timestamp", json!(false))]).unwrap(); + do_submit(&mut aggregator); + assert_eq!(bucket_quantity(&aggregator, 28_333_333), Some(1)); + assert_eq!(bucket_quantity(&aggregator, 28_333_334), Some(1)); // still present from first submit + } }