Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions crates/datadog-trace-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use libdd_trace_utils::config_utils::{
};
use libdd_trace_utils::trace_utils;

const DEFAULT_APM_RECEIVER_PORT: u16 = 8126;
const DEFAULT_DOGSTATSD_PORT: u16 = 8125;

#[derive(Debug)]
Expand Down Expand Up @@ -73,6 +74,8 @@ impl Tags {
#[derive(Debug)]
pub struct Config {
pub dd_site: String,
pub dd_apm_receiver_port: u16,
pub dd_apm_windows_pipe_name: Option<String>,
pub dd_dogstatsd_port: u16,
pub env_type: trace_utils::EnvironmentType,
pub app_name: Option<String>,
Expand Down Expand Up @@ -101,6 +104,16 @@ impl Config {
anyhow::anyhow!("Unable to identify environment. Shutting down Mini Agent.")
})?;

let dd_apm_windows_pipe_name: Option<String> = env::var("DD_APM_WINDOWS_PIPE_NAME").ok();
let dd_apm_receiver_port: u16 = if dd_apm_windows_pipe_name.is_some() {
0 // Override to 0 when using Windows named pipe
} else {
env::var("DD_APM_RECEIVER_PORT")
.ok()
.and_then(|port| port.parse::<u16>().ok())
.unwrap_or(DEFAULT_APM_RECEIVER_PORT)
};

let dd_dogstatsd_port: u16 = env::var("DD_DOGSTATSD_PORT")
.ok()
.and_then(|port| port.parse::<u16>().ok())
Expand Down Expand Up @@ -140,6 +153,8 @@ impl Config {
trace_flush_interval: 3,
stats_flush_interval: 3,
verify_env_timeout: 100,
dd_apm_receiver_port,
dd_apm_windows_pipe_name,
dd_dogstatsd_port,
dd_site,
trace_intake: Endpoint {
Expand Down Expand Up @@ -315,6 +330,55 @@ mod tests {
env::remove_var("DD_DOGSTATSD_PORT");
}

#[test]
#[serial]
fn test_apm_windows_pipe_name() {
env::set_var("DD_API_KEY", "_not_a_real_key_");
env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app");
env::set_var("DD_APM_WINDOWS_PIPE_NAME", r"\\.\pipe\trace-agent");
let config_res = config::Config::new();
assert!(config_res.is_ok());
let config = config_res.unwrap();
assert_eq!(
config.dd_apm_windows_pipe_name,
Some(r"\\.\pipe\trace-agent".to_string())
);
// Port should be overridden to 0 when pipe is set
assert_eq!(config.dd_apm_receiver_port, 0);
env::remove_var("DD_API_KEY");
env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME");
env::remove_var("DD_APM_WINDOWS_PIPE_NAME");
}

#[test]
#[serial]
fn test_default_apm_receiver_port() {
env::set_var("DD_API_KEY", "_not_a_real_key_");
env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app");
let config_res = config::Config::new();
assert!(config_res.is_ok());
let config = config_res.unwrap();
assert_eq!(config.dd_apm_receiver_port, 8126);
assert_eq!(config.dd_apm_windows_pipe_name, None);
env::remove_var("DD_API_KEY");
env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME");
}

#[test]
#[serial]
fn test_custom_apm_receiver_port() {
env::set_var("DD_API_KEY", "_not_a_real_key_");
env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app");
env::set_var("DD_APM_RECEIVER_PORT", "18126");
let config_res = config::Config::new();
assert!(config_res.is_ok());
let config = config_res.unwrap();
assert_eq!(config.dd_apm_receiver_port, 18126);
env::remove_var("DD_API_KEY");
env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME");
env::remove_var("DD_APM_RECEIVER_PORT");
}

fn test_config_with_dd_tags(dd_tags: &str) -> config::Config {
env::set_var("DD_API_KEY", "_not_a_real_key_");
env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app");
Expand Down
199 changes: 186 additions & 13 deletions crates/datadog-trace-agent/src/mini_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ use std::time::Instant;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::{debug, error};

#[cfg(windows)]
use tokio::net::windows::named_pipe::ServerOptions;

use crate::http_utils::log_and_create_http_response;
use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor};
use libdd_trace_protobuf::pb;
use libdd_trace_utils::trace_utils;
use libdd_trace_utils::trace_utils::SendData;

const MINI_AGENT_PORT: usize = 8126;
const TRACE_ENDPOINT_PATH: &str = "/v0.4/traces";
const STATS_ENDPOINT_PATH: &str = "/v0.6/stats";
const INFO_ENDPOINT_PATH: &str = "/info";
Expand Down Expand Up @@ -63,7 +65,7 @@ impl MiniAgent {
// start our trace flusher. receives trace payloads and handles buffering + deciding when to
// flush to backend.
let trace_flusher = self.trace_flusher.clone();
tokio::spawn(async move {
let trace_flusher_handle = tokio::spawn(async move {
trace_flusher.start_trace_flusher(trace_rx).await;
});

Expand All @@ -76,7 +78,7 @@ impl MiniAgent {
// start our stats flusher.
let stats_flusher = self.stats_flusher.clone();
let stats_config = self.config.clone();
tokio::spawn(async move {
let stats_flusher_handle = tokio::spawn(async move {
stats_flusher
.start_stats_flusher(stats_config, stats_rx)
.await;
Expand Down Expand Up @@ -107,16 +109,77 @@ impl MiniAgent {
)
});

let addr = SocketAddr::from(([127, 0, 0, 1], MINI_AGENT_PORT as u16));
let listener = tokio::net::TcpListener::bind(&addr).await?;

debug!("Mini Agent started: listening on port {MINI_AGENT_PORT}");
// Determine which transport to use based on configuration
if let Some(ref pipe_name) = self.config.dd_apm_windows_pipe_name {
debug!("Mini Agent started: listening on named pipe {}", pipe_name);
} else {
debug!(
"Mini Agent started: listening on port {}",
self.config.dd_apm_receiver_port
);
}
debug!(
"Time taken start the Mini Agent: {} ms",
"Time taken to start the Mini Agent: {} ms",
now.elapsed().as_millis()
);

if let Some(ref pipe_name) = self.config.dd_apm_windows_pipe_name {
// Windows named pipe transport
#[cfg(windows)]
{
Self::serve_named_pipe(
pipe_name,
service,
trace_flusher_handle,
stats_flusher_handle,
)
.await?;
}

#[cfg(not(windows))]
{
error!(
"Named pipes are only supported on Windows, cannot use pipe: {}",
pipe_name
);
return Err("Named pipes are only supported on Windows".into());
}
} else {
// TCP transport
let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port));
let listener = tokio::net::TcpListener::bind(&addr).await?;

Self::serve_tcp(
listener,
service,
trace_flusher_handle,
stats_flusher_handle,
)
.await?;
}

Ok(())
}

async fn serve_tcp<S>(
listener: tokio::net::TcpListener,
service: S,
mut trace_flusher_handle: tokio::task::JoinHandle<()>,
mut stats_flusher_handle: tokio::task::JoinHandle<()>,
) -> Result<(), Box<dyn std::error::Error>>
where
S: hyper::service::Service<
hyper::Request<hyper::body::Incoming>,
Response = hyper::Response<hyper_migration::Body>,
> + Clone
+ Send
+ 'static,
S::Future: Send,
S::Error: std::error::Error + Send + Sync + 'static,
{
let server = hyper::server::conn::http1::Builder::new();
let mut joinset = tokio::task::JoinSet::new();

loop {
let conn = tokio::select! {
con_res = listener.accept() => match con_res {
Expand Down Expand Up @@ -147,6 +210,15 @@ impl MiniAgent {
},
Ok(()) | Err(_) => continue,
},
// If there's some error in the background tasks, we can't send data
result = &mut trace_flusher_handle => {
error!("Trace flusher task died: {:?}", result);
return Err("Trace flusher task terminated unexpectedly".into());
},
result = &mut stats_flusher_handle => {
error!("Stats flusher task died: {:?}", result);
return Err("Stats flusher task terminated unexpectedly".into());
},
};
let conn = hyper_util::rt::TokioIo::new(conn);
let server = server.clone();
Expand All @@ -159,6 +231,95 @@ impl MiniAgent {
}
}

#[cfg(windows)]
async fn serve_named_pipe<S>(
pipe_name: &str,
service: S,
mut trace_flusher_handle: tokio::task::JoinHandle<()>,
mut stats_flusher_handle: tokio::task::JoinHandle<()>,
) -> Result<(), Box<dyn std::error::Error>>
where
S: hyper::service::Service<
hyper::Request<hyper::body::Incoming>,
Response = hyper::Response<hyper_migration::Body>,
> + Clone
+ Send
+ 'static,
S::Future: Send,
S::Error: std::error::Error + Send + Sync + 'static,
{
let server = hyper::server::conn::http1::Builder::new();
let mut joinset = tokio::task::JoinSet::new();

loop {
// Create a new pipe instance
let pipe = match ServerOptions::new().create(pipe_name) {
Ok(pipe) => {
debug!("Created pipe server instance '{}' in byte mode", pipe_name);
pipe
}
Err(e) => {
error!("Failed to create named pipe: {e}");
return Err(e.into());
}
};

// Wait for client connection
let conn = tokio::select! {
connect_res = pipe.connect() => match connect_res {
Err(e)
if matches!(
e.kind(),
io::ErrorKind::ConnectionAborted
| io::ErrorKind::ConnectionReset
| io::ErrorKind::ConnectionRefused
) =>
{
continue;
}
Err(e) => {
error!("Named pipe connection error: {e}");
return Err(e.into());
}
Ok(()) => {
debug!("Client connected to '{}'", pipe_name);
pipe
}
},
finished = async {
match joinset.join_next().await {
Some(finished) => finished,
None => std::future::pending().await,
}
} => match finished {
Err(e) if e.is_panic() => {
std::panic::resume_unwind(e.into_panic());
},
Ok(()) | Err(_) => continue,
},
// If there's some error in the background tasks, we can't send data
result = &mut trace_flusher_handle => {
error!("Trace flusher task died: {:?}", result);
return Err("Trace flusher task terminated unexpectedly".into());
},
result = &mut stats_flusher_handle => {
error!("Stats flusher task died: {:?}", result);
return Err("Stats flusher task terminated unexpectedly".into());
},
};

// Hyper http parser handles buffering pipe data
let conn = hyper_util::rt::TokioIo::new(conn);
let server = server.clone();
let service = service.clone();
joinset.spawn(async move {
if let Err(e) = server.serve_connection(conn, service).await {
error!("Connection error: {e}");
}
});
}
}

async fn trace_endpoint_handler(
config: Arc<config::Config>,
req: hyper_migration::HttpRequest,
Expand Down Expand Up @@ -190,7 +351,11 @@ impl MiniAgent {
),
}
}
(_, INFO_ENDPOINT_PATH) => match Self::info_handler(config.dd_dogstatsd_port) {
(_, INFO_ENDPOINT_PATH) => match Self::info_handler(
config.dd_apm_receiver_port,
config.dd_apm_windows_pipe_name.as_deref(),
config.dd_dogstatsd_port,
) {
Ok(res) => Ok(res),
Err(err) => log_and_create_http_response(
&format!("Info endpoint error: {err}"),
Expand All @@ -205,7 +370,17 @@ impl MiniAgent {
}
}

fn info_handler(dd_dogstatsd_port: u16) -> http::Result<hyper_migration::HttpResponse> {
fn info_handler(
dd_apm_receiver_port: u16,
dd_apm_windows_pipe_name: Option<&str>,
dd_dogstatsd_port: u16,
) -> http::Result<hyper_migration::HttpResponse> {
let config_json = serde_json::json!({
"receiver_port": dd_apm_receiver_port,
"statsd_port": dd_dogstatsd_port,
"receiver_socket": serde_json::json!(dd_apm_windows_pipe_name.unwrap_or(""))
});

let response_json = json!(
{
"endpoints": [
Expand All @@ -214,9 +389,7 @@ impl MiniAgent {
INFO_ENDPOINT_PATH
],
"client_drop_p0s": true,
"config": {
"statsd_port": dd_dogstatsd_port
}
"config": config_json
}
);
Response::builder()
Expand Down
2 changes: 2 additions & 0 deletions crates/datadog-trace-agent/src/trace_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ mod tests {
..Default::default()
},
dd_site: "datadoghq.com".to_string(),
dd_apm_receiver_port: 8126,
dd_apm_windows_pipe_name: None,
dd_dogstatsd_port: 8125,
env_type: trace_utils::EnvironmentType::CloudFunction,
os: "linux".to_string(),
Expand Down
Loading
Loading