Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
667 changes: 538 additions & 129 deletions rust_snuba/Cargo.lock

Large diffs are not rendered by default.

17 changes: 14 additions & 3 deletions rust_snuba/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ crate-type = ["cdylib", "rlib"]
[dependencies]
adler = "1.0.2"
anyhow = { version = "1.0.69", features = ["backtrace"] }
cadence = "0.29.1"
cadence = "1.0.0"
chrono = { version = "0.4.26", features = ["serde"] }
ctrlc = { version = "3.2.5", features = ["termination"] }
data-encoding = "2.5.0"
Expand All @@ -39,7 +39,18 @@ pyo3 = { version = "0.18.1", features = ["chrono"] }
regex = "1.11.1"
reqwest = { version = "0.11.11", features = ["stream"] }
schemars = { version = "0.8.16", features = ["uuid1"] }
sentry = { version = "0.32.0", features = ["anyhow", "tracing"] }
sentry = { version = "0.41.0", default-features = false, features = [
# default features, except `release-health` is disabled
"backtrace",
"contexts",
"debug-images",
"panic",
"transport",
# additional features
"anyhow",
"tracing",
"logs"
] }
sentry-kafka-schemas = "1.3.7"
sentry_arroyo = { version = "2.22.0", features = ["ssl"] }
sentry_protos = "0.3.0"
Expand All @@ -49,7 +60,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
serde_path_to_error = "0.1.15"
serde_with = "3.8.1"
statsdproxy = { version = "0.1.2", features = ["cadence-adapter", "sentry"] }
statsdproxy = { version = "0.4.1", features = ["cadence"] }
thiserror = "1.0"
tokio = { version = "1.38.2", features = ["full"] }
tokio-stream = "0.1.15"
Expand Down
2 changes: 1 addition & 1 deletion rust_snuba/benches/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ fn run_processor_bench(

fn main() {
// this sends to nowhere, but because it's UDP we won't error.
metrics::init(StatsDBackend::new("127.0.0.1", 8081, "snuba.consumer", 0.0)).unwrap();
metrics::init(StatsDBackend::new("127.0.0.1", 8081, "snuba.consumer")).unwrap();

let mut c = Criterion::default().configure_from_args();

Expand Down
2 changes: 1 addition & 1 deletion rust_snuba/rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "1.83.0"
channel = "1.87.0"
2 changes: 0 additions & 2 deletions rust_snuba/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ pub struct EnvConfig {
pub lower_retention_days: u16,
pub valid_retention_days: HashSet<u16>,
pub record_cogs: bool,
pub ddm_metrics_sample_rate: f64,
pub project_stacktrace_blacklist: Vec<u64>,
}

Expand All @@ -122,7 +121,6 @@ impl Default for EnvConfig {
lower_retention_days: 30,
valid_retention_days: [30, 60, 90].iter().cloned().collect(),
record_cogs: false,
ddm_metrics_sample_rate: 0.0,
project_stacktrace_blacklist: Vec::new(),
}
}
Expand Down
16 changes: 2 additions & 14 deletions rust_snuba/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,7 @@ pub fn consumer_v2_impl(
set_global_tag("storage".to_owned(), storage_name);
set_global_tag("consumer_group".to_owned(), consumer_group.to_owned());

metrics::init(StatsDBackend::new(
&host,
port,
"snuba.consumer",
env_config.ddm_metrics_sample_rate,
))
.unwrap();
metrics::init(StatsDBackend::new(&host, port, "snuba.consumer")).unwrap();
}

if !use_rust_processor {
Expand Down Expand Up @@ -414,13 +408,7 @@ pub fn consumer_impl(
set_global_tag("storage".to_owned(), storage_name);
set_global_tag("consumer_group".to_owned(), consumer_group.to_owned());

metrics::init(StatsDBackend::new(
&host,
port,
"snuba.consumer",
env_config.ddm_metrics_sample_rate,
))
.unwrap();
metrics::init(StatsDBackend::new(&host, port, "snuba.consumer")).unwrap();
}

if !use_rust_processor {
Expand Down
12 changes: 7 additions & 5 deletions rust_snuba/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ pub fn setup_logging() {
.or_else(|_| EnvFilter::try_new("info"))
.unwrap();

// Capture errors & warnings as exceptions
// Capture errors & warnings as exceptions, and also send everything at or above INFO as logs
// instead of breadcrumbs.
let sentry_layer =
sentry::integrations::tracing::layer().event_filter(|metadata| match metadata.level() {
&Level::ERROR | &Level::WARN => EventFilter::Exception,
&Level::INFO => EventFilter::Breadcrumb,
&Level::DEBUG | &Level::TRACE => EventFilter::Ignore,
sentry::integrations::tracing::layer().event_filter(|metadata| match *metadata.level() {
Level::ERROR | Level::WARN => EventFilter::Event | EventFilter::Log,
Level::INFO => EventFilter::Log,
Level::DEBUG | Level::TRACE => EventFilter::Ignore,
});

tracing_subscriber::registry()
Expand All @@ -31,6 +32,7 @@ pub fn setup_sentry(sentry_dsn: &str) -> ClientInitGuard {
// the value for release is also computed in python snuba, please keep the
// logic in sync
release: std::env::var("SNUBA_RELEASE").ok().map(From::from),
enable_logs: true,
..Default::default()
},
))
Expand Down
36 changes: 13 additions & 23 deletions rust_snuba/src/metrics/statsd.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::time::Duration;

use sentry_arroyo::metrics::{Metric, MetricSink, Recorder, StatsdRecorder};
use statsdproxy::cadence::StatsdProxyMetricSink;
use statsdproxy::config::{AggregateMetricsConfig, SampleConfig};
use statsdproxy::config::AggregateMetricsConfig;
use statsdproxy::middleware::aggregate::AggregateMetrics;
use statsdproxy::middleware::mirror::Mirror;
use statsdproxy::middleware::sample::Sample;
use statsdproxy::middleware::sentry::Sentry;
use statsdproxy::middleware::upstream::Upstream;

use crate::metrics::global_tags::AddGlobalTags;
Expand All @@ -29,33 +28,24 @@ impl MetricSink for Wrapper {
}

impl StatsDBackend {
pub fn new(host: &str, port: u16, prefix: &str, ddm_metrics_sample_rate: f64) -> Self {
pub fn new(host: &str, port: u16, prefix: &str) -> Self {
let upstream_addr = format!("{}:{}", host, port);
let aggregator_sink = StatsdProxyMetricSink::new(move || {
let next_step = Upstream::new(upstream_addr.clone()).unwrap();

let next_step_sentry = Sample::new(
SampleConfig {
sample_rate: ddm_metrics_sample_rate,
},
Sentry::new(),
);

let next_step = Mirror::new(next_step, next_step_sentry);

// adding global tags *after* aggregation is more performant than trying to do the same
// in cadence, as it means more bytes and more memory to deal with in
// AggregateMetricsConfig
let next_step = AddGlobalTags::new(next_step);
let upstream = Upstream::new(upstream_addr.clone()).unwrap();

let config = AggregateMetricsConfig {
aggregate_counters: true,
flush_offset: 0,
flush_interval: 1,
flush_interval: Duration::from_secs(1),
aggregate_gauges: true,
max_map_size: None,
};
AggregateMetrics::new(config, next_step)
let aggregate = AggregateMetrics::new(config, upstream);

// adding global tags *after* aggregation is more performant than trying to do the same
// in cadence, as it means more bytes and more memory to deal with in
// AggregateMetricsConfig
AddGlobalTags::new(aggregate)
});

let recorder = StatsdRecorder::new(prefix, Wrapper(Box::new(aggregator_sink)));
Expand All @@ -72,7 +62,7 @@ mod tests {

#[test]
fn statsd_metric_backend() {
let backend = StatsDBackend::new("0.0.0.0", 8125, "test", 0.0);
let backend = StatsDBackend::new("0.0.0.0", 8125, "test");

backend.record_metric(metric!(Counter: "a", 1, "tag1" => "value1"));
backend.record_metric(metric!(Gauge: "b", 20, "tag2" => "value2"));
Expand Down
Loading