Skip to content

Commit 1d378cf

Browse files
committed
Add integration tests for datagod-trace-agent miniagent connections
1 parent d5dbcce commit 1d378cf

File tree

4 files changed

+290
-289
lines changed

4 files changed

+290
-289
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Helper functions for integration tests
5+
6+
use hyper::{Request, Response};
7+
use hyper_util::rt::TokioIo;
8+
use libdd_common::hyper_migration;
9+
use libdd_trace_utils::test_utils::create_test_json_span;
10+
use std::time::{Duration, UNIX_EPOCH};
11+
use tokio::time::timeout;
12+
13+
/// Create a simple test trace payload as msgpack bytes
14+
pub fn create_test_trace_payload() -> Vec<u8> {
15+
let start = UNIX_EPOCH.elapsed().unwrap().as_nanos() as i64;
16+
let json_span = create_test_json_span(11, 222, 0, start, false);
17+
rmp_serde::to_vec(&vec![vec![json_span]]).expect("Failed to serialize test trace")
18+
}
19+
20+
/// Send an HTTP request over TCP and return the response
21+
pub async fn send_tcp_request(
22+
port: u16,
23+
uri: &str,
24+
method: &str,
25+
body: Option<Vec<u8>>,
26+
) -> Result<Response<hyper::body::Incoming>, Box<dyn std::error::Error>> {
27+
let stream = timeout(
28+
Duration::from_secs(2),
29+
tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port)),
30+
)
31+
.await??;
32+
33+
let io = TokioIo::new(stream);
34+
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
35+
36+
tokio::spawn(async move {
37+
let _ = conn.await;
38+
});
39+
40+
let mut request_builder = Request::builder()
41+
.uri(uri)
42+
.method(method)
43+
.header("Content-Type", "application/msgpack");
44+
45+
let response = if let Some(body_data) = body {
46+
let body_len = body_data.len();
47+
request_builder = request_builder.header("Content-Length", body_len.to_string());
48+
let request = request_builder.body(hyper_migration::Body::from(body_data))?;
49+
timeout(Duration::from_secs(2), sender.send_request(request)).await??
50+
} else {
51+
let request = request_builder.body(hyper_migration::Body::empty())?;
52+
timeout(Duration::from_secs(2), sender.send_request(request)).await??
53+
};
54+
55+
Ok(response)
56+
}
57+
58+
#[cfg(windows)]
59+
/// Send an HTTP request over named pipe and return the response
60+
pub async fn send_named_pipe_request(
61+
pipe_name: &str,
62+
uri: &str,
63+
method: &str,
64+
body: Option<Vec<u8>>,
65+
) -> Result<Response<hyper::body::Incoming>, Box<dyn std::error::Error>> {
66+
use tokio::net::windows::named_pipe::ClientOptions;
67+
68+
let client = timeout(
69+
Duration::from_secs(2),
70+
ClientOptions::new().open(pipe_name),
71+
)
72+
.await??;
73+
74+
let io = TokioIo::new(client);
75+
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
76+
77+
tokio::spawn(async move {
78+
let _ = conn.await;
79+
});
80+
81+
let mut request_builder = Request::builder()
82+
.uri(uri)
83+
.method(method)
84+
.header("Content-Type", "application/msgpack");
85+
86+
let response = if let Some(body_data) = body {
87+
let body_len = body_data.len();
88+
request_builder = request_builder.header("Content-Length", body_len.to_string());
89+
let request = request_builder.body(hyper_migration::Body::from(body_data))?;
90+
timeout(Duration::from_secs(2), sender.send_request(request)).await??
91+
} else {
92+
let request = request_builder.body(hyper_migration::Body::empty())?;
93+
timeout(Duration::from_secs(2), sender.send_request(request)).await??
94+
};
95+
96+
Ok(response)
97+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Mock implementations of trace agent components for testing
5+
6+
use datadog_trace_agent::{
7+
config::Config,
8+
env_verifier::EnvVerifier,
9+
stats_flusher::StatsFlusher,
10+
stats_processor::StatsProcessor,
11+
trace_flusher::TraceFlusher,
12+
trace_processor::TraceProcessor,
13+
};
14+
use libdd_common::hyper_migration;
15+
use libdd_trace_protobuf::pb;
16+
use libdd_trace_utils::trace_utils::{self, MiniAgentMetadata, SendData};
17+
use std::sync::Arc;
18+
use tokio::sync::mpsc::{Receiver, Sender};
19+
20+
/// Mock trace processor that returns 200 OK for all requests
21+
#[allow(dead_code)]
22+
pub struct MockTraceProcessor;
23+
24+
#[async_trait::async_trait]
25+
impl TraceProcessor for MockTraceProcessor {
26+
async fn process_traces(
27+
&self,
28+
_config: Arc<Config>,
29+
_req: hyper_migration::HttpRequest,
30+
_trace_tx: Sender<SendData>,
31+
_mini_agent_metadata: Arc<MiniAgentMetadata>,
32+
) -> Result<hyper_migration::HttpResponse, hyper::http::Error> {
33+
hyper::Response::builder()
34+
.status(200)
35+
.body(hyper_migration::Body::from("{}"))
36+
}
37+
}
38+
39+
/// Mock trace flusher that consumes messages without processing
40+
pub struct MockTraceFlusher;
41+
42+
#[async_trait::async_trait]
43+
impl TraceFlusher for MockTraceFlusher {
44+
fn new(
45+
_aggregator: Arc<tokio::sync::Mutex<datadog_trace_agent::aggregator::TraceAggregator>>,
46+
_config: Arc<Config>,
47+
) -> Self {
48+
MockTraceFlusher
49+
}
50+
51+
async fn start_trace_flusher(&self, mut trace_rx: Receiver<SendData>) {
52+
// Consume messages from the channel without processing them
53+
while let Some(_trace) = trace_rx.recv().await {
54+
// Just discard the trace - we're not testing the flusher
55+
}
56+
}
57+
58+
async fn send(&self, _traces: Vec<SendData>) -> Option<Vec<SendData>> {
59+
None
60+
}
61+
62+
async fn flush(&self, _failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>> {
63+
None
64+
}
65+
}
66+
67+
/// Mock stats processor that returns 200 OK for all requests
68+
pub struct MockStatsProcessor;
69+
70+
#[async_trait::async_trait]
71+
impl StatsProcessor for MockStatsProcessor {
72+
async fn process_stats(
73+
&self,
74+
_config: Arc<Config>,
75+
_req: hyper_migration::HttpRequest,
76+
_stats_tx: Sender<pb::ClientStatsPayload>,
77+
) -> Result<hyper_migration::HttpResponse, hyper::http::Error> {
78+
hyper::Response::builder()
79+
.status(200)
80+
.body(hyper_migration::Body::from("{}"))
81+
}
82+
}
83+
84+
/// Mock stats flusher that consumes messages without processing
85+
pub struct MockStatsFlusher;
86+
87+
#[async_trait::async_trait]
88+
impl StatsFlusher for MockStatsFlusher {
89+
async fn start_stats_flusher(
90+
&self,
91+
_config: Arc<Config>,
92+
mut stats_rx: Receiver<pb::ClientStatsPayload>,
93+
) {
94+
// Consume messages from the channel without processing them
95+
while let Some(_stats) = stats_rx.recv().await {
96+
// Just discard the stats - we're not testing the flusher
97+
}
98+
}
99+
100+
async fn flush_stats(&self, _config: Arc<Config>, _traces: Vec<pb::ClientStatsPayload>) {
101+
// Do nothing
102+
}
103+
}
104+
105+
/// Mock environment verifier that returns default metadata
106+
pub struct MockEnvVerifier;
107+
108+
#[async_trait::async_trait]
109+
impl EnvVerifier for MockEnvVerifier {
110+
async fn verify_environment(
111+
&self,
112+
_timeout_ms: u64,
113+
_env_type: &trace_utils::EnvironmentType,
114+
_os: &str,
115+
) -> MiniAgentMetadata {
116+
MiniAgentMetadata::default()
117+
}
118+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Common test utilities, mocks, and helpers for integration tests
5+
6+
pub mod helpers;
7+
pub mod mocks;

0 commit comments

Comments
 (0)