diff --git a/bin_tests/tests/crashtracker_bin_test.rs b/bin_tests/tests/crashtracker_bin_test.rs
index f16320157..afe3c7613 100644
--- a/bin_tests/tests/crashtracker_bin_test.rs
+++ b/bin_tests/tests/crashtracker_bin_test.rs
@@ -135,6 +135,18 @@ fn test_crash_ping_timing_and_content() {
     test_crash_tracking_bin(BuildProfile::Release, "donothing", "null_deref");
 }
 
+#[test]
+#[cfg_attr(miri, ignore)]
+fn test_crash_tracking_errors_intake_upload() {
+    test_crash_tracking_bin_with_errors_intake(BuildProfile::Release, "donothing", "null_deref");
+}
+
+#[test]
+#[cfg_attr(miri, ignore)]
+fn test_crash_tracking_errors_intake_crash_ping() {
+    test_crash_tracking_errors_intake_dual_upload(BuildProfile::Release, "donothing", "null_deref");
+}
+
 // This test is disabled for now on x86_64 musl and macos
 // It seems that on aarch64 musl, libc has CFI which allows
 // unwinding passed the signal frame.
@@ -700,6 +712,214 @@ fn setup_crashtracking_crates(
     (crashtracker_bin, crashtracker_receiver)
 }
 
