Skip to content

Commit 527d3bc

Browse files
authored
enhancement(correctness): add support for checking APM stats in "trace" analysis mode (#1075)
1 parent 40e71d4 commit 527d3bc

File tree

25 files changed

+734
-57
lines changed

25 files changed

+734
-57
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ tracing-appender = { version = "0.2", default-features = false }
204204
base64 = { version = "0.22.1", default-features = false }
205205
treediff = { version = "5", default-features = false }
206206
argh = { version = "0.1", default-features = false }
207+
rmp-serde = { version = "1.3", default-features = false }
207208

208209
[patch.crates-io]
209210
# Forked version of `hyper-http-proxy` that removes an unused dependency on `rustls-native-certs`, which transitively depends

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ ifeq ($(CI),true)
3232
endif
3333
export CARGO_TOOL_VERSION_cargo-binstall ?= 1.16.2
3434
export CARGO_TOOL_VERSION_dd-rust-license-tool ?= 1.0.3
35-
export CARGO_TOOL_VERSION_cargo-deny ?= 0.18.3
35+
export CARGO_TOOL_VERSION_cargo-deny ?= 0.18.9
3636
export CARGO_TOOL_VERSION_cargo-hack ?= 0.6.30
3737
export CARGO_TOOL_VERSION_cargo-nextest ?= 0.9.99
3838
export CARGO_TOOL_VERSION_cargo-autoinherit ?= 0.1.5

bin/correctness/datadog-intake/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ repository = { workspace = true }
99
workspace = true
1010

1111
[dependencies]
12-
axum = { workspace = true, features = ["http1", "json", "tokio"] }
12+
axum = { workspace = true, features = ["http1", "json", "tokio", "tracing"] }
1313
datadog-protos = { workspace = true }
1414
protobuf = { workspace = true }
15+
rmp-serde = { workspace = true }
1516
saluki-error = { workspace = true }
1617
stele = { workspace = true }
1718
tokio = { workspace = true, features = [
@@ -23,6 +24,7 @@ tokio = { workspace = true, features = [
2324
tower-http = { workspace = true, features = [
2425
"compression-zstd",
2526
"decompression-deflate",
27+
"decompression-gzip",
2628
"decompression-zstd",
2729
] }
2830
tracing = { workspace = true }

bin/correctness/datadog-intake/src/app/metrics/handlers.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@ use axum::{body::Bytes, extract::State, http::StatusCode, Json};
22
use datadog_protos::metrics::{MetricPayload, SketchPayload};
33
use protobuf::Message as _;
44
use stele::Metric;
5-
use tracing::{debug, error};
5+
use tracing::{error, info};
66

77
use super::MetricsState;
88

99
pub async fn handle_metrics_dump(State(state): State<MetricsState>) -> Json<Vec<Metric>> {
10+
info!("Got request to dump metrics.");
1011
Json(state.dump_metrics())
1112
}
1213

1314
pub async fn handle_series_v2(State(state): State<MetricsState>, body: Bytes) -> StatusCode {
14-
debug!("Received series payload.");
15+
info!("Received series payload.");
1516

1617
let payload = match MetricPayload::parse_from_bytes(&body[..]) {
1718
Ok(payload) => payload,
@@ -23,7 +24,7 @@ pub async fn handle_series_v2(State(state): State<MetricsState>, body: Bytes) ->
2324

2425
match state.merge_series_payload(payload) {
2526
Ok(()) => {
26-
debug!("Processed series payload.");
27+
info!("Processed series payload.");
2728
StatusCode::ACCEPTED
2829
}
2930
Err(e) => {
@@ -34,7 +35,7 @@ pub async fn handle_series_v2(State(state): State<MetricsState>, body: Bytes) ->
3435
}
3536

3637
pub async fn handle_sketch_beta(State(state): State<MetricsState>, body: Bytes) -> StatusCode {
37-
debug!("Received sketch payload.");
38+
info!("Received sketch payload.");
3839

3940
let payload = match SketchPayload::parse_from_bytes(&body[..]) {
4041
Ok(payload) => payload,
@@ -46,7 +47,7 @@ pub async fn handle_sketch_beta(State(state): State<MetricsState>, body: Bytes)
4647

4748
match state.merge_sketch_payload(payload) {
4849
Ok(()) => {
49-
debug!("Processed sketch payload.");
50+
info!("Processed sketch payload.");
5051
StatusCode::ACCEPTED
5152
}
5253
Err(e) => {

bin/correctness/datadog-intake/src/app/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub fn initialize_app_router() -> Router {
1717
.merge(misc::build_misc_router())
1818
.fallback(debug_fallback_handler)
1919
// Ensure we can handle compressed requests.
20-
.route_layer(RequestDecompressionLayer::new().deflate(true).zstd(true))
20+
.route_layer(RequestDecompressionLayer::new().deflate(true).gzip(true).zstd(true))
2121
.route_layer(CompressionLayer::new().zstd(true))
2222
// Decompressed metrics payloads can be large (~62MB for sketches).
2323
.route_layer(DefaultBodyLimit::max(64 * 1024 * 1024))
Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
use axum::{body::Bytes, extract::State, http::StatusCode, Json};
2-
use datadog_protos::traces::AgentPayload;
2+
use datadog_protos::traces::{AgentPayload, StatsPayload};
33
use protobuf::Message as _;
4-
use stele::Span;
5-
use tracing::{debug, error};
4+
use stele::{ClientStatisticsAggregator, Span};
5+
use tracing::{error, info};
66

77
use super::TracesState;
88

9-
pub async fn handle_traces_dump(State(state): State<TracesState>) -> Json<Vec<Span>> {
10-
Json(state.dump())
9+
pub async fn handle_trace_spans_dump(State(state): State<TracesState>) -> Json<Vec<Span>> {
10+
info!("Got request to dump trace spans.");
11+
Json(state.dump_spans())
12+
}
13+
14+
pub async fn handle_trace_stats_dump(State(state): State<TracesState>) -> Json<ClientStatisticsAggregator> {
15+
info!("Got request to dump trace stats.");
16+
Json(state.dump_stats())
1117
}
1218

1319
pub async fn handle_v02_traces(State(state): State<TracesState>, body: Bytes) -> StatusCode {
14-
debug!("Received v0.2 traces payload.");
20+
info!("Received v0.2 traces payload.");
1521

1622
let payload = match AgentPayload::parse_from_bytes(&body[..]) {
1723
Ok(payload) => payload,
@@ -23,7 +29,7 @@ pub async fn handle_v02_traces(State(state): State<TracesState>, body: Bytes) ->
2329

2430
match state.merge_agent_payload(payload) {
2531
Ok(()) => {
26-
debug!("Processed trace payload.");
32+
info!("Processed trace payload.");
2733
StatusCode::ACCEPTED
2834
}
2935
Err(e) => {
@@ -32,3 +38,26 @@ pub async fn handle_v02_traces(State(state): State<TracesState>, body: Bytes) ->
3238
}
3339
}
3440
}
41+
42+
pub async fn handle_v02_stats(State(state): State<TracesState>, body: Bytes) -> StatusCode {
43+
info!("Received v0.2 stats payload.");
44+
45+
let payload = match rmp_serde::from_slice::<StatsPayload>(&body[..]) {
46+
Ok(payload) => payload,
47+
Err(e) => {
48+
error!(error = ?e, "Failed to parse stats payload.");
49+
return StatusCode::BAD_REQUEST;
50+
}
51+
};
52+
53+
match state.merge_stats_payload(payload) {
54+
Ok(()) => {
55+
info!("Processed stats payload.");
56+
StatusCode::ACCEPTED
57+
}
58+
Err(e) => {
59+
error!(error = %e, "Failed to merge stats payload.");
60+
StatusCode::BAD_REQUEST
61+
}
62+
}
63+
}

bin/correctness/datadog-intake/src/app/traces/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ use self::state::TracesState;
1111

1212
pub fn build_traces_router() -> Router {
1313
Router::new()
14-
.route("/traces/dump", get(handle_traces_dump))
14+
.route("/traces/dump_spans", get(handle_trace_spans_dump))
15+
.route("/traces/dump_stats", get(handle_trace_stats_dump))
1516
.route("/api/v0.2/traces", post(handle_v02_traces))
17+
.route("/api/v0.2/stats", post(handle_v02_stats))
1618
.with_state(TracesState::new())
1719
}
Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,52 @@
11
use std::sync::{Arc, Mutex};
22

3-
use datadog_protos::traces::AgentPayload;
3+
use datadog_protos::traces::{AgentPayload, StatsPayload};
44
use saluki_error::GenericError;
5-
use stele::Span;
5+
use stele::{ClientStatisticsAggregator, Span};
6+
7+
#[derive(Default)]
8+
struct Inner {
9+
spans: Vec<Span>,
10+
stats: ClientStatisticsAggregator,
11+
}
612

713
#[derive(Clone)]
814
pub struct TracesState {
9-
spans: Arc<Mutex<Vec<Span>>>,
15+
inner: Arc<Mutex<Inner>>,
1016
}
1117

1218
impl TracesState {
1319
/// Creates a new `TracesState`.
1420
pub fn new() -> Self {
1521
Self {
16-
spans: Arc::new(Mutex::new(Vec::new())),
22+
inner: Arc::new(Mutex::new(Inner::default())),
1723
}
1824
}
1925

20-
/// Dumps the current traces state.
21-
pub fn dump(&self) -> Vec<Span> {
22-
let data = self.spans.lock().unwrap();
23-
data.clone()
26+
/// Dumps the current set of spans.
27+
pub fn dump_spans(&self) -> Vec<Span> {
28+
let inner = self.inner.lock().unwrap();
29+
inner.spans.clone()
30+
}
31+
32+
/// Dumps the current stats.
33+
pub fn dump_stats(&self) -> ClientStatisticsAggregator {
34+
let inner = self.inner.lock().unwrap();
35+
inner.stats.clone()
2436
}
2537

2638
/// Merges the given agent payload into the current traces state.
2739
pub fn merge_agent_payload(&self, payload: AgentPayload) -> Result<(), GenericError> {
2840
let new_spans = Span::get_spans_from_agent_payload(&payload);
29-
let mut data = self.spans.lock().unwrap();
30-
data.extend(new_spans);
41+
let mut inner = self.inner.lock().unwrap();
42+
inner.spans.extend(new_spans);
3143

3244
Ok(())
3345
}
46+
47+
/// Merges the given stats payload into the current traces state.
48+
pub fn merge_stats_payload(&self, payload: StatsPayload) -> Result<(), GenericError> {
49+
let mut inner = self.inner.lock().unwrap();
50+
inner.stats.merge_payload(&payload)
51+
}
3452
}

bin/correctness/datadog-intake/src/main.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ async fn main() {
2727
)
2828
.with_ansi(true)
2929
.with_target(true)
30-
.with_file(true)
31-
.with_line_number(true)
3230
.init();
3331

3432
match run().await {

0 commit comments

Comments
 (0)