diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index c79a76c8c..1ce57db4c 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -78,6 +78,7 @@ pub struct Config { pub capture_lambda_payload_max_depth: u32, #[serde(deserialize_with = "deserialize_service_mapping")] pub service_mapping: HashMap, + pub serverless_logs_enabled: bool, // Trace Propagation #[serde(deserialize_with = "deserialize_trace_propagation_style")] pub trace_propagation_style: Vec, @@ -110,6 +111,7 @@ impl Default for Config { capture_lambda_payload: false, capture_lambda_payload_max_depth: 10, service_mapping: HashMap::new(), + serverless_logs_enabled: true, // Trace Propagation trace_propagation_style: vec![ TracePropagationStyle::Datadog, diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index a7eeb561f..d069c0e6e 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -31,6 +31,8 @@ pub struct LambdaProcessor { ready_logs: Vec, // Main event bus event_bus: Sender, + // Logs enabled + logs_enabled: bool, } const OOM_ERRORS: [&str; 7] = [ @@ -63,12 +65,14 @@ impl LambdaProcessor { let function_arn = tags_provider.get_canonical_id().unwrap_or_default(); let processing_rules = &datadog_config.logs_config_processing_rules; + let logs_enabled = datadog_config.serverless_logs_enabled; let rules = LambdaProcessor::compile_rules(processing_rules); LambdaProcessor { function_arn, service, tags, rules, + logs_enabled, invocation_context: InvocationContext::new(String::new(), 0.0, 0.0, 0, None), orphan_logs: Vec::new(), ready_logs: Vec::new(), @@ -250,8 +254,8 @@ impl LambdaProcessor { pub async fn process(&mut self, event: TelemetryEvent, aggregator: &Arc>) { if let Ok(mut log) = self.make_log(event).await { - let should_send_log = - LambdaProcessor::apply_rules(&self.rules, &mut log.message.message); + let should_send_log = self.logs_enabled + && LambdaProcessor::apply_rules(&self.rules, &mut log.message.message); if should_send_log { if let Ok(serialized_log) = serde_json::to_string(&log) { // explicitly drop log so we don't accidentally re-use it and push @@ -669,6 +673,42 @@ mod tests { assert_eq!(batch, serialized_log.as_bytes()); } + #[tokio::test] + async fn test_process_logs_disabled() { + let aggregator = Arc::new(Mutex::new(Aggregator::default())); + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: Some("test:tags".to_string()), + serverless_logs_enabled: false, + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + + let mut processor = + LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone()); + + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::PlatformStart { + request_id: "test-request-id".to_string(), + version: Some("test".to_string()), + }, + }; + + processor.process(event.clone(), &aggregator).await; + + let mut aggregator_lock = aggregator.lock().unwrap(); + let batch = aggregator_lock.get_batch(); + assert_eq!(batch.len(), 0); + } + #[tokio::test] async fn test_process_log_with_no_request_id() { let aggregator = Arc::new(Mutex::new(Aggregator::default()));