+fn test_crash_tracking_bin_with_errors_intake(
+    crash_tracking_receiver_profile: BuildProfile,
+    mode: &str,
+    crash_typ: &str,
+) {
+    let (crashtracker_bin, crashtracker_receiver) =
+        setup_crashtracking_crates(crash_tracking_receiver_profile);
+    let fixtures = setup_test_fixtures(&[&crashtracker_receiver, &crashtracker_bin]);
+
+    let mut p = process::Command::new(&fixtures.artifacts[&crashtracker_bin])
+        .arg(format!("file://{}", fixtures.crash_profile_path.display()))
+        .arg(fixtures.artifacts[&crashtracker_receiver].as_os_str())
+        .arg(&fixtures.output_dir)
+        .arg(mode)
+        .arg(crash_typ)
+        .spawn()
+        .unwrap();
+
+    let exit_status = bin_tests::timeit!("exit after signal", {
+        eprintln!("Waiting for exit");
+        p.wait().unwrap()
+    });
+
+    match crash_typ {
+        "kill_sigabrt" | "kill_sigill" | "null_deref" | "raise_sigabrt" | "raise_sigill" => {
+            assert!(!exit_status.success())
+        }
+        "kill_sigbus" | "kill_sigsegv" | "raise_sigbus" | "raise_sigsegv" => {
+            assert!(exit_status.success())
+        }
+        _ => unreachable!("{crash_typ} shouldn't happen"),
+    }
+
+    // Check that errors intake file was created
+    let errors_intake_path = fixtures.crash_profile_path.with_extension("errors");
+    assert!(
+        errors_intake_path.exists(),
+        "Errors intake file should be created at {}",
+        errors_intake_path.display()
+    );
+
+    // Read and validate errors intake payload
+    let errors_intake_content = fs::read(&errors_intake_path)
+        .context("reading errors intake payload")
+        .unwrap();
+    let errors_payload = serde_json::from_slice::<serde_json::Value>(&errors_intake_content)
+        .context("deserializing errors intake payload to json")
+        .unwrap();
+
+    // Validate errors intake payload structure
+    assert_errors_intake_payload(&errors_payload, crash_typ);
+
+    // Also validate telemetry still works (dual upload)
+    let crash_telemetry = fs::read(&fixtures.crash_telemetry_path)
+        .context("reading crashtracker telemetry payload")
+        .unwrap();
+    assert_telemetry_message(&crash_telemetry, crash_typ);
+}
+
+fn test_crash_tracking_errors_intake_dual_upload(
+    crash_tracking_receiver_profile: BuildProfile,
+    mode: &str,
+    crash_typ: &str,
+) {
+    let (crashtracker_bin, crashtracker_receiver) =
+        setup_crashtracking_crates(crash_tracking_receiver_profile);
+    let fixtures = setup_test_fixtures(&[&crashtracker_receiver, &crashtracker_bin]);
+
+    let mut p = process::Command::new(&fixtures.artifacts[&crashtracker_bin])
+        .arg(format!("file://{}", fixtures.crash_profile_path.display()))
+        .arg(fixtures.artifacts[&crashtracker_receiver].as_os_str())
+        .arg(&fixtures.output_dir)
+        .arg(mode)
+        .arg(crash_typ)
+        .spawn()
+        .unwrap();
+
+    let exit_status = bin_tests::timeit!("exit after signal", {
+        eprintln!("Waiting for exit");
+        p.wait().unwrap()
+    });
+
+    match crash_typ {
+        "kill_sigabrt" | "kill_sigill" | "null_deref" | "raise_sigabrt" | "raise_sigill" => {
+            assert!(!exit_status.success())
+        }
+        "kill_sigbus" | "kill_sigsegv" | "raise_sigbus" | "raise_sigsegv" => {
+            assert!(exit_status.success())
+        }
+        _ => unreachable!("{crash_typ} shouldn't happen"),
+    }
+
+    // Check that errors intake file was created
+    let errors_intake_path = fixtures.crash_profile_path.with_extension("errors");
+    assert!(
+        errors_intake_path.exists(),
+        "Errors intake file should be created at {}",
+        errors_intake_path.display()
+    );
+
+    // Read and validate errors intake payload
+    let errors_intake_content = fs::read(&errors_intake_path)
+        .context("reading errors intake payload")
+        .unwrap();
+
+    // The errors intake might contain multiple JSON objects (crash ping + crash report)
+    // Try to parse as a single JSON first, if that fails, try line by line
+    if let Ok(single_payload) = serde_json::from_slice::<serde_json::Value>(&errors_intake_content)
+    {
+        // Single JSON payload - validate it
+        assert_errors_intake_payload(&single_payload, crash_typ);
+    } else {
+        // Multiple JSON objects - parse line by line
+        let content_str = String::from_utf8(errors_intake_content).unwrap();
+        let lines: Vec<&str> = content_str.lines().collect();
+        assert!(!lines.is_empty(), "Errors intake file should not be empty");
+
+        let mut _found_crash_ping = false;
+        let mut found_crash_report = false;
+
+        for line in lines {
+            if line.trim().is_empty() {
+                continue;
+            }
+
+            let payload: serde_json::Value = serde_json::from_str(line)
+                .context("parsing errors intake payload line")
+                .unwrap();
+
+            assert_errors_intake_payload(&payload, crash_typ);
+
+            // Check which type this is
+            let ddtags = payload["ddtags"].as_str().unwrap();
+            if ddtags.contains("is_crash_ping:true") {
+                _found_crash_ping = true;
+            } else {
+                found_crash_report = true;
+            }
+        }
+
+        // In dual upload mode, we expect at least the crash report
+        // Crash ping might not always be sent (e.g., file endpoints skip it)
+        assert!(
+            found_crash_report,
+            "Should have found crash report in errors intake"
+        );
+    }
+
+    // Also validate telemetry still works (dual upload)
+    let crash_telemetry = fs::read(&fixtures.crash_telemetry_path)
+        .context("reading crashtracker telemetry payload")
+        .unwrap();
+    assert_telemetry_message(&crash_telemetry, crash_typ);
+}
+
+fn assert_errors_intake_payload(payload: &Value, crash_typ: &str) {
+    // Validate basic structure
+    assert_eq!(payload["ddsource"], "crashtracker");
+    assert!(payload["timestamp"].is_number());
+    assert!(payload["ddtags"].is_string());
+
+    let ddtags = payload["ddtags"].as_str().unwrap();
+    assert!(ddtags.contains("service:foo"));
+    assert!(ddtags.contains("uuid:"));
+
+    let error = &payload["error"];
+    assert_eq!(error["source_type"], "Crashtracking");
+    assert!(error["type"].is_string()); // Note: "error_type" field is serialized as "type"
+    assert!(error["message"].is_string());
+
+    // Check if this is a crash ping or crash report
+    if ddtags.contains("is_crash_ping:true") {
+        assert_eq!(error["is_crash"], false);
+        assert!(error["stack"].is_null());
+    } else {
+        assert_eq!(error["is_crash"], true);
+    }
+
+    // Check signal-specific values
+    match crash_typ {
+        "null_deref" => {
+            assert_eq!(error["type"], "SIGSEGV");
+            assert!(error["message"]
+                .as_str()
+                .unwrap()
+                .contains("Process terminated"));
+            assert!(error["message"].as_str().unwrap().contains("SIGSEGV"));
+        }
+        "kill_sigabrt" | "raise_sigabrt" => {
+            assert_eq!(error["type"], "SIGABRT");
+            assert!(error["message"].as_str().unwrap().contains("SIGABRT"));
+        }
+        "kill_sigill" | "raise_sigill" => {
+            assert_eq!(error["type"], "SIGILL");
+            assert!(error["message"].as_str().unwrap().contains("SIGILL"));
+        }
+        "kill_sigbus" | "raise_sigbus" => {
+            assert_eq!(error["type"], "SIGBUS");
+            assert!(error["message"].as_str().unwrap().contains("SIGBUS"));
+        }
+        "kill_sigsegv" | "raise_sigsegv" => {
+            assert_eq!(error["type"], "SIGSEGV");
+            assert!(error["message"].as_str().unwrap().contains("SIGSEGV"));
+        }
+        _ => panic!("Unexpected crash_typ: {crash_typ}"),
+    }
+}
+
 fn extend_path<T: AsRef<Path>>(parent: &Path, path: T) -> PathBuf {
     let mut parent = parent.to_path_buf();
     parent.push(path);
diff --git a/datadog-crashtracker/src/crash_info/errors_intake.rs b/datadog-crashtracker/src/crash_info/errors_intake.rs
new file mode 100644
index 000000000..df25ee9e8
--- /dev/null
+++ b/datadog-crashtracker/src/crash_info/errors_intake.rs
@@ -0,0 +1,691 @@
+// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
+// SPDX-License-Identifier: Apache-2.0
+
+use std::time::SystemTime;
+
+use crate::SigInfo;
+
+use super::{build_crash_ping_message, CrashInfo, Metadata, StackTrace};
+use anyhow::Context;
+use chrono::{DateTime, Utc};
+use ddcommon::{config::parse_env, parse_uri, Endpoint};
+use http::{uri::PathAndQuery, Uri};
+use serde::Serialize;
+use std::{borrow::Cow, time::Duration};
+
+// NOTE(review): this default is "datad0g.com", not "datadoghq.com"; confirm it is intended.
+pub const DEFAULT_DD_SITE: &str = "datad0g.com";
+pub const PROD_ERRORS_INTAKE_SUBDOMAIN: &str = "event-platform-intake";
+
+const DIRECT_ERRORS_INTAKE_URL_PATH: &str = "/api/v2/errorsintake";
+const AGENT_ERRORS_INTAKE_URL_PATH: &str = "/evp_proxy/v4/api/v2/errorsintake";
+
+const DEFAULT_AGENT_HOST: &str = "localhost";
+const DEFAULT_AGENT_PORT: u16 = 8126;
+
+/// Configuration of the errors intake uploader.
+#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
+pub struct ErrorsIntakeConfig {
+    /// Endpoint to send the data to
+    /// This is private and should be interacted with through the set_endpoint function
+    /// to ensure the url path is properly set
+    pub(crate) endpoint: Option<Endpoint>,
+    pub direct_submission_enabled: bool,
+    pub debug_enabled: bool,
+}
+
+/// Sets the errors intake path on `endpoint`.
+///
+/// Endpoints with a non-`file` scheme get the direct intake path when they carry an API key and
+/// direct submission is enabled, and the agent EvP proxy path otherwise. Other endpoints keep
+/// their path unchanged.
+fn endpoint_with_errors_intake_path(
+    mut endpoint: Endpoint,
+    direct_submission_enabled: bool,
+) -> anyhow::Result<Endpoint> {
+    let mut uri_parts = endpoint.url.into_parts();
+    if uri_parts
+        .scheme
+        .as_ref()
+        .is_some_and(|scheme| scheme.as_str() != "file")
+    {
+        uri_parts.path_and_query = Some(PathAndQuery::from_static(
+            if endpoint.api_key.is_some() && direct_submission_enabled {
+                DIRECT_ERRORS_INTAKE_URL_PATH
+            } else {
+                AGENT_ERRORS_INTAKE_URL_PATH
+            },
+        ));
+    }
+
+    endpoint.url = Uri::from_parts(uri_parts)?;
+    Ok(endpoint)
+}
+
+/// Settings gathers configuration options we receive from the environment
+#[derive(Debug, Default)]
+pub struct ErrorsIntakeSettings {
+    // Env parameter
+    pub agent_host: Option<String>,
+    pub trace_agent_port: Option<u16>,
+    pub trace_agent_url: Option<String>,
+    pub trace_pipe_name: Option<String>,
+    pub direct_submission_enabled: bool,
+    pub api_key: Option<String>,
+    pub site: Option<String>,
+    pub errors_intake_dd_url: Option<String>,
+    pub shared_lib_debug: bool,
+
+    // Filesystem check
+    pub agent_uds_socket_found: bool,
+}
+
+impl ErrorsIntakeSettings {
+    // Agent connection configuration
+    const DD_TRACE_AGENT_URL: &'static str = "DD_TRACE_AGENT_URL";
+    const DD_AGENT_HOST: &'static str = "DD_AGENT_HOST";
+    const DD_TRACE_AGENT_PORT: &'static str = "DD_TRACE_AGENT_PORT";
+    const DD_TRACE_PIPE_NAME: &'static str = "DD_TRACE_PIPE_NAME";
+
+    // Direct submission configuration
+    const _DD_DIRECT_SUBMISSION_ENABLED: &'static str = "_DD_DIRECT_SUBMISSION_ENABLED";
+    const DD_API_KEY: &'static str = "DD_API_KEY";
+    const DD_SITE: &'static str = "DD_SITE";
+    const DD_ERRORS_INTAKE_DD_URL: &'static str = "DD_ERRORS_INTAKE_DD_URL";
+
+    // Debug configuration
+    const _DD_SHARED_LIB_DEBUG: &'static str = "_DD_SHARED_LIB_DEBUG";
+
+    pub fn from_env() -> Self {
+        let default = Self::default();
+        Self {
+            agent_host: parse_env::str_not_empty(Self::DD_AGENT_HOST),
+            trace_agent_port: parse_env::int(Self::DD_TRACE_AGENT_PORT),
+            trace_agent_url: parse_env::str_not_empty(Self::DD_TRACE_AGENT_URL)
+                .or(default.trace_agent_url),
+            trace_pipe_name: parse_env::str_not_empty(Self::DD_TRACE_PIPE_NAME)
+                .or(default.trace_pipe_name),
+            direct_submission_enabled: parse_env::bool(Self::_DD_DIRECT_SUBMISSION_ENABLED)
+                .unwrap_or(default.direct_submission_enabled),
+            api_key: parse_env::str_not_empty(Self::DD_API_KEY),
+            site: parse_env::str_not_empty(Self::DD_SITE),
+            errors_intake_dd_url: parse_env::str_not_empty(Self::DD_ERRORS_INTAKE_DD_URL),
+            shared_lib_debug: parse_env::bool(Self::_DD_SHARED_LIB_DEBUG).unwrap_or(false),
+
+            agent_uds_socket_found: (|| {
+                #[cfg(unix)]
+                return std::fs::metadata("/var/run/datadog/apm.socket").is_ok();
+                #[cfg(not(unix))]
+                return false;
+            })(),
+        }
+    }
+}
+
+impl ErrorsIntakeConfig {
+    // Implemented following same pattern as telemetry
+    fn trace_agent_url_from_setting(settings: &ErrorsIntakeSettings) -> String {
+        None.or_else(|| {
+            settings
+                .trace_agent_url
+                .as_deref()
+                .filter(|u| {
+                    u.starts_with("unix://")
+                        || u.starts_with("http://")
+                        || u.starts_with("https://")
+                })
+                .map(ToString::to_string)
+        })
+        .or_else(|| {
+            #[cfg(windows)]
+            return settings
+                .trace_pipe_name
+                .as_ref()
+                .map(|pipe_name| format!("windows:{pipe_name}"));
+            #[cfg(not(windows))]
+            return None;
+        })
+        .or_else(|| {
+            #[cfg(unix)]
+            return settings
+                .agent_uds_socket_found
+                .then(|| "unix:///var/run/datadog/apm.socket".to_string());
+            #[cfg(not(unix))]
+            return None;
+        })
+        .or_else(|| match (&settings.agent_host, settings.trace_agent_port) {
+            (None, None) => None,
+            _ => Some(format!(
+                "http://{}:{}",
+                settings.agent_host.as_deref().unwrap_or(DEFAULT_AGENT_HOST),
+                settings.trace_agent_port.unwrap_or(DEFAULT_AGENT_PORT),
+            )),
+        })
+        .unwrap_or_else(|| format!("http://{DEFAULT_AGENT_HOST}:{DEFAULT_AGENT_PORT}"))
+    }
+
+    fn api_key_from_settings(settings: &ErrorsIntakeSettings) -> Option<Cow<'static, str>> {
+        if !settings.direct_submission_enabled {
+            return None;
+        }
+        settings.api_key.clone().map(Cow::Owned)
+    }
+
+    pub fn endpoint(&self) -> Option<&Endpoint> {
+        self.endpoint.as_ref()
+    }
+
+    pub fn set_endpoint(&mut self, endpoint: Endpoint) -> anyhow::Result<()> {
+        self.endpoint = Some(endpoint_with_errors_intake_path(
+            endpoint,
+            self.direct_submission_enabled,
+        )?);
+        Ok(())
+    }
+
+    pub fn from_settings(settings: &ErrorsIntakeSettings) -> Self {
+        let api_key = Self::api_key_from_settings(settings);
+
+        let mut this = Self {
+            endpoint: None,
+            direct_submission_enabled: settings.direct_submission_enabled,
+            debug_enabled: settings.shared_lib_debug,
+        };
+
+        // For direct submission, construct the proper intake URL
+        let url = if settings.direct_submission_enabled && settings.api_key.is_some() {
+            // Check for explicit errors intake URL first
+            if let Some(ref errors_intake_url) = settings.errors_intake_dd_url {
+                errors_intake_url.clone()
+            } else {
+                // Build direct submission URL using site configuration
+                let site = settings.site.as_deref().unwrap_or(DEFAULT_DD_SITE);
+                format!("https://{}.{}", PROD_ERRORS_INTAKE_SUBDOMAIN, site)
+            }
+        } else {
+            // For agent proxy, use existing logic
+            Self::trace_agent_url_from_setting(settings)
+        };
+
+        if let Ok(parsed_url) = parse_uri(&url) {
+            let _res = this.set_endpoint(Endpoint {
+                url: parsed_url,
+                api_key,
+                ..Default::default()
+            });
+        }
+
+        this
+    }
+
+    /// Get the configuration of the errors intake from env variables
+    pub fn from_env() -> Self {
+        let settings = ErrorsIntakeSettings::from_env();
+        Self::from_settings(&settings)
+    }
+
+    /// Sets the URL the errors intake connects to, keeping the other endpoint settings.
+    pub fn set_host_from_url(&mut self, host_url: &str) -> anyhow::Result<()> {
+        let endpoint = self.endpoint.take().unwrap_or_default();
+
+        self.set_endpoint(Endpoint {
+            url: parse_uri(host_url)?,
+            ..endpoint
+        })
+    }
+}
+
+/// The `error` member of an [`ErrorsIntakePayload`].
+#[derive(Serialize, Debug)]
+pub struct ErrorObject {
+    #[serde(rename = "type", skip_serializing_if = "Option::is_none")]
+    pub error_type: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub message: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub stack: Option<StackTrace>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub is_crash: Option<bool>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub fingerprint: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub source_type: Option<String>,
+}
+
+/// Payload serialized to JSON and sent to the errors intake.
+#[derive(Serialize, Debug)]
+pub struct ErrorsIntakePayload {
+    pub timestamp: u64,
+    pub ddsource: String,
+    pub ddtags: String,
+    pub error: ErrorObject,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub trace_id: Option<String>,
+}
+
+impl ErrorsIntakePayload {
+    /// Builds the payload describing a complete crash report.
+    ///
+    /// The timestamp is the crash timestamp in milliseconds since the Unix epoch, or the current
+    /// time when `crash_info.timestamp` cannot be parsed.
+    pub fn from_crash_info(crash_info: &CrashInfo) -> anyhow::Result<Self> {
+        let timestamp = crash_info.timestamp.parse::<DateTime<Utc>>().map_or_else(
+            |_| {
+                SystemTime::now()
+                    .duration_since(SystemTime::UNIX_EPOCH)
+                    .map(|d| d.as_millis() as u64)
+                    .unwrap_or(0)
+            },
+            // Clamp pre-epoch timestamps to 0 instead of letting the cast wrap around.
+            |ts| u64::try_from(ts.timestamp_millis()).unwrap_or(0),
+        );
+
+        // Extract service information from metadata tags
+        let mut service_name = "unknown".to_string();
+        let mut env = None;
+        let mut service_version = None;
+
+        for tag in &crash_info.metadata.tags {
+            if let Some((key, value)) = tag.split_once(':') {
+                match key {
+                    "service" => service_name = value.to_string(),
+                    "env" => env = Some(value.to_string()),
+                    "version" => service_version = Some(value.to_string()),
+                    _ => {}
+                }
+            }
+        }
+
+        // Build ddtags
+        let mut ddtags = format!("service:{}", service_name);
+        if let Some(env) = env {
+            ddtags.push_str(&format!(",env:{env}"));
+        }
+        if let Some(version) = service_version {
+            ddtags.push_str(&format!(",version:{version}"));
+        }
+        ddtags.push_str(&format!(",uuid:{}", crash_info.uuid));
+        ddtags.push_str(",is_crash:true");
+
+        // Extract error info from signal
+        let (error_type, error_message) = if let Some(sig_info) = &crash_info.sig_info {
+            (
+                Some(format!("{:?}", sig_info.si_signo_human_readable)),
+                Some(format!(
+                    "Process terminated with {:?} ({:?})",
+                    sig_info.si_code_human_readable, sig_info.si_signo_human_readable
+                )),
+            )
+        } else {
+            (
+                Some("Unknown".to_string()),
+                crash_info.error.message.clone(),
+            )
+        };
+
+        // Use crash stack if available
+        let error_stack = if !crash_info.error.stack.frames.is_empty() {
+            Some(crash_info.error.stack.clone())
+        } else {
+            None
+        };
+
+        Ok(Self {
+            timestamp,
+            ddsource: "crashtracker".to_string(),
+            ddtags,
+            error: ErrorObject {
+                error_type,
+                message: error_message,
+                stack: error_stack,
+                is_crash: Some(true),
+                fingerprint: crash_info.fingerprint.clone(),
+                source_type: Some("Crashtracking".to_string()),
+            },
+            trace_id: None,
+        })
+    }
+
+    /// Builds the payload for a crash ping.
+    ///
+    /// A crash ping carries no stack, has `is_crash` set to `false` and is tagged
+    /// `is_crash_ping:true`; its timestamp is the current time in milliseconds.
+    pub fn from_crash_ping(
+        crash_uuid: &str,
+        sig_info: &SigInfo,
+        metadata: &Metadata,
+    ) -> anyhow::Result<Self> {
+        let timestamp = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .map(|d| d.as_millis() as u64)
+            .unwrap_or(0);
+
+        // Extract service info from metadata tags
+        let mut service_name = "unknown".to_string();
+        let mut env = None;
+        let mut service_version = None;
+
+        for tag in &metadata.tags {
+            if let Some((key, value)) = tag.split_once(':') {
+                match key {
+                    "service" => service_name = value.to_string(),
+                    "env" => env = Some(value.to_string()),
+                    "version" => service_version = Some(value.to_string()),
+                    _ => {}
+                }
+            }
+        }
+
+        // Build ddtags
+        let mut ddtags = format!("service:{}", service_name);
+        if let Some(env) = env {
+            ddtags.push_str(&format!(",env:{env}"));
+        }
+        if let Some(version) = service_version {
+            ddtags.push_str(&format!(",version:{version}"));
+        }
+        ddtags.push_str(&format!(",uuid:{crash_uuid}"));
+        ddtags.push_str(",is_crash_ping:true");
+
+        Ok(Self {
+            timestamp,
+            ddsource: "crashtracker".to_string(),
+            ddtags,
+            error: ErrorObject {
+                error_type: Some(format!("{:?}", sig_info.si_signo_human_readable)),
+                message: Some(build_crash_ping_message(sig_info)),
+                stack: None,
+                is_crash: Some(false),
+                fingerprint: None,
+                source_type: Some("Crashtracking".to_string()),
+            },
+            trace_id: None,
+        })
+    }
+}
+
+/// Uploads crash pings and crash reports to the errors intake.
+pub struct ErrorsIntakeUploader {
+    cfg: ErrorsIntakeConfig,
+}
+
+impl ErrorsIntakeUploader {
+    /// Creates an uploader configured from the environment; a provided `endpoint` takes
+    /// precedence over the one derived from the environment.
+    pub fn new(
+        _crashtracker_metadata: &Metadata,
+        endpoint: &Option<Endpoint>,
+    ) -> anyhow::Result<Self> {
+        let mut cfg = ErrorsIntakeConfig::from_env();
+
+        if let Some(endpoint) = endpoint {
+            cfg.set_endpoint(endpoint.clone())?;
+        }
+        Ok(Self { cfg })
+    }
+
+    pub async fn send_crash_ping(
+        &self,
+        crash_uuid: &str,
+        sig_info: &SigInfo,
+        metadata: &Metadata,
+    ) -> anyhow::Result<()> {
+        let payload = ErrorsIntakePayload::from_crash_ping(crash_uuid, sig_info, metadata)?;
+        self.send_payload(&payload).await
+    }
+
+    pub async fn upload_to_errors_intake(&self, crash_info: &CrashInfo) -> anyhow::Result<()> {
+        let payload = ErrorsIntakePayload::from_crash_info(crash_info)?;
+        self.send_payload(&payload).await
+    }
+
+    /// Serializes `payload` to JSON and delivers it to the configured endpoint.
+    ///
+    /// Without an endpoint this is a no-op. A `file` endpoint (used for testing) is written to
+    /// the endpoint path with its extension replaced by `errors`, truncating any previous
+    /// content. Any other endpoint receives an HTTP POST and must answer with a success status.
+    async fn send_payload(&self, payload: &ErrorsIntakePayload) -> anyhow::Result<()> {
+        let Some(endpoint) = self.cfg.endpoint() else {
+            return Ok(());
+        };
+
+        // Handle file endpoint for testing
+        if endpoint.url.scheme_str() == Some("file") {
+            let path = ddcommon::decode_uri_path_in_authority(&endpoint.url)
+                .context("errors intake file path is not valid")?;
+
+            let file_path = path.with_extension("errors");
+            let file = std::fs::File::create(&file_path).with_context(|| {
+                format!(
+                    "Failed to create errors intake file {}",
+                    file_path.display()
+                )
+            })?;
+
+            serde_json::to_writer_pretty(file, payload).with_context(|| {
+                format!(
+                    "Failed to write errors intake JSON to {}",
+                    file_path.display()
+                )
+            })?;
+
+            return Ok(());
+        }
+
+        // Build HTTP request using the same pattern as telemetry
+        let mut req_builder =
+            endpoint.to_request_builder(concat!("crashtracker/", env!("CARGO_PKG_VERSION")))?;
+
+        // Add errors intake specific headers
+        if endpoint.api_key.is_some() {
+            // Direct intake - DD-API-KEY is added by to_request_builder
+        } else {
+            // Agent proxy - add EvP subdomain header
+            req_builder =
+                req_builder.header("X-Datadog-EVP-Subdomain", PROD_ERRORS_INTAKE_SUBDOMAIN);
+        }
+
+        let req = req_builder
+            .method(http::Method::POST)
+            .header(
+                http::header::CONTENT_TYPE,
+                ddcommon::header::APPLICATION_JSON,
+            )
+            .body(serde_json::to_string(payload)?.into())?;
+
+        // Create HTTP client and send request
+        let client = ddcommon::hyper_migration::new_client_periodic();
+
+        let response = tokio::time::timeout(
+            Duration::from_millis(endpoint.timeout_ms),
+            client.request(req),
+        )
+        .await??;
+
+        // A completed request is not necessarily an accepted payload: report HTTP failures.
+        anyhow::ensure!(
+            response.status().is_success(),
+            "errors intake responded with HTTP status {}",
+            response.status()
+        );
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::crash_info::test_utils::TestInstance;
+    use std::sync::Mutex;
+
+    // Mutex to ensure environment variable tests run sequentially
+    static ENV_TEST_LOCK: Mutex<()> = Mutex::new(());
+
+    #[test]
+    fn test_errors_payload_from_crash_info() {
+        let crash_info = CrashInfo::test_instance(1);
+        let payload = ErrorsIntakePayload::from_crash_info(&crash_info).unwrap();
+
+        assert_eq!(payload.ddsource, "crashtracker");
+        assert!(payload.ddtags.contains("service:foo"));
+        assert!(payload.ddtags.contains("uuid:"));
+        assert!(payload.ddtags.contains("is_crash:true"));
+        assert_eq!(payload.error.source_type, Some("Crashtracking".to_string()));
+        assert_eq!(payload.error.is_crash, Some(true));
+    }
+
+    #[test]
+    fn test_errors_payload_from_crash_ping() {
+        let metadata = Metadata::test_instance(1);
+        let sig_info = crate::SigInfo::test_instance(42);
+        let crash_uuid = "test-uuid-123";
+
+        let payload =
+            ErrorsIntakePayload::from_crash_ping(crash_uuid, &sig_info, &metadata).unwrap();
+
+        assert_eq!(payload.ddsource, "crashtracker");
+        assert!(payload.ddtags.contains("service:foo"));
+        assert!(payload.ddtags.contains("uuid:test-uuid-123"));
+        assert!(payload.ddtags.contains("is_crash_ping:true"));
+        assert_eq!(payload.error.source_type, Some("Crashtracking".to_string()));
+        assert_eq!(payload.error.is_crash, Some(false));
+        assert!(payload.error.stack.is_none());
+    }
+
+    #[test]
+    fn test_errors_intake_config_from_env() {
+        let _lock = ENV_TEST_LOCK.lock().unwrap();
+
+        // Clear all environment variables first to isolate test
+        std::env::remove_var("DD_TRACE_AGENT_URL");
+        std::env::remove_var("DD_AGENT_HOST");
+        std::env::remove_var("DD_TRACE_AGENT_PORT");
+        std::env::remove_var("DD_API_KEY");
+        std::env::remove_var("_DD_DIRECT_SUBMISSION_ENABLED");
+        std::env::remove_var("DD_SITE");
+
+        // Test direct submission configuration
+        std::env::set_var("DD_API_KEY", "test-key");
+        std::env::set_var("_DD_DIRECT_SUBMISSION_ENABLED", "true");
+
+        let cfg = ErrorsIntakeConfig::from_env();
+        let endpoint = cfg.endpoint().unwrap();
+
+        // Should use event-platform-intake.datad0g.com for direct submission
+        assert_eq!(
+            endpoint.url.host(),
+            Some("event-platform-intake.datad0g.com")
+        );
+        assert_eq!(endpoint.url.scheme_str(), Some("https"));
+        assert!(endpoint.api_key.is_some());
+
+        // With direct submission enabled and API key, should use direct path
+        assert_eq!(endpoint.url.path(), DIRECT_ERRORS_INTAKE_URL_PATH);
+
+        std::env::remove_var("DD_API_KEY");
+        std::env::remove_var("_DD_DIRECT_SUBMISSION_ENABLED");
+    }
+
+    #[test]
+    fn test_errors_intake_config_custom_site() {
+        let _lock = ENV_TEST_LOCK.lock().unwrap();
+
+        std::env::remove_var("DD_TRACE_AGENT_URL");
+        std::env::remove_var("DD_AGENT_HOST");
+        std::env::remove_var("DD_TRACE_AGENT_PORT");
+        std::env::remove_var("DD_API_KEY");
+        std::env::remove_var("_DD_DIRECT_SUBMISSION_ENABLED");
+        std::env::remove_var("DD_SITE");
+
+        // Test direct submission with custom site
+        std::env::set_var("DD_API_KEY", "test-key");
+        std::env::set_var("_DD_DIRECT_SUBMISSION_ENABLED", "true");
+        std::env::set_var("DD_SITE", "us3.datadoghq.com");
+
+        let cfg = ErrorsIntakeConfig::from_env();
+        let endpoint = cfg.endpoint().unwrap();
+
+        // Should use event-platform-intake with custom site
+        assert_eq!(
+            endpoint.url.host(),
+            Some("event-platform-intake.us3.datadoghq.com")
+        );
+        assert_eq!(endpoint.url.scheme_str(), Some("https"));
+        assert!(endpoint.api_key.is_some());
+        assert_eq!(endpoint.url.path(), DIRECT_ERRORS_INTAKE_URL_PATH);
+
+        std::env::remove_var("DD_API_KEY");
+        std::env::remove_var("_DD_DIRECT_SUBMISSION_ENABLED");
+        std::env::remove_var("DD_SITE");
+    }
+
+    #[test]
+    fn test_errors_intake_config_agent_proxy() {
+        let _lock = ENV_TEST_LOCK.lock().unwrap();
+
+        // Clear all environment variables first to isolate test
+        std::env::remove_var("DD_TRACE_AGENT_URL");
+        std::env::remove_var("DD_AGENT_HOST");
+        std::env::remove_var("DD_TRACE_AGENT_PORT");
+        std::env::remove_var("DD_API_KEY");
+        std::env::remove_var("_DD_DIRECT_SUBMISSION_ENABLED");
+        std::env::remove_var("DD_SITE");
+
+        // Test agent proxy configuration (no API key or direct submission disabled)
+        std::env::set_var("DD_TRACE_AGENT_URL", "http://localhost:9126");
+
+        let cfg = ErrorsIntakeConfig::from_env();
+        let endpoint = cfg.endpoint().unwrap();
+
+        assert_eq!(endpoint.url.host(), Some("localhost"));
+        assert_eq!(endpoint.url.port_u16(), Some(9126));
+
+        // Should use agent proxy path
+        assert_eq!(endpoint.url.path(), AGENT_ERRORS_INTAKE_URL_PATH);
+
+        // Clean up test environment
+        std::env::remove_var("DD_TRACE_AGENT_URL");
+        std::env::remove_var("DD_AGENT_HOST");
+        std::env::remove_var("DD_TRACE_AGENT_PORT");
+        std::env::remove_var("DD_API_KEY");
+        std::env::remove_var("_DD_DIRECT_SUBMISSION_ENABLED");
+        std::env::remove_var("DD_SITE");
+    }
+
+    #[test]
+    fn test_errors_intake_config_agent_with_api_key_but_no_direct() {
+        let _lock = ENV_TEST_LOCK.lock().unwrap();
+
+        // Clear all environment variables first to isolate test
+        std::env::remove_var("DD_TRACE_AGENT_URL");
+        std::env::remove_var("DD_AGENT_HOST");
+        std::env::remove_var("DD_TRACE_AGENT_PORT");
+        std::env::remove_var("DD_API_KEY");
+        std::env::remove_var("_DD_DIRECT_SUBMISSION_ENABLED");
+        std::env::remove_var("DD_SITE");
+
+        // Test: API key is set but direct submission is NOT enabled
+        // Should still use agent proxy
+        std::env::set_var("DD_TRACE_AGENT_URL", "http://localhost:9126");
+        std::env::set_var("DD_API_KEY", "test-key");
+        // Note: _DD_DIRECT_SUBMISSION_ENABLED is NOT set (defaults to false)
+
+        let cfg = ErrorsIntakeConfig::from_env();
+        let endpoint = cfg.endpoint().unwrap();
+
+        // Should use agent URL, not direct submission
+        assert_eq!(endpoint.url.host(), Some("localhost"));
+        assert_eq!(endpoint.url.port_u16(), Some(9126));
+
+        // Should use agent proxy path, not direct path
+        assert_eq!(endpoint.url.path(), AGENT_ERRORS_INTAKE_URL_PATH);
+
+        // Should have no API key in endpoint since we're using agent proxy
+        assert!(endpoint.api_key.is_none());
+
+        // Clean up test environment
+        std::env::remove_var("DD_TRACE_AGENT_URL");
+        std::env::remove_var("DD_API_KEY");
+    }
+}
diff --git a/datadog-crashtracker/src/crash_info/mod.rs b/datadog-crashtracker/src/crash_info/mod.rs
index 9c6e1d548..e858caef4 100644
--- a/datadog-crashtracker/src/crash_info/mod.rs
+++ b/datadog-crashtracker/src/crash_info/mod.rs
@@ -3,6 +3,7 @@
 
 mod builder;
 mod error_data;
+mod errors_intake;
 mod experimental;
 mod metadata;
 mod os_info;
@@ -17,6 +18,7 @@ mod unknown_value;
 pub use builder::*;
 use ddcommon::Endpoint;
 pub use error_data::*;
+pub use errors_intake::*;
 pub use experimental::*;
 pub use metadata::Metadata;
 pub use os_info::*;
@@ -31,6 +33,15 @@ use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 use std::{collections::HashMap, fs::File, path::Path};
 
+/// Helper function to create standardized crash ping message format
+/// This is shared between telemetry and errors intake
+pub fn build_crash_ping_message(sig_info: &SigInfo) -> String {
+    format!(
+        "Crashtracker crash ping: crash processing started - Process terminated with {:?} ({:?})",
+        sig_info.si_code_human_readable, sig_info.si_signo_human_readable
+    )
+}
+
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
 pub struct CrashInfo {
     #[serde(default, skip_serializing_if = "HashMap::is_empty")]
diff --git a/datadog-crashtracker/src/crash_info/telemetry.rs b/datadog-crashtracker/src/crash_info/telemetry.rs
index 9663471e1..fa6a9a397 100644
--- a/datadog-crashtracker/src/crash_info/telemetry.rs
+++ b/datadog-crashtracker/src/crash_info/telemetry.rs
@@ -4,8 +4,8 @@ use std::{fmt::Write, time::SystemTime};
 
 use crate::SigInfo;
 
-use super::{CrashInfo, Metadata};
-use anyhow::{Context, Ok};
+use super::{build_crash_ping_message, CrashInfo, ErrorsIntakeUploader, Metadata};
+use anyhow::Context;
 use chrono::{DateTime, Utc};
 use ddcommon::Endpoint;
 use ddtelemetry::{
@@ -68,9 +68,37 @@ macro_rules! parse_tags {
 pub struct TelemetryCrashUploader {
     metadata: TelemetryMetadata,
     cfg: ddtelemetry::config::Config,
+    errors_intake_uploader: Option<ErrorsIntakeUploader>,
 }
 
 impl TelemetryCrashUploader {
+    // build_crash_ping_message is now imported from the parent module
+
+    /// Helper function to convert telemetry metadata to crashtracker metadata
+    /// This is used for dual uploads to errors intake
+    fn telemetry_metadata_to_crashtracker_metadata(&self) -> Metadata {
+        let metadata = &self.metadata;
+        let mut tags = vec![
+            format!("service:{}", metadata.application.service_name),
+            format!("language:{}", metadata.application.language_name),
+            format!("language_version:{}", metadata.application.language_version),
+            format!("profiler_version:{}", metadata.application.tracer_version),
+        ];
+
+        if let Some(env) = &metadata.application.env {
+            tags.push(format!("env:{}", env));
+        }
+        if let Some(version) = &metadata.application.service_version {
+            tags.push(format!("version:{}", version));
+        }
+
+        Metadata {
+            library_name: metadata.application.language_name.clone(),
+            library_version: metadata.application.tracer_version.clone(),
+            family: "crashtracker".to_string(),
+            tags,
+        }
+    }
     pub fn new(
         crashtracker_metadata: &Metadata,
         endpoint: &Option<Endpoint>,
@@ -120,6 +148,15 @@ impl TelemetryCrashUploader {
 
         let host = build_host();
 
+        let errors_intake_uploader =
+            match ErrorsIntakeUploader::new(crashtracker_metadata, endpoint) {
+                Ok(uploader) => Some(uploader),
+                Err(e) => {
+                    eprintln!("Failed to create errors intake uploader: {e}");
+                    None
+                }
+            };
+
         let s = Self {
             metadata: TelemetryMetadata {
                 host,
@@ -127,6 +164,7 @@ impl TelemetryCrashUploader {
                 runtime_id: runtime_id.unwrap_or("unknown").to_owned(),
             },
             cfg,
+            errors_intake_uploader,
         };
         Ok(s)
     }
@@ -175,10 +213,7 @@ impl TelemetryCrashUploader {
 
         let crash_ping_msg = CrashPingMessage::new(
             crash_uuid.to_string(),
-            format!(
-                "Crashtracker crash ping: crash processing started - Process terminated with {:?} ({:?})",
-                sig_info.si_code_human_readable, sig_info.si_signo_human_readable
-            ),
+            build_crash_ping_message(sig_info),
             sig_info.clone(),
         );
 
@@ -201,7 +236,22 @@ impl TelemetryCrashUploader {
             origin: Some("Crashtracker"),
         };
 
-        self.send_telemetry_payload(&payload).await
+        // Send to both telemetry and errors intake
+        let telemetry_result = self.send_telemetry_payload(&payload).await;
+
+        if let Some(errors_uploader) = &self.errors_intake_uploader {
+            let crash_metadata = self.telemetry_metadata_to_crashtracker_metadata();
+            let errors_intake_result = errors_uploader
+                .send_crash_ping(crash_uuid, sig_info, &crash_metadata)
+                .await;
+            if let Err(e) = errors_intake_result {
+                eprintln!("Failed to send crash ping to errors intake: {e}");
+            }
+        } else {
+            eprintln!("No errors intake uploader available for crash ping");
+        }
+
+        telemetry_result
     }
 
     pub async fn upload_to_telemetry(&self, crash_info: &CrashInfo) -> anyhow::Result<()> {
@@ -241,7 +291,18 @@ impl TelemetryCrashUploader {
             origin: Some("Crashtracker"),
         };
 
-        self.send_telemetry_payload(&payload).await
+        // Send to both telemetry and errors intake
+        let telemetry_result = self.send_telemetry_payload(&payload).await;
+
+        if let Some(errors_uploader) = &self.errors_intake_uploader {
+            let errors_intake_result = errors_uploader.upload_to_errors_intake(crash_info).await;
+            if let Err(e) = errors_intake_result {
+                eprintln!("Failed to send crash report to errors intake: {e}");
+            }
+        } else {
+            eprintln!("No errors intake uploader available for crash report");
+        }
+        telemetry_result
     }
 
     async fn send_telemetry_payload(&self, payload: &data::Telemetry<'_>) -> anyhow::Result<()> {