diff --git a/bottlecap/README.md b/bottlecap/README.md
index 6629fcf09..27ed4a2dc 100644
--- a/bottlecap/README.md
+++ b/bottlecap/README.md
@@ -9,7 +9,7 @@ One time setup:
 Then: `./runBottlecap.sh`
 
 ## Developing using Codespaces
-Step 1: Create a codespace (code > codespaces > create codespace on main)
+Step 1: Create a codespace (code > codespaces > create codespace on main)
 
 ![img](./codespace.png)
 
@@ -18,3 +18,158 @@ Step 2: Hack in the `bottlecap` folder
 Step 3: Test your change running `./runBottlecap.sh`
 
 ![img](./runBottlecap.png)
+
+## Flush Strategies
+
+Bottlecap supports several flush strategies that control when and how observability data (metrics, logs, traces) is sent to Datadog. The strategy is configured via the `DD_SERVERLESS_FLUSH_STRATEGY` environment variable.
+
+**Important**: Flush strategies behave differently depending on the Lambda execution mode:
+- **Managed Instance**: Uses continuous background flushing (flush strategies are ignored)
+- **On-Demand**: Uses configurable flush strategies
+
+### Managed Instance Mode vs On-Demand Mode
+
+#### Managed Instance Mode
+Lambda Managed Instances run your functions on EC2 instances (managed by AWS) and allow multiple concurrent invocations per execution environment. This requires setting up a **capacity provider** - a configuration that defines VPC settings, instance requirements, and scaling parameters for the managed instances.
+
+- **Activation**: Detected automatically via the `AWS_LAMBDA_INITIALIZATION_TYPE` environment variable. When this equals `"lambda-managed-instances"`, Bottlecap enters Managed Instance mode
+- **Flush Behavior**:
+  - A dedicated background task continuously flushes data at regular intervals (default: 30 seconds)
+  - All flushes are **non-blocking** and run concurrently with invocation processing
+  - Prevents resource buildup by skipping a flush cycle if the previous flush is still in progress
+  - `DD_SERVERLESS_FLUSH_STRATEGY` is **ignored** in this mode
+- **Shutdown Behavior**:
+  - The background flusher waits for pending flushes to complete before shutdown
+  - A final flush ensures all remaining data is sent before the execution environment terminates
+- **Execution Model**: Multi-concurrent invocations where one execution environment handles multiple invocations simultaneously (unlike traditional Lambda's one-invocation-per-environment model)
+- **Use case**: Steady-state, high-volume workloads that benefit from predictable capacity and costs
+- **Key advantage**: Zero flush overhead per invocation - flushing happens independently in the background
+- **Infrastructure**: Lambda launches 3 instances by default for availability zone resiliency when a function version is published to a capacity provider
+
+#### On-Demand Mode (Traditional Mode)
+- **Activation**: Default mode for standard Lambda execution (one invocation at a time)
+- **Flush Behavior**:
+  - Respects the configured `DD_SERVERLESS_FLUSH_STRATEGY`
+  - Flush timing is tied to invocation lifecycle events
+  - Can be blocking or non-blocking depending on the chosen strategy
+- **Use case**: Standard Lambda functions with sequential invocation processing
+- **Key advantage**: Fine-grained control over flush timing and behavior
+
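+The mode detection reduces to a single environment variable check. The standalone sketch below is illustrative only; the actual implementation is `AwsConfig::is_managed_instance_mode()` in `bottlecap/src/config/aws.rs`:
+
+```rust
+// Sketch: how Bottlecap decides between Managed Instance and On-Demand mode.
+// Mirrors `AwsConfig::is_managed_instance_mode()` from this PR.
+fn is_managed_instance_mode() -> bool {
+    std::env::var("AWS_LAMBDA_INITIALIZATION_TYPE")
+        .map(|v| v == "lambda-managed-instances")
+        .unwrap_or(false)
+}
+
+fn main() {
+    if is_managed_instance_mode() {
+        println!("Managed Instance mode: strategies ignored, continuous background flushing");
+    } else {
+        println!("On-Demand mode: DD_SERVERLESS_FLUSH_STRATEGY is respected");
+    }
+}
+```
+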
+### Available Strategies (On-Demand Mode Only)
+
+#### `Default` (Recommended)
+- **Configuration**: Set automatically when no strategy is specified, or explicitly via `DD_SERVERLESS_FLUSH_STRATEGY=default`
+- **Behavior**: Adaptive - changes based on invocation frequency
+  - **Initial behavior** (first ~20 invocations): Flushes at the end of each invocation (blocking)
+  - **After 20 invocations**: Switches to non-blocking continuous flushes
+- **Interval**: 60 seconds
+- **Use case**: Recommended for most serverless workloads - automatically optimizes for your traffic pattern
+
+#### `End`
+- **Configuration**: `DD_SERVERLESS_FLUSH_STRATEGY=end`
+- **Behavior**: Always flushes at the end of each invocation (blocking)
+- **Interval**: 15 minutes (effectively disables periodic flushing)
+- **Use case**: Minimize flushing overhead - only flush once per invocation, when the invocation is complete
+
+#### `EndPeriodically`
+- **Configuration**: `DD_SERVERLESS_FLUSH_STRATEGY=end,<interval>` (e.g., `end,1000`)
+- **Behavior**: Flushes both at the end of each invocation AND periodically during long-running invocations (blocking)
+- **Interval**: User-specified (in milliseconds)
+- **Use case**: Long-running Lambda functions where you want data visibility during execution, not just at the end
+
+#### `Periodically`
+- **Configuration**: `DD_SERVERLESS_FLUSH_STRATEGY=periodically,<interval>` (e.g., `periodically,60000`)
+- **Behavior**: Always flushes at the specified interval (blocking)
+- **Interval**: User-specified (in milliseconds)
+- **Use case**: Predictable periodic flushing when you want guaranteed flush timing
+
+#### `Continuously`
+- **Configuration**: `DD_SERVERLESS_FLUSH_STRATEGY=continuously,<interval>` (e.g., `continuously,60000`)
+- **Behavior**: Spawns non-blocking async flush tasks at the specified interval
+- **Interval**: User-specified (in milliseconds)
+- **Use case**: High-throughput scenarios where invocation latency is critical and you can't afford to wait for flushes
+
+### Summary Table
+
+| Mode | Strategy | Blocking? | Adapts? | Best For |
+|------|----------|-----------|---------|----------|
+| **Managed Instance** | *Always Continuous* | ❌ No | ❌ No | Steady-state high-volume workloads with multi-concurrent invocations |
+| **On-Demand** | Default | Initially yes, then no | ✅ Yes | General use - auto-optimizes |
+| **On-Demand** | End | ✅ Yes | ❌ No | Minimal overhead, sporadic invocations |
+| **On-Demand** | EndPeriodically | ✅ Yes | ❌ No | Long-running functions with progress visibility |
+| **On-Demand** | Periodically | ✅ Yes | ❌ No | Predictable flush timing |
+| **On-Demand** | Continuously | ❌ No | ❌ No | High-throughput, latency-sensitive |
+
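+To make the formats above concrete, here is a simplified, illustrative parser showing how the strategy strings map onto strategies. The real parsing is a serde deserializer in `bottlecap/src/config/flush_strategy.rs`; the `Strategy` enum and `parse_strategy` function below are hypothetical names used only for illustration. As in the real implementation, invalid values fall back to `Default`:
+
+```rust
+// Illustrative only: a minimal sketch of the DD_SERVERLESS_FLUSH_STRATEGY format.
+#[derive(Debug, PartialEq)]
+enum Strategy {
+    Default,
+    End,
+    EndPeriodically { interval_ms: u64 },
+    Periodically { interval_ms: u64 },
+    Continuously { interval_ms: u64 },
+}
+
+fn parse_strategy(value: &str) -> Strategy {
+    match value.split_once(',') {
+        None if value == "end" => Strategy::End,
+        Some(("end", ms)) => ms
+            .parse()
+            .map_or(Strategy::Default, |interval_ms| Strategy::EndPeriodically { interval_ms }),
+        Some(("periodically", ms)) => ms
+            .parse()
+            .map_or(Strategy::Default, |interval_ms| Strategy::Periodically { interval_ms }),
+        Some(("continuously", ms)) => ms
+            .parse()
+            .map_or(Strategy::Default, |interval_ms| Strategy::Continuously { interval_ms }),
+        // Anything unrecognized (including invalid intervals) falls back to Default.
+        _ => Strategy::Default,
+    }
+}
+
+fn main() {
+    assert_eq!(parse_strategy("end"), Strategy::End);
+    assert_eq!(parse_strategy("continuously,60000"), Strategy::Continuously { interval_ms: 60000 });
+    assert_eq!(parse_strategy("end,invalid"), Strategy::Default); // mirrors this PR's fallback test
+}
+```
+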
+### Implementation Details
+
+#### Managed Instance Mode Implementation
+Located in `bottlecap/src/bin/bottlecap/main.rs`:
+- **Mode Detection** (`bottlecap/src/config/aws.rs`):
+  - Checks if the `AWS_LAMBDA_INITIALIZATION_TYPE` environment variable equals `"lambda-managed-instances"`
+- **Event Subscription** (`bottlecap/src/extension/mod.rs`):
+  - Only subscribes to `SHUTDOWN` events (not `INVOKE` events)
+  - On-Demand mode subscribes to both `INVOKE` and `SHUTDOWN` events
+- **Flush Strategy Override**:
+  - Function: `get_flush_strategy_for_mode()`
+  - If the user configures a non-continuous strategy, it is overridden to continuous with a warning
+  - Uses `DEFAULT_CONTINUOUS_FLUSH_INTERVAL` (30 seconds) from `flush_control.rs`
+- **Main Event Loop**:
+  - Processes events from the event bus (telemetry events like `platform.start`, `platform.report`)
+  - Does NOT call the `/next` endpoint for each invocation (only for shutdown)
+  - Uses `tokio::select!` with biased ordering to prioritize telemetry events over shutdown signals
+- **Background Flusher Task**:
+  - Spawns at startup and runs until shutdown
+  - Uses `tokio::select!` to handle periodic flush ticks and shutdown signals
+  - Calls `PendingFlushHandles::spawn_non_blocking_flushes()` for each flush cycle
+  - Skips a flush cycle if previous flush handles are still pending
+- **Non-Blocking Flush Spawning**:
+  - Method: `PendingFlushHandles::spawn_non_blocking_flushes()`
+  - Spawns separate async tasks for logs, traces, metrics, stats, and proxy flushes
+  - Each task runs independently without blocking the main event loop
+  - Failed payloads are tracked for retry in `await_flush_handles()`
+- **Shutdown Handling**:
+  - A separate task waits for the SHUTDOWN event from the Extensions API
+  - Cancels the background flusher and signals the main event loop
+- **Final Flush**:
+  - Function: `blocking_flush_all()`
+  - Ensures all remaining data is sent before termination
+  - Uses a blocking flush with `force_flush_trace_stats=true`
+
+#### On-Demand Mode Implementation
+Located in `bottlecap/src/bin/bottlecap/main.rs`:
+- **Flush Control** (`bottlecap/src/lifecycle/flush_control.rs`):
+  - Function: `evaluate_flush_decision()`
+  - Evaluates the flush strategy and invocation history
+  - Returns a `FlushDecision` enum: `End`, `Periodic`, `Continuous`, or `Dont`
+  - Adaptive behavior: after ~20 invocations, the Default strategy switches from End to Continuous
+- **Event Loop**: Uses `FlushControl::evaluate_flush_decision()` to determine flush behavior
+  - `FlushDecision::End`: Waits for `platform.runtimeDone`, then performs a blocking flush
+  - `FlushDecision::Periodic`: Performs a blocking flush at the configured interval
+  - `FlushDecision::Continuous`: Spawns non-blocking flush tasks (similar to Managed Instance)
+  - `FlushDecision::Dont`: Skips flushing for this cycle
+- **Final Flush**:
+  - Function: `blocking_flush_all()`
+  - Blocking flush with `force_flush_trace_stats=true`
+  - Ensures all remaining data is sent before shutdown
+- **Configuration** (`bottlecap/src/config/flush_strategy.rs`):
+  - Deserializes the `DD_SERVERLESS_FLUSH_STRATEGY` environment variable
+  - Supports the formats `"end"`, `"end,<interval>"`, `"periodically,<interval>"`, and `"continuously,<interval>"`
+
+### Key Architectural Differences
+
+| Aspect | Managed Instance Mode | On-Demand Mode |
+|--------|----------------------|----------------|
+| **Event Source** | Telemetry API (platform events) | Extensions API `/next` endpoint |
+| **Invocation Model** | Multi-concurrent (one environment handles multiple invocations) | Single-concurrent (one invocation per environment) |
+| **Scaling** | Asynchronous, CPU-based scaling | Reactive scaling with cold starts |
+| **Pricing** | EC2 instance-based | Per-request duration-based |
+| **Flush Trigger** | Background interval timer | Invocation lifecycle + interval |
+| **Strategy Config** | Ignored (always continuous) | Configurable via env var |
+| **Main Loop** | Event bus processing | `/next` + event bus processing |
+| **Shutdown Detection** | Separate task monitors `/next` | Main loop receives from `/next` |
+
+## References
+
+### AWS Lambda Managed Instances Documentation
+- [Introducing AWS Lambda Managed Instances: Serverless simplicity with EC2 flexibility](https://aws.amazon.com/blogs/aws/introducing-aws-lambda-managed-instances-serverless-simplicity-with-ec2-flexibility/) - AWS Blog announcement
+- [Lambda Managed Instances - AWS Lambda Developer Guide](https://docs.aws.amazon.com/lambda/latest/dg/lambda-managed-instances.html) - Official AWS documentation
diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs
index 457951233..d9c968327 100644
--- a/bottlecap/src/bin/bottlecap/main.rs
+++ b/bottlecap/src/bin/bottlecap/main.rs
@@ -24,7 +24,7 @@ use
bottlecap::{ config::{ self, Config, aws::{AwsConfig, build_lambda_function_arn}, - flush_strategy::FlushStrategy, + flush_strategy::{FlushStrategy, PeriodicStrategy}, log_level::LogLevel, }, event_bus::{Event, EventBus}, @@ -39,7 +39,7 @@ use bottlecap::{ }, fips::{log_fips_status, prepare_client_provider}, lifecycle::{ - flush_control::{FlushControl, FlushDecision}, + flush_control::{DEFAULT_CONTINUOUS_FLUSH_INTERVAL, FlushControl, FlushDecision}, invocation::processor_service::{InvocationProcessorHandle, InvocationProcessorService}, listener::Listener as LifecycleListener, }, @@ -99,7 +99,7 @@ use std::{collections::hash_map, env, path::Path, str::FromStr, sync::Arc}; use tokio::time::{Duration, Instant}; use tokio::{sync::Mutex as TokioMutex, sync::mpsc::Sender, task::JoinHandle}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use tracing_subscriber::EnvFilter; use ustr::Ustr; @@ -136,6 +136,70 @@ impl PendingFlushHandles { trace_pending || log_pending || metric_pending || proxy_pending } + #[allow(clippy::too_many_lines)] + /// Spawns non-blocking flush tasks for all flushers (logs, traces, metrics, stats, proxy). + async fn spawn_non_blocking_flushes( + &mut self, + logs_flusher: &LogsFlusher, + trace_flusher: &Arc, + metrics_flushers: &Arc>>, + stats_flusher: &Arc, + proxy_flusher: &Arc, + metrics_aggr_handle: &MetricsAggregatorHandle, + ) { + // Spawn logs flush + let lf = logs_flusher.clone(); + self.log_flush_handles + .push(tokio::spawn(async move { lf.flush(None).await })); + + // Spawn traces flush + let tf = trace_flusher.clone(); + self.trace_flush_handles.push(tokio::spawn(async move { + tf.flush(None).await.unwrap_or_default() + })); + + // Spawn metrics flush + let (metrics_flushers_copy, series, sketches) = { + let locked_metrics = metrics_flushers.lock().await; + let flush_response = metrics_aggr_handle + .clone() + .flush() + .await + .expect("can't flush metrics handle"); + ( + locked_metrics.clone(), + flush_response.series, + flush_response.distributions, + ) + }; + + for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate() { + let series_clone = series.clone(); + let sketches_clone = sketches.clone(); + let handle = tokio::spawn(async move { + let (retry_series, retry_sketches) = flusher + .flush_metrics(series_clone, sketches_clone) + .await + .unwrap_or_default(); + MetricsRetryBatch { + flusher_id: idx, + series: retry_series, + sketches: retry_sketches, + } + }); + self.metric_flush_handles.push(handle); + } + + // Stats flush (fire and forget, not tracked) + let () = stats_flusher.flush(false).await; + + // Spawn proxy flush + let pf = proxy_flusher.clone(); + self.proxy_flush_handles.push(tokio::spawn(async move { + pf.flush(None).await.unwrap_or_default() + })); + } + #[allow(clippy::too_many_lines)] async fn await_flush_handles( &mut self, @@ -283,8 +347,15 @@ async fn main() -> anyhow::Result<()> { let cloned_client = client.clone(); let runtime_api = aws_config.runtime_api.clone(); + let managed_instance_mode = aws_config.is_managed_instance_mode(); let response = tokio::task::spawn(async move { - extension::register(&cloned_client, &runtime_api, extension::EXTENSION_NAME).await + extension::register( + &cloned_client, + &runtime_api, + extension::EXTENSION_NAME, + managed_instance_mode, + ) + .await }); // First load the AWS configuration let lambda_directory: String = @@ -357,6 +428,37 @@ fn enable_logging_subsystem() { debug!("Logging subsystem enabled"); } +/// Returns the 
appropriate flush strategy for the given mode.
+/// In managed instance mode, a continuous flush strategy is required for optimal performance.
+/// If a different strategy is configured, this function will override it and log a warning.
+fn get_flush_strategy_for_mode(
+    aws_config: &AwsConfig,
+    configured_strategy: FlushStrategy,
+) -> FlushStrategy {
+    if !aws_config.is_managed_instance_mode() {
+        return configured_strategy;
+    }
+
+    // Check if the flush strategy needs to be enforced and log if so
+    if let FlushStrategy::Continuously(_) = configured_strategy {
+        configured_strategy
+    } else {
+        // Only log if the user explicitly configured a non-default strategy
+        if !matches!(configured_strategy, FlushStrategy::Default) {
+            warn!(
+                "Managed Instance mode detected. Flush strategy '{}' is not compatible with managed instance mode. \
+                Enforcing continuous flush strategy with {}ms interval for optimal performance.",
+                configured_strategy.name(),
+                DEFAULT_CONTINUOUS_FLUSH_INTERVAL
+            );
+        }
+
+        FlushStrategy::Continuously(PeriodicStrategy {
+            interval: DEFAULT_CONTINUOUS_FLUSH_INTERVAL,
+        })
+    }
+}
+
 fn create_api_key_factory(config: &Arc<Config>, aws_config: &Arc<AwsConfig>) -> Arc<ApiKeyFactory> {
     let config = Arc::clone(config);
     let aws_config = Arc::clone(aws_config);
@@ -415,6 +517,7 @@ async fn extension_loop_active(
         Arc::clone(&api_key_factory),
         &tags_provider,
         event_bus_tx.clone(),
+        aws_config.is_managed_instance_mode(),
     );
 
     let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) =
@@ -488,6 +591,7 @@ async fn extension_loop_active(
         logs_agent_channel,
         event_bus_tx.clone(),
         config.serverless_logs_enabled,
+        aws_config.is_managed_instance_mode(),
     )
     .await?;
 
@@ -499,16 +603,256 @@ async fn extension_loop_active(
         stats_concentrator.clone(),
     );
 
-    let mut flush_control =
-        FlushControl::new(config.serverless_flush_strategy, config.flush_timeout);
-
-    let mut race_flush_interval = flush_control.get_flush_interval();
-    race_flush_interval.tick().await; // discard first tick, which is instantaneous
+    // Validate and get the appropriate flush strategy for the current mode
+    let flush_strategy = get_flush_strategy_for_mode(&aws_config, config.serverless_flush_strategy);
+    let mut flush_control = FlushControl::new(flush_strategy, config.flush_timeout);
 
     debug!(
         "Datadog Next-Gen Extension ready in {:}ms",
         start_time.elapsed().as_millis().to_string()
     );
+
+    if aws_config.is_managed_instance_mode() {
+        // Clone Arc references for the background flusher task
+        let logs_flusher_clone = logs_flusher.clone();
+        let metrics_flushers_clone = Arc::clone(&metrics_flushers);
+        let trace_flusher_clone = Arc::clone(&trace_flusher);
+        let stats_flusher_clone = Arc::clone(&stats_flusher);
+        let proxy_flusher_clone = proxy_flusher.clone();
+        let metrics_aggr_handle_clone = metrics_aggregator_handle.clone();
+
+        // In Managed Instance mode, create a separate interval for the background flusher task.
+        // We don't reuse race_flush_interval because we need to configure the missed tick
+        // behavior before discarding the first tick. While creating a new interval may seem
+        // redundant, it keeps Managed Instance and OnDemand mode flush intervals properly isolated,
+        // making the code easier to maintain and less error-prone.
+        //
+        // Use Skip behavior to prevent accumulating missed ticks if flushes take longer
+        // than the interval. This ensures we maintain a steady flush cadence without
+        // bursts of catch-up ticks, which is important since flushes are non-blocking.
+ let mut managed_instance_mode_flush_interval = flush_control.get_flush_interval(); + managed_instance_mode_flush_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + managed_instance_mode_flush_interval.tick().await; // discard first tick + let cancel_token_clone = telemetry_listener_cancel_token.clone(); + + // Spawn a background task for continuous periodic flushing in Managed Instance mode. + // A background task continuously flushes metrics, logs, + // traces, and stats at regular intervals (configured by flush_control). This ensures + // data is sent to Datadog even while concurrent invocations are being processed. + // The flushing happens independently of invocation lifecycle events. + // This background task runs until shutdown is signaled via cancel_token_clone. + let flush_task_handle = tokio::spawn(async move { + let mut pending_flush_handles = PendingFlushHandles::new(); + + loop { + tokio::select! { + _ = managed_instance_mode_flush_interval.tick() => { + if !pending_flush_handles.has_pending_handles() { + // Only spawn new flush if no pending flushes to prevent resource buildup + pending_flush_handles.spawn_non_blocking_flushes( + &logs_flusher_clone, + &trace_flusher_clone, + &metrics_flushers_clone, + &stats_flusher_clone, + &proxy_flusher_clone, + &metrics_aggr_handle_clone, + ).await; + } + } + () = cancel_token_clone.cancelled() => { + debug!("Managed Instance mode: periodic flusher task cancelled, waiting for pending flushes"); + // Wait for any pending flushes before exiting + pending_flush_handles.await_flush_handles( + &logs_flusher_clone, + &trace_flusher_clone, + &metrics_flushers_clone, + &proxy_flusher_clone, + ).await; + break; + } + } + } + }); + + // Spawn a separate task to handle the SHUTDOWN event from /next endpoint. + // This task waits for the Lambda runtime to signal shutdown via the Extensions API. + // When shutdown is received, it cancels the background flusher and signals the main + // event loop to begin graceful shutdown. + let shutdown_cancel_token = CancellationToken::new(); + let shutdown_cancel_token_clone = shutdown_cancel_token.clone(); + let invocation_processor_handle_clone = invocation_processor_handle.clone(); + let runtime_api_clone = aws_config.runtime_api.clone(); + let extension_id_clone = r.extension_id.clone(); + let client_clone = client.clone(); + + // Main event loop for Managed Instance mode: process telemetry events until shutdown + // + // the extension registers for SHUTDOWN events ONLY (not INVOKE events), + // 1. A separate task waits for the SHUTDOWN event from next_event() + // 2. This loop processes telemetry events from event_bus + // 3. When SHUTDOWN is received (detected via cancel token), break the loop + // 4. Invocation lifecycle events (START, REPORT, etc.) come via Telemetry API, + // not via next_event() responses + // + // This allows Managed Instance mode to handle concurrent invocations while OnDemand mode + // processes invocations sequentially, one at a time. + tokio::spawn(async move { + // In Managed Instance mode, the only event we can subscribe to is SHUTDOWN, meaning that + // this call will block until the shutdown event is received. + // We can use this to signal other tasks to shutdown and wait for them to complete. + // Therefore, we need to have it in a separate task to avoid blocking the main loop. 
+ debug!("Managed Instance mode: waiting for shutdown event"); + + loop { + let next_response = + extension::next_event(&client_clone, &runtime_api_clone, &extension_id_clone) + .await; + + match next_response { + Ok(NextEventResponse::Shutdown { .. }) => { + debug!("Shutdown event received, stopping extension loop"); + // Notify the invocation processor about shutdown + if let Err(e) = invocation_processor_handle_clone.on_shutdown_event().await + { + error!("Failed to send shutdown event to processor: {}", e); + } + // Signal all other tasks to shutdown + shutdown_cancel_token_clone.cancel(); + break; + } + Ok(NextEventResponse::Invoke { .. }) => { + error!( + "Received unexpected Invoke event in Managed Instance mode - this should not happen. \ + Managed Instance mode should only subscribe to SHUTDOWN events." + ); + shutdown_cancel_token_clone.cancel(); + break; + } + Err(ExtensionError::HttpError(e)) if e.is_timeout() || e.is_connect() => { + debug!( + "Transient network error waiting for shutdown event: {}. Retrying...", + e + ); + continue; + } + Err(e) => { + error!( + "Unrecoverable error waiting for shutdown event: {:?}. \ + Initiating emergency shutdown.", + e + ); + shutdown_cancel_token_clone.cancel(); + break; + } + } + } + + debug!("Shutdown task completed"); + }); + + 'managed_instance_event_loop: loop { + tokio::select! { + biased; + // Process telemetry events (platform.start, platform.report, etc.) sent from + // the Telemetry API listener. These events provide invocation lifecycle information + // without requiring next_event() calls. The biased ordering ensures we prioritize + // processing telemetry events before checking for shutdown. + Some(event) = event_bus.rx.recv() => { + handle_event_bus_event(event, + invocation_processor_handle.clone(), + appsec_processor.clone(), + tags_provider.clone(), + trace_processor.clone(), + trace_agent_channel.clone(), + stats_concentrator.clone()). + await; + } + // Detect when shutdown has been signaled by the shutdown task. + // This happens when the /next endpoint returns a SHUTDOWN event. + () = shutdown_cancel_token.cancelled() => { + debug!("Shutdown signal received, exiting event loop"); + break 'managed_instance_event_loop; + } + } + } + + // Shutdown sequence + debug!("Initiating shutdown sequence"); + + // Wait for tombstone event from telemetry listener to ensure all events are processed + // This is the result of code refactoring which is shared by OnDemand mode as well. + wait_for_tombstone_event( + &mut event_bus, + &invocation_processor_handle, + appsec_processor.clone(), + tags_provider.clone(), + trace_processor.clone(), + trace_agent_channel.clone(), + stats_concentrator.clone(), + 300, + ) + .await; + + // Cancel background tasks + cancel_background_services( + api_runtime_proxy_shutdown_signal.as_ref(), + otlp_cancel_token.as_ref(), + &trace_agent_shutdown_token, + &dogstatsd_cancel_token, + &telemetry_listener_cancel_token, + &lifecycle_listener_shutdown_token, + ); + + // Wait for background flusher to complete gracefully + if let Err(e) = flush_task_handle.await { + error!("Error waiting for background flush task: {e:?}"); + } + + // Final flush to send any remaining observability data before shutdown. 
+ // + // Managed Instance Mode vs OnDemand Mode Final Flush: + // + // While both modes perform a final flush during shutdown, the context differs: + // + // - **Managed Instance Mode (this code)**: Throughout the execution environment's lifetime, + // a background task has been continuously flushing data at regular intervals + // (see flush_task_handle above). This final flush captures any data that was + // generated after the last periodic flush and before shutdown was signaled. + // Since concurrent invocations may have completed just before shutdown, this + // ensures we don't lose their metrics, logs, and traces. + // + // - **OnDemand Mode**: Flushing is tied to invocation lifecycle, so data is typically + // flushed at the end of each invocation. The final flush captures any remaining + // data from the last invocation that may not have been sent yet. + // + // In both modes, we pass `force_flush_trace_stats=true` to ensure trace statistics + // are flushed regardless of timing constraints, as this is our last opportunity to + // send data before the Lambda execution environment terminates. + // + // Final flush without interval reset. We pass None for race_flush_interval since + // this is the final operation before shutdown and resetting the interval timing + // serves no purpose. This avoids creating an unnecessary interval object. + let mut locked_metrics = metrics_flushers.lock().await; + blocking_flush_all( + &logs_flusher, + &mut locked_metrics, + &*trace_flusher, + &*stats_flusher, + &proxy_flusher, + None, + &metrics_aggregator_handle.clone(), + true, // force_flush_trace_stats + ) + .await; + + return Ok(()); + } + + // Below is for On-Demand mode only + let mut race_flush_interval = flush_control.get_flush_interval(); + race_flush_interval.tick().await; // discard first tick, which is instantaneous + let next_lambda_response = extension::next_event(client, &aws_config.runtime_api, &r.extension_id).await; // first invoke we must call next @@ -542,7 +886,7 @@ async fn extension_loop_active( &*trace_flusher, &*stats_flusher, &proxy_flusher, - &mut race_flush_interval, + Some(&mut race_flush_interval), &metrics_aggregator_handle.clone(), false, ) @@ -558,7 +902,7 @@ async fn extension_loop_active( &*trace_flusher, &*stats_flusher, &proxy_flusher, - &mut race_flush_interval, + Some(&mut race_flush_interval), &metrics_aggregator_handle.clone(), false, ) @@ -628,7 +972,7 @@ async fn extension_loop_active( &*trace_flusher, &*stats_flusher, &proxy_flusher, - &mut race_flush_interval, + Some(&mut race_flush_interval), &metrics_aggregator_handle, false, // force_flush_trace_stats ) @@ -666,7 +1010,7 @@ async fn extension_loop_active( &*trace_flusher, &*stats_flusher, &proxy_flusher, - &mut race_flush_interval, + Some(&mut race_flush_interval), &metrics_aggregator_handle, false, // force_flush_trace_stats ) @@ -701,45 +1045,27 @@ async fn extension_loop_active( ) .await; // Wait for tombstone event from telemetry listener to ensure all events are processed - 'shutdown: loop { - tokio::select! 
{ - Some(event) = event_bus.rx.recv() => { - if let Event::Tombstone = event { - debug!("Received tombstone event, draining remaining events"); - // Drain without waiting - loop { - match event_bus.rx.try_recv() { - Ok(event) => { - handle_event_bus_event(event, invocation_processor_handle.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await; - }, - Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break 'shutdown, - // Empty signals there are still outstanding senders - Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { - debug!("No more events to process but still have senders, continuing to drain..."); - }, - } - } - } else { - handle_event_bus_event(event, invocation_processor_handle.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await; - } - } - // Add timeout to prevent hanging indefinitely - () = tokio::time::sleep(tokio::time::Duration::from_millis(300)) => { - debug!("Timeout waiting for teardown, proceeding with shutdown"); - break 'shutdown; - } - } - } + wait_for_tombstone_event( + &mut event_bus, + &invocation_processor_handle, + appsec_processor.clone(), + tags_provider.clone(), + trace_processor.clone(), + trace_agent_channel.clone(), + stats_concentrator.clone(), + 300, + ) + .await; - if let Some(api_runtime_proxy_cancel_token) = api_runtime_proxy_shutdown_signal { - api_runtime_proxy_cancel_token.cancel(); - } - if let Some(otlp_cancel_token) = otlp_cancel_token { - otlp_cancel_token.cancel(); - } - trace_agent_shutdown_token.cancel(); - dogstatsd_cancel_token.cancel(); - lifecycle_listener_shutdown_token.cancel(); + // Cancel background services + cancel_background_services( + api_runtime_proxy_shutdown_signal.as_ref(), + otlp_cancel_token.as_ref(), + &trace_agent_shutdown_token, + &dogstatsd_cancel_token, + &telemetry_listener_cancel_token, + &lifecycle_listener_shutdown_token, + ); // gotta lock here let mut locked_metrics = metrics_flushers.lock().await; @@ -749,7 +1075,7 @@ async fn extension_loop_active( &*trace_flusher, &*stats_flusher, &proxy_flusher, - &mut race_flush_interval, + Some(&mut race_flush_interval), &metrics_aggregator_handle, true, // force_flush_trace_stats ) @@ -775,7 +1101,7 @@ async fn blocking_flush_all( trace_flusher: &impl TraceFlusher, stats_flusher: &impl StatsFlusher, proxy_flusher: &ProxyFlusher, - race_flush_interval: &mut tokio::time::Interval, + race_flush_interval: Option<&mut tokio::time::Interval>, metrics_aggr_handle: &MetricsAggregatorHandle, force_flush_trace_stats: bool, ) { @@ -800,10 +1126,70 @@ async fn blocking_flush_all( stats_flusher.flush(force_flush_trace_stats), proxy_flusher.flush(None), ); - race_flush_interval.reset(); + if let Some(interval) = race_flush_interval { + interval.reset(); + } } +/// Wait for the `Tombstone` event from telemetry listener to ensure all events are processed. +/// This function will timeout after the specified duration to prevent hanging indefinitely. #[allow(clippy::too_many_arguments)] +async fn wait_for_tombstone_event( + event_bus: &mut EventBus, + invocation_processor_handle: &InvocationProcessorHandle, + appsec_processor: Option>>, + tags_provider: Arc, + trace_processor: Arc, + trace_agent_channel: Sender, + stats_concentrator: StatsConcentratorHandle, + timeout_ms: u64, +) { + 'shutdown: loop { + tokio::select! 
{ + Some(event) = event_bus.rx.recv() => { + if let Event::Tombstone = event { + debug!("Received tombstone event, proceeding with shutdown"); + break 'shutdown; + } + handle_event_bus_event( + event, + invocation_processor_handle.clone(), + appsec_processor.clone(), + tags_provider.clone(), + trace_processor.clone(), + trace_agent_channel.clone(), + stats_concentrator.clone(), + ).await; + } + () = tokio::time::sleep(tokio::time::Duration::from_millis(timeout_ms)) => { + debug!("Timeout waiting for tombstone event, proceeding with shutdown"); + break 'shutdown; + } + } + } +} + +/// Cancel all background service tasks in preparation for shutdown. +fn cancel_background_services( + api_runtime_proxy_shutdown_signal: Option<&CancellationToken>, + otlp_cancel_token: Option<&CancellationToken>, + trace_agent_shutdown_token: &CancellationToken, + dogstatsd_cancel_token: &CancellationToken, + telemetry_listener_cancel_token: &CancellationToken, + lifecycle_listener_shutdown_token: &CancellationToken, +) { + if let Some(token) = api_runtime_proxy_shutdown_signal { + token.cancel(); + } + if let Some(token) = otlp_cancel_token { + token.cancel(); + } + trace_agent_shutdown_token.cancel(); + dogstatsd_cancel_token.cancel(); + telemetry_listener_cancel_token.cancel(); + lifecycle_listener_shutdown_token.cancel(); +} + #[allow(clippy::too_many_lines)] async fn handle_event_bus_event( event: Event, @@ -913,13 +1299,31 @@ async fn handle_event_bus_event( TelemetryRecord::PlatformReport { ref request_id, metrics, - .. + status, + ref error_type, + ref spans, } => { if let Err(e) = invocation_processor_handle - .on_platform_report(request_id.clone(), metrics, event.time.timestamp()) + .on_platform_report( + request_id, + metrics, + event.time.timestamp(), + status, + error_type, + spans, + tags_provider.clone(), + Arc::new(SendingTraceProcessor { + appsec: appsec_processor.clone(), + processor: trace_processor.clone(), + trace_tx: trace_agent_channel.clone(), + stats_generator: Arc::new(StatsGenerator::new( + stats_concentrator.clone(), + )), + }), + ) .await { - error!("Failed to send platform report to processor: {}", e); + error!("Failed to send platform runtime report to processor: {}", e); } return Some(event); } @@ -1000,6 +1404,7 @@ fn start_logs_agent( api_key_factory: Arc, tags_provider: &Arc, event_bus: Sender, + is_managed_instance_mode: bool, ) -> ( Sender, LogsFlusher, @@ -1017,6 +1422,7 @@ fn start_logs_agent( Arc::clone(config), event_bus, aggregator_handle.clone(), + is_managed_instance_mode, ); let cancel_token = agent.cancel_token(); // Start logs agent in background @@ -1261,6 +1667,7 @@ async fn setup_telemetry_client( logs_tx: Sender, event_bus_tx: Sender, logs_enabled: bool, + managed_instance_mode: bool, ) -> anyhow::Result { let listener = TelemetryListener::new(EXTENSION_HOST_IP, TELEMETRY_PORT, logs_tx, event_bus_tx); @@ -1281,6 +1688,7 @@ async fn setup_telemetry_client( extension_id, TELEMETRY_PORT, logs_enabled, + managed_instance_mode, ) .await .map_err(|e| anyhow::anyhow!("Failed to subscribe to telemetry: {e:?}"))?; diff --git a/bottlecap/src/config/aws.rs b/bottlecap/src/config/aws.rs index 62100e477..d1af40a8f 100644 --- a/bottlecap/src/config/aws.rs +++ b/bottlecap/src/config/aws.rs @@ -11,6 +11,9 @@ const AWS_LAMBDA_FUNCTION_NAME: &str = "AWS_LAMBDA_FUNCTION_NAME"; const AWS_LAMBDA_RUNTIME_API: &str = "AWS_LAMBDA_RUNTIME_API"; const AWS_LWA_LAMBDA_RUNTIME_API_PROXY: &str = "AWS_LWA_LAMBDA_RUNTIME_API_PROXY"; const AWS_LAMBDA_EXEC_WRAPPER: &str = 
"AWS_LAMBDA_EXEC_WRAPPER"; +const AWS_LAMBDA_INITIALIZATION_TYPE: &str = "AWS_LAMBDA_INITIALIZATION_TYPE"; + +pub const LAMBDA_MANAGED_INSTANCES_INIT_TYPE: &str = "lambda-managed-instances"; #[allow(clippy::module_name_repetitions)] #[derive(Debug, Clone)] @@ -21,6 +24,7 @@ pub struct AwsConfig { pub runtime_api: String, pub sandbox_init_time: Instant, pub exec_wrapper: Option, + pub initialization_type: String, } impl AwsConfig { @@ -33,8 +37,15 @@ impl AwsConfig { runtime_api: env::var(AWS_LAMBDA_RUNTIME_API).unwrap_or_default(), sandbox_init_time: start_time, exec_wrapper: env::var(AWS_LAMBDA_EXEC_WRAPPER).ok(), + initialization_type: env::var(AWS_LAMBDA_INITIALIZATION_TYPE).unwrap_or_default(), } } + + #[must_use] + pub fn is_managed_instance_mode(&self) -> bool { + self.initialization_type + .eq(LAMBDA_MANAGED_INSTANCES_INIT_TYPE) + } } #[allow(clippy::module_name_repetitions)] diff --git a/bottlecap/src/config/flush_strategy.rs b/bottlecap/src/config/flush_strategy.rs index 51e9710eb..0a09e8227 100644 --- a/bottlecap/src/config/flush_strategy.rs +++ b/bottlecap/src/config/flush_strategy.rs @@ -21,6 +21,20 @@ pub enum FlushStrategy { Continuously(PeriodicStrategy), } +impl FlushStrategy { + /// Returns the name of the flush strategy as a string slice. + #[must_use] + pub const fn name(&self) -> &'static str { + match self { + FlushStrategy::Default => "default", + FlushStrategy::End => "end", + FlushStrategy::Periodically(_) => "periodically", + FlushStrategy::EndPeriodically(_) => "end-periodically", + FlushStrategy::Continuously(_) => "continuously", + } + } +} + // A restricted subset of `FlushStrategy`. The Default strategy is now allowed, which is required to be // translated into a concrete strategy. #[allow(clippy::module_name_repetitions)] @@ -121,4 +135,34 @@ mod tests { let flush_strategy: FlushStrategy = serde_json::from_str("\"end,invalid\"").unwrap(); assert_eq!(flush_strategy, FlushStrategy::Default); } + + #[test] + fn test_flush_strategy_name_default() { + let strategy = FlushStrategy::Default; + assert_eq!(strategy.name(), "default"); + } + + #[test] + fn test_flush_strategy_name_end() { + let strategy = FlushStrategy::End; + assert_eq!(strategy.name(), "end"); + } + + #[test] + fn test_flush_strategy_name_periodically() { + let strategy = FlushStrategy::Periodically(PeriodicStrategy { interval: 1000 }); + assert_eq!(strategy.name(), "periodically"); + } + + #[test] + fn test_flush_strategy_name_end_periodically() { + let strategy = FlushStrategy::EndPeriodically(PeriodicStrategy { interval: 2000 }); + assert_eq!(strategy.name(), "end-periodically"); + } + + #[test] + fn test_flush_strategy_name_continuously() { + let strategy = FlushStrategy::Continuously(PeriodicStrategy { interval: 30000 }); + assert_eq!(strategy.name(), "continuously"); + } } diff --git a/bottlecap/src/extension/mod.rs b/bottlecap/src/extension/mod.rs index a89a96018..2dcfa8a59 100644 --- a/bottlecap/src/extension/mod.rs +++ b/bottlecap/src/extension/mod.rs @@ -13,6 +13,9 @@ pub const EXTENSION_ID_HEADER: &str = "Lambda-Extension-Identifier"; pub const EXTENSION_ACCEPT_FEATURE_HEADER: &str = "Lambda-Extension-Accept-Feature"; pub const EXTENSION_ROUTE: &str = "2020-01-01/extension"; +const EXTENSION_ON_DEMAND_EVENTS: &[&str] = &["INVOKE", "SHUTDOWN"]; +const EXTENSION_MANAGED_INSTANCE_EVENTS: &[&str] = &["SHUTDOWN"]; + /// Error conditions that can arise from extension operations #[derive(thiserror::Error, Debug)] pub enum ExtensionError { @@ -80,13 +83,23 @@ pub fn base_url(route: &str, 
runtime_api: &str) -> String { format!("http://{runtime_api}/{route}") } +fn get_subscription_event_types(managed_instance_mode: bool) -> Vec<&'static str> { + (if managed_instance_mode { + EXTENSION_MANAGED_INSTANCE_EVENTS + } else { + EXTENSION_ON_DEMAND_EVENTS + }) + .to_vec() +} + pub async fn register( client: &Client, runtime_api: &str, extension_name: &str, + managed_instance_mode: bool, ) -> Result { - let events_to_subscribe_to = serde_json::json!({ - "events": ["INVOKE", "SHUTDOWN"] + let events = serde_json::json!({ + "events": get_subscription_event_types(managed_instance_mode) }); let base_url = base_url(EXTENSION_ROUTE, runtime_api); @@ -96,7 +109,7 @@ pub async fn register( .post(&url) .header(EXTENSION_NAME_HEADER, extension_name) .header(EXTENSION_ACCEPT_FEATURE_HEADER, EXTENSION_FEATURES) - .json(&events_to_subscribe_to) + .json(&events) .send() .await?; diff --git a/bottlecap/src/extension/telemetry/events.rs b/bottlecap/src/extension/telemetry/events.rs index a8c3ed96c..77ad3a3d1 100644 --- a/bottlecap/src/extension/telemetry/events.rs +++ b/bottlecap/src/extension/telemetry/events.rs @@ -4,6 +4,8 @@ use serde::Deserialize; use serde_json::Value; use std::fmt::Display; +use crate::config::aws::LAMBDA_MANAGED_INSTANCES_INIT_TYPE; + /// Payload received from the Telemetry API #[derive(Clone, Debug, Deserialize, PartialEq)] pub struct TelemetryEvent { @@ -14,6 +16,14 @@ pub struct TelemetryEvent { pub record: TelemetryRecord, } +#[derive(Clone, Debug, Deserialize, PartialEq)] +pub struct TelemetrySpan { + pub name: String, + pub start: DateTime, + #[serde(rename = "durationMs")] + pub duration_ms: f64, +} + /// Record in a `LambdaTelemetry` entry #[derive(Clone, Debug, Deserialize, PartialEq)] #[serde(tag = "type", content = "record", rename_all = "lowercase")] @@ -92,6 +102,7 @@ pub enum TelemetryRecord { /// When unsuccessful, the `error_type` describes what kind of error occurred error_type: Option, metrics: ReportMetrics, + spans: Option>, }, /// Extension-specific record @@ -164,6 +175,9 @@ pub enum InitType { ProvisionedConcurrency, /// `SnapStart` SnapStart, + /// Managed Instance mode + #[serde(rename = "lambda-managed-instances")] + ManagedInstance, } impl Display for InitType { @@ -172,6 +186,7 @@ impl Display for InitType { InitType::OnDemand => "on-demand", InitType::ProvisionedConcurrency => "provisioned-concurrency", InitType::SnapStart => "SnapStart", + InitType::ManagedInstance => LAMBDA_MANAGED_INSTANCES_INIT_TYPE, }; write!(f, "{style}") } @@ -224,9 +239,26 @@ pub struct RuntimeDoneMetrics { } /// Report metrics +#[derive(Clone, Copy, Debug, Deserialize, PartialEq)] +#[serde(untagged)] +pub enum ReportMetrics { + OnDemand(OnDemandReportMetrics), + ManagedInstance(ManagedInstanceReportMetrics), +} + +impl ReportMetrics { + #[must_use] + pub fn duration_ms(&self) -> f64 { + match self { + ReportMetrics::OnDemand(metrics) => metrics.duration_ms, + ReportMetrics::ManagedInstance(metrics) => metrics.duration_ms, + } + } +} + #[derive(Clone, Copy, Debug, Deserialize, PartialEq)] #[serde(rename_all = "camelCase")] -pub struct ReportMetrics { +pub struct OnDemandReportMetrics { /// Total duration in milliseconds, includes Extension /// and Lambda execution time. 
pub duration_ms: f64, @@ -244,9 +276,16 @@ pub struct ReportMetrics { pub restore_duration_ms: Option, } +#[derive(Clone, Copy, Debug, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct ManagedInstanceReportMetrics { + pub duration_ms: f64, +} + #[cfg(test)] mod tests { use super::*; + use chrono::{TimeZone, Timelike}; macro_rules! deserialize_tests { ($($name:ident: $value:expr,)*) => { @@ -363,21 +402,53 @@ mod tests { }, ), - // platform.report - platform_report: ( + // platform.report - on demand + platform_report_on_demand: ( r#"{"time":"2022-10-21T14:05:05.766Z","type":"platform.report","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","metrics":{"durationMs":2599.4,"billedDurationMs":2600,"memorySizeMB":128,"maxMemoryUsedMB":94,"initDurationMs":549.04},"tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"},"status":"success"}}"#, TelemetryRecord::PlatformReport { request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(), status: Status::Success, error_type: None, - metrics: ReportMetrics { + metrics: ReportMetrics::OnDemand(OnDemandReportMetrics { duration_ms: 2599.4, billed_duration_ms: 2600, memory_size_mb:128, max_memory_used_mb:94, init_duration_ms: Some(549.04), restore_duration_ms: None, - }, + }), + spans: None, + }, + ), + + // platform.report - managed_instance + platform_report_managed_instance: ( + r#"{"time":"2025-09-19T19:36:50.881Z","type":"platform.report","record":{"requestId":"13d1305b-a2f5-440c-bfbe-686ccff3d3e0","status":"success","metrics":{"durationMs":1.148},"spans":[{"name":"responseLatency","start":"2025-09-19T19:36:50.880Z","durationMs":0.847},{"name":"responseDuration","start":"2025-09-19T19:36:50.880Z","durationMs":0.127}]}}"#, + TelemetryRecord::PlatformReport { + request_id: "13d1305b-a2f5-440c-bfbe-686ccff3d3e0".to_string(), + status: Status::Success, + error_type: None, + metrics: ReportMetrics::ManagedInstance(ManagedInstanceReportMetrics { duration_ms: 1.148 }), + spans: Some(vec![ + TelemetrySpan { + name: "responseLatency".to_string(), + start: chrono::Utc.with_ymd_and_hms(2025, 9, 19, 19, 36, 50) + .single() + .expect("test date should be valid") + .with_nanosecond(880_000_000) + .expect("test nanosecond should be valid"), + duration_ms: 0.847, + }, + TelemetrySpan { + name: "responseDuration".to_string(), + start: chrono::Utc.with_ymd_and_hms(2025, 9, 19, 19, 36, 50) + .single() + .expect("test date should be valid") + .with_nanosecond(880_000_000) + .expect("test nanosecond should be valid"), + duration_ms: 0.127, + }, + ]), }, ), diff --git a/bottlecap/src/extension/telemetry/listener.rs b/bottlecap/src/extension/telemetry/listener.rs index 5503ce1e8..68349374f 100644 --- a/bottlecap/src/extension/telemetry/listener.rs +++ b/bottlecap/src/extension/telemetry/listener.rs @@ -115,7 +115,7 @@ impl TelemetryListener { // Instead, log it and move on. 
// This will result in a dropped payload, but may be from // events we haven't added support for yet - debug!("Failed to parse telemetry events: {:?}", e); + debug!("Failed to parse telemetry events `{body}`, failed with: {e}"); return (StatusCode::OK, "Failed to parse telemetry events").into_response(); } }; diff --git a/bottlecap/src/extension/telemetry/mod.rs b/bottlecap/src/extension/telemetry/mod.rs index 5e0043cce..2e4913e18 100644 --- a/bottlecap/src/extension/telemetry/mod.rs +++ b/bottlecap/src/extension/telemetry/mod.rs @@ -7,6 +7,13 @@ pub mod events; pub mod listener; pub const TELEMETRY_SUBSCRIPTION_ROUTE: &str = "2022-07-01/telemetry"; + +// Technically, we could use the same schema version for both modes, +// but we'll keep them separate for now to avoid regressions. The +// naming is just to differentiate which one contains Managed Instance-specific +// events. +pub const MANAGED_INSTANCE_SCHEMA_VERSION: &str = "2025-01-29"; +pub const ON_DEMAND_SCHEMA_VERSION: &str = "2022-12-13"; // todo(astuyve) should be 8124 on /lambda/logs but // telemetry is implemented on a raw socket now and // does not multiplex routes on the same port. @@ -37,13 +44,19 @@ pub async fn subscribe( extension_id: &str, destination_port: u16, logs_enabled: bool, + managed_instance_mode: bool, ) -> Result { + let schema_version = if managed_instance_mode { + MANAGED_INSTANCE_SCHEMA_VERSION + } else { + ON_DEMAND_SCHEMA_VERSION + }; let url = base_url(TELEMETRY_SUBSCRIPTION_ROUTE, runtime_api); let response = client .put(&url) .header(EXTENSION_ID_HEADER, extension_id) .json(&serde_json::json!({ - "schemaVersion": "2022-12-13", + "schemaVersion": schema_version, "destination": { "protocol": "HTTP", "URI": format!("http://sandbox:{}/", destination_port), diff --git a/bottlecap/src/lifecycle/flush_control.rs b/bottlecap/src/lifecycle/flush_control.rs index a939ffe24..9ddb0e40d 100644 --- a/bottlecap/src/lifecycle/flush_control.rs +++ b/bottlecap/src/lifecycle/flush_control.rs @@ -5,6 +5,7 @@ use tokio::time::{Interval, MissedTickBehavior::Skip}; use crate::lifecycle::invocation_times::InvocationTimes; const DEFAULT_FLUSH_INTERVAL: u64 = 60 * 1000; // 60s +pub const DEFAULT_CONTINUOUS_FLUSH_INTERVAL: u64 = 30 * 1000; // 30s const FIFTEEN_MINUTES: u64 = 15 * 60 * 1000; #[derive(Clone, Copy, Debug, PartialEq)] diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index c8f4fac18..321540168 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -128,6 +128,12 @@ pub struct ContextBuffer { platform_runtime_done_events_request_ids: VecDeque, universal_instrumentation_start_events: VecDeque, universal_instrumentation_end_events: VecDeque, + /// Managed instance mode: Request ID-aware buffers for concurrent invocations + /// Using hash map for O(1) lookups by request id + universal_instrumentation_start_events_with_id: + HashMap, + universal_instrumentation_end_events_with_id: + HashMap, pub sorted_reparenting_info: VecDeque, } @@ -136,6 +142,13 @@ struct UniversalInstrumentationData { payload_value: Value, } +/// Enhanced version with `request_id` for managed instance mode pairing +pub struct UniversalInstrumentationDataWithRequestId { + pub request_id: String, + pub headers: HashMap, + pub payload_value: Value, +} + impl Default for ContextBuffer { /// Creates a new `ContextBuffer` with a default capacity of 500 /// This gives us enough capacity to process events which may be delayed due to async 
tasks @@ -155,6 +168,8 @@ impl ContextBuffer { platform_runtime_done_events_request_ids: VecDeque::with_capacity(capacity), universal_instrumentation_start_events: VecDeque::with_capacity(capacity), universal_instrumentation_end_events: VecDeque::with_capacity(capacity), + universal_instrumentation_start_events_with_id: HashMap::with_capacity(capacity), + universal_instrumentation_end_events_with_id: HashMap::with_capacity(capacity), sorted_reparenting_info: VecDeque::with_capacity(capacity), } } @@ -319,6 +334,94 @@ impl ContextBuffer { } } + // ======================================================================== + // Managed Instance Mode: Request ID-Based Pairing Methods + // ======================================================================== + + /// Managed instance mode: Pair `UniversalInstrumentationStart` with `request_id`. + /// Returns true if the Invoke event already occurred (immediate pairing). + /// Returns false if the event was buffered (Invoke hasn't happened yet). + pub fn pair_universal_instrumentation_start_with_request_id( + &mut self, + request_id: &str, + headers: &HashMap, + payload_value: &Value, + ) -> bool { + // Check if this request_id is waiting in the invoke_events queue + if let Some(pos) = self + .invoke_events_request_ids + .iter() + .position(|id| id == request_id) + { + // Found matching Invoke event, remove it and process immediately + self.invoke_events_request_ids.remove(pos); + return true; + } + + // Invoke hasn't happened yet, buffer this event with request_id using HashMap for O(1) lookup + self.universal_instrumentation_start_events_with_id.insert( + request_id.to_string(), + UniversalInstrumentationDataWithRequestId { + request_id: request_id.to_string(), + headers: headers.clone(), + payload_value: payload_value.clone(), + }, + ); + false + } + + /// Managed instance mode: When Invoke event arrives, retrieve and remove buffered + /// `UniversalInstrumentationStart` with matching `request_id`. + pub fn take_universal_instrumentation_start_for_request_id( + &mut self, + request_id: &str, + ) -> Option { + self.universal_instrumentation_start_events_with_id + .remove(request_id) + } + + /// Managed instance mode: Pair `UniversalInstrumentationEnd` with `request_id`. + /// Returns true if `PlatformRuntimeDone` already occurred (immediate pairing). + /// Returns false if the event was buffered (`RuntimeDone` hasn't happened yet). + pub fn pair_universal_instrumentation_end_with_request_id( + &mut self, + request_id: &str, + headers: &HashMap, + payload_value: &Value, + ) -> bool { + // Check if this request_id is waiting in the platform_runtime_done queue + if let Some(pos) = self + .platform_runtime_done_events_request_ids + .iter() + .position(|id| id == request_id) + { + // Found matching RuntimeDone event, remove it and process immediately + self.platform_runtime_done_events_request_ids.remove(pos); + return true; + } + + // RuntimeDone hasn't happened yet, buffer this event with request_id using HashMap for O(1) lookup + self.universal_instrumentation_end_events_with_id.insert( + request_id.to_string(), + UniversalInstrumentationDataWithRequestId { + request_id: request_id.to_string(), + headers: headers.clone(), + payload_value: payload_value.clone(), + }, + ); + false + } + + /// Managed instance mode: When `PlatformRuntimeDone` event arrives, retrieve and remove buffered + /// `UniversalInstrumentationEnd` with matching `request_id`. 
+ pub fn take_universal_instrumentation_end_for_request_id( + &mut self, + request_id: &str, + ) -> Option { + self.universal_instrumentation_end_events_with_id + .remove(request_id) + } + /// Creates a new `Context` and adds it to the buffer given the `request_id` /// and the `invocation_span`. /// @@ -333,6 +436,27 @@ impl ContextBuffer { self.insert(context); } + /// Updates the `request_id` of a context with no `request_id` in the buffer. + /// Used in Managed Instance mode to update the empty context created on init start. + /// Returns true if an empty context was found and updated, false otherwise. + /// + pub fn update_empty_context_request_id(&mut self, request_id: &str) -> bool { + if let Some(context) = self + .buffer + .iter_mut() + .find(|context| context.request_id.is_empty()) + { + context.request_id = request_id.to_string(); + context + .invocation_span + .meta + .insert("request_id".to_string(), request_id.to_string()); + true + } else { + false + } + } + /// Adds the start time to the invocation span of a `Context` in the buffer. /// pub fn add_start_time(&mut self, request_id: &String, start_time: i64) { @@ -780,4 +904,236 @@ mod tests { assert!(result.is_some()); assert_eq!(result.unwrap(), request_id2); } + + // ======================================================================== + // Managed Instance Mode Tests: Request ID-Based Pairing + // ======================================================================== + + #[test] + fn test_managed_instance_universal_instrumentation_start_invoke_first() { + // Test case: Invoke arrives before UniversalInstrumentationStart (common case) + let mut buffer = ContextBuffer::with_capacity(10); + let request_id = "req-123"; + let mut headers = HashMap::new(); + headers.insert("test-header".to_string(), "test-value".to_string()); + let payload = json!({ "test": "payload" }); + + // Simulate Invoke event arriving first + buffer + .invoke_events_request_ids + .push_back(request_id.to_string()); + + // UniversalInstrumentationStart arrives - should pair immediately + let result = buffer + .pair_universal_instrumentation_start_with_request_id(request_id, &headers, &payload); + assert!(result, "Should return true when Invoke already happened"); + assert_eq!( + buffer.invoke_events_request_ids.len(), + 0, + "Should remove paired Invoke event" + ); + assert_eq!( + buffer.universal_instrumentation_start_events_with_id.len(), + 0, + "Should not buffer when immediate pairing occurs" + ); + } + + #[test] + fn test_managed_instance_universal_instrumentation_start_buffering() { + // Test case: UniversalInstrumentationStart arrives before Invoke (out of order) + let mut buffer = ContextBuffer::with_capacity(10); + let request_id = "req-456"; + let mut headers = HashMap::new(); + headers.insert("test-header".to_string(), "test-value".to_string()); + let payload = json!({ "test": "payload" }); + + // UniversalInstrumentationStart arrives first - should buffer + let result = buffer + .pair_universal_instrumentation_start_with_request_id(request_id, &headers, &payload); + assert!( + !result, + "Should return false when Invoke hasn't happened yet" + ); + assert_eq!( + buffer.universal_instrumentation_start_events_with_id.len(), + 1, + "Should buffer the event" + ); + + // Verify buffered event has correct request_id + let buffered = buffer.take_universal_instrumentation_start_for_request_id(request_id); + assert!( + buffered.is_some(), + "Should find buffered event by request_id" + ); + assert_eq!(buffered.unwrap().request_id, request_id); + } + 
+ #[test] + fn test_managed_instance_concurrent_invocations() { + // Test case: Multiple concurrent invocations with interleaved events + let mut buffer = ContextBuffer::with_capacity(10); + let mut headers_a = HashMap::new(); + headers_a.insert("req".to_string(), "A".to_string()); + let mut headers_b = HashMap::new(); + headers_b.insert("req".to_string(), "B".to_string()); + let payload_a = json!({ "data": "A" }); + let payload_b = json!({ "data": "B" }); + + // Scenario: A and B invocations interleaved + // 1. UniversalInstrumentationStart(A) arrives + let result_a = buffer + .pair_universal_instrumentation_start_with_request_id("req-A", &headers_a, &payload_a); + assert!(!result_a, "A should be buffered"); + + // 2. UniversalInstrumentationStart(B) arrives + let result_b = buffer + .pair_universal_instrumentation_start_with_request_id("req-B", &headers_b, &payload_b); + assert!(!result_b, "B should be buffered"); + + // 3. Invoke(B) arrives - should pair with B's buffered event + let buffered_b = buffer.take_universal_instrumentation_start_for_request_id("req-B"); + assert!(buffered_b.is_some(), "Should find B's event"); + assert_eq!(buffered_b.as_ref().unwrap().request_id, "req-B"); + assert_eq!( + buffered_b.as_ref().unwrap().headers.get("req"), + Some(&"B".to_string()) + ); + + // 4. Invoke(A) arrives - should pair with A's buffered event + let buffered_a = buffer.take_universal_instrumentation_start_for_request_id("req-A"); + assert!(buffered_a.is_some(), "Should find A's event"); + assert_eq!(buffered_a.as_ref().unwrap().request_id, "req-A"); + assert_eq!( + buffered_a.as_ref().unwrap().headers.get("req"), + Some(&"A".to_string()) + ); + + // Verify no cross-contamination occurred + assert_eq!( + buffer.universal_instrumentation_start_events_with_id.len(), + 0, + "All events should be paired" + ); + } + + #[test] + fn test_managed_instance_universal_instrumentation_end_runtime_done_first() { + // Test case: PlatformRuntimeDone arrives before UniversalInstrumentationEnd + let mut buffer = ContextBuffer::with_capacity(10); + let request_id = "req-789"; + let mut headers = HashMap::new(); + headers.insert("status".to_string(), "success".to_string()); + let payload = json!({ "response": "data" }); + + // Simulate PlatformRuntimeDone arriving first + buffer + .platform_runtime_done_events_request_ids + .push_back(request_id.to_string()); + + // UniversalInstrumentationEnd arrives - should pair immediately + let result = buffer + .pair_universal_instrumentation_end_with_request_id(request_id, &headers, &payload); + assert!( + result, + "Should return true when PlatformRuntimeDone already happened" + ); + assert_eq!( + buffer.platform_runtime_done_events_request_ids.len(), + 0, + "Should remove paired RuntimeDone event" + ); + assert_eq!( + buffer.universal_instrumentation_end_events_with_id.len(), + 0, + "Should not buffer when immediate pairing occurs" + ); + } + + #[test] + fn test_managed_instance_universal_instrumentation_end_buffering() { + // Test case: UniversalInstrumentationEnd arrives before PlatformRuntimeDone + let mut buffer = ContextBuffer::with_capacity(10); + let request_id = "req-abc"; + let mut headers = HashMap::new(); + headers.insert("status".to_string(), "success".to_string()); + let payload = json!({ "response": "data" }); + + // UniversalInstrumentationEnd arrives first - should buffer + let result = buffer + .pair_universal_instrumentation_end_with_request_id(request_id, &headers, &payload); + assert!( + !result, + "Should return false when RuntimeDone hasn't 
happened yet" + ); + assert_eq!( + buffer.universal_instrumentation_end_events_with_id.len(), + 1, + "Should buffer the event" + ); + + // Verify buffered event has correct request_id + let buffered = buffer.take_universal_instrumentation_end_for_request_id(request_id); + assert!( + buffered.is_some(), + "Should find buffered event by request_id" + ); + assert_eq!(buffered.unwrap().request_id, request_id); + } + + #[test] + fn test_managed_instance_no_cross_contamination() { + // Test case: Ensure request_id matching prevents cross-contamination + let mut buffer = ContextBuffer::with_capacity(10); + let mut headers_1 = HashMap::new(); + headers_1.insert("id".to_string(), "1".to_string()); + let mut headers_2 = HashMap::new(); + headers_2.insert("id".to_string(), "2".to_string()); + let payload_1 = json!({ "data": 1 }); + let payload_2 = json!({ "data": 2 }); + + // Buffer events for two different requests + buffer + .pair_universal_instrumentation_start_with_request_id("req-1", &headers_1, &payload_1); + buffer + .pair_universal_instrumentation_start_with_request_id("req-2", &headers_2, &payload_2); + + // Retrieve request-2's event - should NOT get request-1's event + let result_2 = buffer.take_universal_instrumentation_start_for_request_id("req-2"); + assert!(result_2.is_some()); + assert_eq!(result_2.as_ref().unwrap().request_id, "req-2"); + assert_eq!( + result_2.as_ref().unwrap().headers.get("id"), + Some(&"2".to_string()), + "Should get correct event for req-2" + ); + + // Request-1's event should still be buffered + let result_1 = buffer.take_universal_instrumentation_start_for_request_id("req-1"); + assert!(result_1.is_some()); + assert_eq!(result_1.as_ref().unwrap().request_id, "req-1"); + assert_eq!( + result_1.as_ref().unwrap().headers.get("id"), + Some(&"1".to_string()), + "Should get correct event for req-1" + ); + } + + #[test] + fn test_managed_instance_pop_nonexistent_request() { + // Test case: Attempting to pop an event that doesn't exist + let mut buffer = ContextBuffer::with_capacity(10); + let result = buffer.take_universal_instrumentation_start_for_request_id("nonexistent"); + assert!( + result.is_none(), + "Should return None for nonexistent request_id" + ); + + let result_end = buffer.take_universal_instrumentation_end_for_request_id("nonexistent"); + assert!( + result_end.is_none(), + "Should return None for nonexistent request_id" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index ca5d4e7b2..249f6431d 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -13,12 +13,16 @@ use tracing::{debug, warn}; use crate::{ config::{self, aws::AwsConfig}, - extension::telemetry::events::{InitType, ReportMetrics, RuntimeDoneMetrics, Status}, + extension::telemetry::events::{ + InitType, ManagedInstanceReportMetrics, OnDemandReportMetrics, ReportMetrics, + RuntimeDoneMetrics, Status, + }, lifecycle::invocation::{ base64_to_string, context::{Context, ContextBuffer, ReparentingInfo}, create_empty_span, generate_span_id, get_metadata_from_value, span_inferrer::{self, SpanInferrer}, + triggers::get_default_service_name, }, metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics}, proc::{ @@ -39,8 +43,6 @@ use crate::{ }, }; -use crate::lifecycle::invocation::triggers::get_default_service_name; - pub const MS_TO_NS: f64 = 1_000_000.0; pub const S_TO_MS: u64 = 1_000; pub const S_TO_NS: f64 = 1_000_000_000.0; @@ -81,6 +83,11 @@ 
pub struct Processor {
     ///
     /// These tags are used to capture runtime and initialization.
     dynamic_tags: HashMap<String, String>,
+    /// Tracks active concurrent invocations for monitoring enhanced metrics in Managed Instance mode.
+    active_invocations: usize,
+    /// Tracks whether the first invocation after init has been received in Managed Instance mode.
+    /// Used to determine if we should search for the empty context on an invocation.
+    awaiting_first_invocation: bool,
 }
 
 impl Processor {
@@ -117,18 +124,41 @@ impl Processor {
             service,
             resource,
             dynamic_tags: HashMap::new(),
+            active_invocations: 0,
+            awaiting_first_invocation: false,
         }
     }
 
     /// Given a `request_id`, creates the context and adds the enhanced metric offsets to the context buffer.
     ///
     pub fn on_invoke_event(&mut self, request_id: String) {
-        let invocation_span =
-            create_empty_span(String::from("aws.lambda"), &self.resource, &self.service);
-        // Important! Call set_init_tags() before adding the invocation to the context buffer
-        self.set_init_tags();
-        self.context_buffer
-            .start_context(&request_id, invocation_span);
+        // In Managed Instance mode, if awaiting the first invocation after init, find and update the empty context created on init start
+        if self.aws_config.is_managed_instance_mode() && self.awaiting_first_invocation {
+            if self
+                .context_buffer
+                .update_empty_context_request_id(&request_id)
+            {
+                debug!(
+                    "Updated empty context from init start with request_id: {}",
+                    request_id
+                );
+            } else {
+                debug!("Expected an empty context but none was found; creating a new context");
+                let invocation_span =
+                    create_empty_span(String::from("aws.lambda"), &self.resource, &self.service);
+                self.set_init_tags();
+                self.context_buffer
+                    .start_context(&request_id, invocation_span);
+            }
+            self.awaiting_first_invocation = false;
+        } else {
+            let invocation_span =
+                create_empty_span(String::from("aws.lambda"), &self.resource, &self.service);
+            // Important!
Call set_init_tags() before adding the invocation to the context buffer + self.set_init_tags(); + self.context_buffer + .start_context(&request_id, invocation_span); + } let timestamp = std::time::UNIX_EPOCH .elapsed() @@ -138,8 +168,19 @@ impl Processor { .unwrap_or_default(); if self.config.lambda_proc_enhanced_metrics { - // Resume tmp, fd, and threads enhanced metrics monitoring - self.enhanced_metrics.resume_usage_metrics_monitoring(); + if self.aws_config.is_managed_instance_mode() { + // In Managed Instance mode, track concurrent invocations + self.active_invocations += 1; + + // Start monitoring on the first active invocation + if self.active_invocations == 1 { + debug!("Starting usage metrics monitoring"); + self.enhanced_metrics.resume_usage_metrics_monitoring(); + } + } else { + // In On-Demand mode, we reset metrics and resume monitoring on each invocation + self.enhanced_metrics.restart_usage_metrics_monitoring(); + } // Collect offsets for network and cpu metrics let network_offset: Option = proc::get_network_data().ok(); @@ -159,6 +200,29 @@ impl Processor { self.enhanced_metrics.increment_invocation_metric(timestamp); self.enhanced_metrics.set_invoked_received(); + // MANAGED INSTANCE MODE: Check for buffered UniversalInstrumentationStart with request_id + if self.aws_config.is_managed_instance_mode() { + if let Some(buffered_event) = self + .context_buffer + .take_universal_instrumentation_start_for_request_id(&request_id) + { + debug!( + "Managed Instance: Found buffered UniversalInstrumentationStart for request_id: {}", + request_id + ); + // Infer span + self.inferrer + .infer_span(&buffered_event.payload_value, &self.aws_config); + self.process_on_universal_instrumentation_start( + request_id, + buffered_event.headers, + buffered_event.payload_value, + ); + } + return; + } + + // ON-DEMAND MODE: Use existing FIFO pairing logic (unchanged) // If `UniversalInstrumentationStart` event happened first, process it if let Some((headers, payload_value)) = self.context_buffer.pair_invoke_event(&request_id) { // Infer span @@ -222,8 +286,21 @@ impl Processor { .try_into() .unwrap_or_default(); - // Get the closest context - let Some(context) = self.context_buffer.get_closest_mut(start_time) else { + // In Managed Instance mode, create a context with empty request_id which will be updated on the first platform start received. + // In On-Demand mode, InitStart is received after the Invoke event, so we get the closest context by timestamp as it does not have a request_id. 
+ let context = if self.aws_config.is_managed_instance_mode() { + let invocation_span = + create_empty_span(String::from("aws.lambda"), &self.resource, &self.service); + self.set_init_tags(); + self.context_buffer.start_context("", invocation_span); + self.awaiting_first_invocation = true; + + self.context_buffer.get_mut(&String::new()) + } else { + self.context_buffer.get_closest_mut(start_time) + }; + + let Some(context) = context else { debug!("Cannot process on platform init start, no invocation context found"); return; }; @@ -251,11 +328,20 @@ impl Processor { self.enhanced_metrics .set_init_duration_metric(init_type, duration_ms, timestamp); - let Some(context) = self.context_buffer.get_closest_mut(timestamp) else { + // In Managed Instance mode, find the context with empty request_id + // In On-Demand mode, find the closest context by timestamp since we do not have the request_id + let context = if self.aws_config.is_managed_instance_mode() { + self.context_buffer.get_mut(&String::new()) + } else { + self.context_buffer.get_closest_mut(timestamp) + }; + + let Some(context) = context else { debug!("Cannot process on platform init report, no invocation context found"); return; }; + // Add duration to cold start span if let Some(cold_start_span) = &mut context.cold_start_span { // `round` is intentionally meant to be a whole integer cold_start_span.duration = (duration_ms * MS_TO_NS) as i64; @@ -312,6 +398,10 @@ impl Processor { /// Given a `request_id` and the time of the platform start, add the start time to the context buffer. /// pub fn on_platform_start(&mut self, request_id: String, time: DateTime) { + if self.aws_config.is_managed_instance_mode() { + self.on_invoke_event(request_id.clone()); + } + let start_time: i64 = SystemTime::from(time) .duration_since(UNIX_EPOCH) .expect("time went backwards") @@ -321,6 +411,11 @@ impl Processor { self.context_buffer.add_start_time(&request_id, start_time); } + #[must_use] + pub fn is_managed_instance_mode(&self) -> bool { + self.aws_config.is_managed_instance_mode() + } + #[allow(clippy::too_many_arguments)] #[allow(clippy::cast_possible_truncation)] pub async fn on_platform_runtime_done( @@ -354,19 +449,40 @@ impl Processor { } } - // Set tmp, fd, and threads enhanced metrics - self.enhanced_metrics.set_max_enhanced_metrics(); - self.enhanced_metrics.set_usage_enhanced_metrics(); // sets use metric values and pauses monitoring task + // In On-Demand mode, pause monitoring between invocations and emit the metrics on each invocation + if !self.aws_config.is_managed_instance_mode() { + self.enhanced_metrics.set_max_enhanced_metrics(); + self.enhanced_metrics.set_usage_enhanced_metrics(); + } self.context_buffer .add_runtime_duration(request_id, metrics.duration_ms); - // If `UniversalInstrumentationEnd` event happened first, process it first - if let Some((headers, payload)) = self - .context_buffer - .pair_platform_runtime_done_event(request_id) - { - self.process_on_universal_instrumentation_end(request_id.clone(), headers, payload); + // MANAGED INSTANCE MODE: Check for buffered UniversalInstrumentationEnd with request_id + if self.aws_config.is_managed_instance_mode() { + if let Some(buffered_event) = self + .context_buffer + .take_universal_instrumentation_end_for_request_id(request_id) + { + debug!( + "Managed Instance: Found buffered UniversalInstrumentationEnd for request_id: {}", + request_id + ); + self.process_on_universal_instrumentation_end( + request_id.clone(), + buffered_event.headers, + buffered_event.payload_value, + ); + 
} + } else { + // ON-DEMAND MODE: Use existing FIFO pairing logic (unchanged) + // If `UniversalInstrumentationEnd` event happened first, process it first + if let Some((headers, payload)) = self + .context_buffer + .pair_platform_runtime_done_event(request_id) + { + self.process_on_universal_instrumentation_end(request_id.clone(), headers, payload); + } } self.process_on_platform_runtime_done(request_id, status, tags_provider, trace_sender) @@ -590,16 +706,128 @@ impl Processor { /// If the `request_id` is not found in the context buffer, return `None`. /// If the `runtime_duration_ms` hasn't been seen, return `None`. /// - pub fn on_platform_report( + #[allow(clippy::too_many_arguments)] + pub async fn on_platform_report( &mut self, request_id: &String, metrics: ReportMetrics, timestamp: i64, + status: Status, + error_type: Option, + spans: Option>, + tags_provider: Arc, + trace_sender: Arc, ) { // Set the report log metrics self.enhanced_metrics .set_report_log_metrics(&metrics, timestamp); + match metrics { + ReportMetrics::ManagedInstance(metric) => { + self.handle_managed_instance_report( + request_id, + metric, + timestamp, + status, + error_type, + spans, + tags_provider, + trace_sender, + ) + .await; + } + ReportMetrics::OnDemand(metrics) => { + self.handle_ondemand_report(request_id, metrics, timestamp); + } + } + + // Set Network and CPU time metrics + if let Some(context) = self.context_buffer.get(request_id) { + if let Some(offsets) = &context.enhanced_metric_data { + self.enhanced_metrics + .set_network_enhanced_metrics(offsets.network_offset); + self.enhanced_metrics + .set_cpu_time_enhanced_metrics(offsets.cpu_offset.clone()); + } + } + } + + /// Handles Managed Instance mode platform report processing. + /// + /// In Managed Instance mode, platform.runtimeDone events are not sent, so this function + /// synthesizes a `RuntimeDone` event from the platform.report metrics. 
+ #[allow(clippy::too_many_arguments)] + async fn handle_managed_instance_report( + &mut self, + request_id: &String, + metric: ManagedInstanceReportMetrics, + timestamp: i64, + status: Status, + error_type: Option, + spans: Option>, + tags_provider: Arc, + trace_sender: Arc, + ) { + // Managed Instance mode doesn't have platform.runtimeDone event, so we synthesize it from platform.report + // Try to get duration from responseLatency span, otherwise fall back to metric.duration_ms + let duration_ms = spans + .as_ref() + .and_then(|span_vec| { + span_vec + .iter() + .find(|span| span.name == "responseLatency") + .map(|span| span.duration_ms) + }) + .unwrap_or(metric.duration_ms); + + let runtime_done_metrics = RuntimeDoneMetrics { + duration_ms, + produced_bytes: None, + }; + + // Track concurrent invocations - decrement after handling report + if self.active_invocations > 0 { + self.active_invocations -= 1; + } else { + debug!("Active invocations already at 0, not updating active invocations"); + } + + // Only pause monitoring when there are no active invocations + if self.active_invocations == 0 { + debug!("No active invocations, pausing usage metrics monitoring"); + self.enhanced_metrics.pause_usage_metrics_monitoring(); + } + + // Only process if we have context for this request + if self.context_buffer.get(request_id).is_some() { + self.on_platform_runtime_done( + request_id, + runtime_done_metrics, + status, + error_type, + tags_provider, + trace_sender, + timestamp, + ) + .await; + } else { + debug!( + "Received platform report for unknown request_id: {}", + request_id + ); + } + } + + /// Handles `OnDemand` mode platform report processing. + /// + /// Processes OnDemand-specific metrics including OOM detection for provided.al runtimes + /// and post-runtime duration calculation. + fn handle_ondemand_report( + &mut self, + request_id: &String, + metrics: OnDemandReportMetrics, + timestamp: i64, + ) { // For provided.al runtimes, if the last invocation hit the memory limit, increment the OOM metric. // We do this for provided.al runtimes because we didn't find another way to detect this under provided.al. // We don't do this for other runtimes to avoid double counting. 
@@ -614,22 +842,13 @@ impl Processor { } } + // Calculate and set post-runtime duration if context is available if let Some(context) = self.context_buffer.get(request_id) { if context.runtime_duration_ms != 0.0 { let post_runtime_duration_ms = metrics.duration_ms - context.runtime_duration_ms; - - // Set the post runtime duration metric self.enhanced_metrics .set_post_runtime_duration_metric(post_runtime_duration_ms, timestamp); } - - // Set Network and CPU time metrics - if let Some(offsets) = context.enhanced_metric_data.clone() { - self.enhanced_metrics - .set_network_enhanced_metrics(offsets.network_offset); - self.enhanced_metrics - .set_cpu_time_enhanced_metrics(offsets.cpu_offset); - } } } @@ -642,6 +861,12 @@ impl Processor { .set_shutdown_metric(i64::try_from(now).expect("can't convert now to i64")); self.enhanced_metrics .set_unused_init_metric(i64::try_from(now).expect("can't convert now to i64")); + + // In managed instance mode, emit sandbox-level usage metrics at shutdown + if self.aws_config.is_managed_instance_mode() { + self.enhanced_metrics.set_max_enhanced_metrics(); // Emit system limits (fd_max, threads_max, tmp_max) + self.enhanced_metrics.set_usage_enhanced_metrics(); // Emit sandbox-wide peak usage + } } /// If this method is called, it means that we are operating in a Universally Instrumented @@ -651,12 +876,46 @@ impl Processor { &mut self, headers: HashMap, payload_value: Value, + request_id: Option, ) { self.tracer_detected = true; self.inferrer.infer_span(&payload_value, &self.aws_config); - // If `Invoke` event happened first, process right away + // MANAGED INSTANCE MODE: Use request ID-based pairing for concurrent invocations + if self.aws_config.is_managed_instance_mode() { + if let Some(req_id) = request_id { + debug!( + "Managed Instance: Processing UniversalInstrumentationStart for request_id: {}", + req_id + ); + if self + .context_buffer + .pair_universal_instrumentation_start_with_request_id( + &req_id, + &headers, + &payload_value, + ) + { + // Invoke event already happened, process immediately + self.process_on_universal_instrumentation_start(req_id, headers, payload_value); + } else { + // Buffered for later pairing when Invoke event arrives + debug!( + "Managed Instance: Buffered UniversalInstrumentationStart for request_id: {}", + req_id + ); + } + return; + } + // Missing request_id in managed instance mode - log warning and fall back to FIFO + warn!( + "Managed Instance: UniversalInstrumentationStart missing request_id header. 
\ + Falling back to FIFO pairing (may cause incorrect pairing with concurrent invocations)" + ); + } + + // ON-DEMAND MODE: Use existing FIFO pairing logic (unchanged) if let Some(request_id) = self .context_buffer .pair_universal_instrumentation_start_event(&headers, &payload_value) @@ -824,7 +1083,42 @@ impl Processor { &mut self, headers: HashMap, payload_value: Value, + request_id: Option, ) { + // MANAGED INSTANCE MODE: Use request ID-based pairing for concurrent invocations + if self.aws_config.is_managed_instance_mode() { + if let Some(req_id) = request_id { + debug!( + "Managed Instance: Processing UniversalInstrumentationEnd for request_id: {}", + req_id + ); + if self + .context_buffer + .pair_universal_instrumentation_end_with_request_id( + &req_id, + &headers, + &payload_value, + ) + { + // PlatformRuntimeDone already happened, process immediately + self.process_on_universal_instrumentation_end(req_id, headers, payload_value); + } else { + // Buffered for later pairing when PlatformRuntimeDone arrives + debug!( + "Managed Instance: Buffered UniversalInstrumentationEnd for request_id: {}", + req_id + ); + } + return; + } + // Missing request_id in managed instance mode - log warning and fall back to FIFO + warn!( + "Managed Instance: UniversalInstrumentationEnd missing request_id header. \ + Falling back to FIFO pairing (may cause incorrect pairing with concurrent invocations)" + ); + } + + // ON-DEMAND MODE: Use existing FIFO pairing logic (unchanged) // If `PlatformRuntimeDone` event happened first, process if let Some(request_id) = self .context_buffer @@ -1018,6 +1312,10 @@ impl Processor { mod tests { use super::*; use crate::LAMBDA_RUNTIME_SLUG; + use crate::extension::telemetry::events::ManagedInstanceReportMetrics; + use crate::traces::stats_concentrator_service::StatsConcentratorService; + use crate::traces::stats_generator::StatsGenerator; + use crate::traces::trace_processor; use base64::{Engine, engine::general_purpose::STANDARD}; use dogstatsd::aggregator_service::AggregatorService; use dogstatsd::metric::EMPTY_TAGS; @@ -1031,6 +1329,7 @@ mod tests { sandbox_init_time: Instant::now(), runtime_api: "***".into(), exec_wrapper: None, + initialization_type: "on-demand".into(), }); let config = Arc::new(config::Config { @@ -1271,6 +1570,171 @@ mod tests { ); } + macro_rules! 
platform_report_managed_instance_tests { + ($($name:ident: $value:expr,)*) => { + $( + #[tokio::test] + async fn $name() { + use datadog_trace_obfuscation::obfuscation_config::ObfuscationConfig; + + let ( + request_id, + setup_context, + duration_ms, + status, + error_type, + should_have_context_after, + ): (&str, bool, f64, Status, Option, bool) = $value; + + let mut processor = setup(); + + // Setup context if needed + if setup_context { + processor.on_invoke_event(request_id.to_string()); + let start_time = chrono::Utc::now(); + processor.on_platform_start(request_id.to_string(), start_time); + } + + let metrics = ReportMetrics::ManagedInstance(ManagedInstanceReportMetrics { + duration_ms, + }); + + // Create tags_provider + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + // Create stats concentrator for test + let (stats_concentrator_service, stats_concentrator_handle) = + StatsConcentratorService::new(Arc::clone(&config)); + tokio::spawn(stats_concentrator_service.run()); + + // Create trace sender + let trace_sender = Arc::new(SendingTraceProcessor { + appsec: None, + processor: Arc::new(trace_processor::ServerlessTraceProcessor { + obfuscation_config: Arc::new(ObfuscationConfig::new().expect("Failed to create ObfuscationConfig")), + }), + trace_tx: tokio::sync::mpsc::channel(1).0, + stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)), + }); + + // Call on_platform_report + let request_id_string = request_id.to_string(); + processor.on_platform_report( + &request_id_string, + metrics, + chrono::Utc::now().timestamp(), + status, + error_type, + None, // spans + tags_provider, + trace_sender, + ).await; + + // Verify context state + let request_id_string_for_get = request_id.to_string(); + assert_eq!( + processor.context_buffer.get(&request_id_string_for_get).is_some(), + should_have_context_after, + "Context existence mismatch for request_id: {}", + request_id + ); + } + )* + } + } + + platform_report_managed_instance_tests! 
{ + // (request_id, setup_context, duration_ms, status, error_type, should_have_context_after) + test_on_platform_report_managed_instance_mode_with_valid_context: ( + "test-request-id", + true, // setup context + 123.45, + Status::Success, + None, + true, // context should still exist + ), + + test_on_platform_report_managed_instance_mode_without_context: ( + "unknown-request-id", + false, // no context setup + 123.45, + Status::Success, + None, + false, // context should not exist + ), + + test_on_platform_report_managed_instance_mode_with_error_status: ( + "test-request-id-error", + true, // setup context + 200.0, + Status::Error, + Some("RuntimeError".to_string()), + true, // context should still exist + ), + + test_on_platform_report_managed_instance_mode_with_timeout: ( + "test-request-id-timeout", + true, // setup context + 30000.0, + Status::Timeout, + None, + true, // context should still exist + ), + } + + #[tokio::test] + async fn test_is_managed_instance_mode_returns_true() { + use crate::config::aws::LAMBDA_MANAGED_INSTANCES_INIT_TYPE; + + let aws_config = Arc::new(AwsConfig { + region: "us-east-1".into(), + aws_lwa_proxy_lambda_runtime_api: Some("***".into()), + function_name: "test-function".into(), + sandbox_init_time: Instant::now(), + runtime_api: "***".into(), + exec_wrapper: None, + initialization_type: LAMBDA_MANAGED_INSTANCES_INIT_TYPE.into(), // Managed Instance mode + }); + + let config = Arc::new(config::Config::default()); + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::new(), + )); + + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service"); + tokio::spawn(service.run()); + + let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config))); + + let processor = Processor::new(tags_provider, config, aws_config, handle, propagator); + + assert!( + processor.is_managed_instance_mode(), + "Should be in Managed Instance mode" + ); + } + + #[tokio::test] + async fn test_is_managed_instance_mode_returns_false() { + let processor = setup(); // Uses "on-demand" initialization_type + assert!( + !processor.is_managed_instance_mode(), + "Should not be in managed instance mode" + ); + } + #[tokio::test] async fn test_on_platform_restore_start_creates_snapstart_span() { let mut processor = setup(); diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index 722bb92fd..2e3975e7c 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -10,7 +10,9 @@ use tracing::{debug, error}; use crate::{ config::{self, aws::AwsConfig}, - extension::telemetry::events::{InitType, ReportMetrics, RuntimeDoneMetrics, Status}, + extension::telemetry::events::{ + InitType, ReportMetrics, RuntimeDoneMetrics, Status, TelemetrySpan, + }, lifecycle::invocation::{ context::{Context, ReparentingInfo}, processor::Processor, @@ -69,14 +71,22 @@ pub enum ProcessorCommand { request_id: String, metrics: ReportMetrics, timestamp: i64, + status: Status, + error_type: Option, + spans: Option>, + tags_provider: Arc, + trace_sender: Arc, + response: oneshot::Sender<()>, }, UniversalInstrumentationStart { headers: HashMap, payload_value: Value, + request_id: Option, }, UniversalInstrumentationEnd { headers: HashMap, payload_value: Value, + request_id: Option, }, AddReparenting { request_id: String, @@ -213,30 +223,49 @@ 
impl InvocationProcessorHandle {
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub async fn on_platform_report(
         &self,
-        request_id: String,
+        request_id: &str,
         metrics: ReportMetrics,
         timestamp: i64,
-    ) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
+        status: Status,
+        error_type: &Option<String>,
+        spans: &Option<Vec<TelemetrySpan>>,
+        tags_provider: Arc<provider::Provider>,
+        trace_sender: Arc<SendingTraceProcessor>,
+    ) -> Result<(), ProcessorError> {
+        let (tx, rx) = oneshot::channel();
         self.sender
             .send(ProcessorCommand::PlatformReport {
-                request_id,
+                request_id: request_id.to_string(),
                 metrics,
                 timestamp,
+                status,
+                error_type: error_type.clone(),
+                spans: spans.clone(),
+                tags_provider,
+                trace_sender,
+                response: tx,
             })
             .await
+            .map_err(|e| ProcessorError::ChannelSend(e.to_string()))?;
+        rx.await
+            .map_err(|e| ProcessorError::ChannelReceive(e.to_string()))?;
+        Ok(())
     }
 
     pub async fn on_universal_instrumentation_start(
         &self,
         headers: HashMap<String, String>,
         payload_value: Value,
+        request_id: Option<String>,
    ) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
         self.sender
             .send(ProcessorCommand::UniversalInstrumentationStart {
                 headers,
                 payload_value,
+                request_id,
             })
             .await
     }
@@ -245,11 +274,13 @@ impl InvocationProcessorHandle {
         &self,
         headers: HashMap<String, String>,
         payload_value: Value,
+        request_id: Option<String>,
    ) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
         self.sender
             .send(ProcessorCommand::UniversalInstrumentationEnd {
                 headers,
                 payload_value,
+                request_id,
             })
             .await
     }
@@ -474,23 +505,54 @@ impl InvocationProcessorService {
                 request_id,
                 metrics,
                 timestamp,
+                status,
+                error_type,
+                spans,
+                tags_provider,
+                trace_sender,
+                response,
             } => {
                 self.processor
-                    .on_platform_report(&request_id, metrics, timestamp);
+                    .on_platform_report(
+                        &request_id,
+                        metrics,
+                        timestamp,
+                        status,
+                        error_type,
+                        spans,
+                        tags_provider,
+                        trace_sender,
+                    )
+                    .await;
+
+                // response.send() is required here: without it, the caller's rx.await in
+                // on_platform_report would block forever waiting for a response, the async
+                // task would never complete, and the entire extension could hang during
+                // shutdown. This also mirrors how PlatformRuntimeDone is handled in
+                // On-Demand mode.
+                let _ = response.send(());
             }
             ProcessorCommand::UniversalInstrumentationStart {
                 headers,
                 payload_value,
+                request_id,
             } => {
-                self.processor
-                    .on_universal_instrumentation_start(headers, payload_value);
+                self.processor.on_universal_instrumentation_start(
+                    headers,
+                    payload_value,
+                    request_id,
+                );
             }
             ProcessorCommand::UniversalInstrumentationEnd {
                 headers,
                 payload_value,
+                request_id,
             } => {
-                self.processor
-                    .on_universal_instrumentation_end(headers, payload_value);
+                self.processor.on_universal_instrumentation_end(
+                    headers,
+                    payload_value,
+                    request_id,
+                );
            }
            ProcessorCommand::AddReparenting {
                request_id,
diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs
index f5b7adff0..d93f52d1d 100644
--- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs
+++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs
@@ -542,6 +542,7 @@ mod tests {
             function_name: String::new(),
             sandbox_init_time: Instant::now(),
             exec_wrapper: None,
+            initialization_type: "on-demand".into(),
         });
 
         inferrer.infer_span(&payload, &aws_config);
diff --git a/bottlecap/src/lifecycle/listener.rs b/bottlecap/src/lifecycle/listener.rs
index 54687af2e..f6b32fb68 100644
--- a/bottlecap/src/lifecycle/listener.rs
+++ b/bottlecap/src/lifecycle/listener.rs
@@ -38,6 +38,13 @@
 const START_INVOCATION_PATH: &str = "/lambda/start-invocation";
 const END_INVOCATION_PATH: &str = "/lambda/end-invocation";
 const
AGENT_PORT: usize = 8124;
 
+/// Extracts the AWS Lambda request ID from the LWA proxy header.
+fn extract_request_id_from_headers(headers: &HashMap<String, String>) -> Option<String> {
+    headers
+        .get("lambda-runtime-aws-request-id")
+        .map(String::to_string)
+}
+
 pub struct Listener {
     propagator: Arc<DatadogCompositePropagator>,
     pub invocation_processor_handle: InvocationProcessorHandle,
@@ -185,10 +192,14 @@ impl Listener {
         payload_value: Value,
         invocation_processor_handle: InvocationProcessorHandle,
     ) {
-        debug!("Received start invocation request");
+        debug!(
+            "Received start invocation request from headers: {headers:?}, payload_value: {payload_value:?}"
+        );
+
+        let request_id = extract_request_id_from_headers(&headers);
 
         if let Err(e) = invocation_processor_handle
-            .on_universal_instrumentation_start(headers, payload_value)
+            .on_universal_instrumentation_start(headers, payload_value, request_id)
             .await
         {
             error!(
@@ -240,13 +251,16 @@ impl Listener {
         body: Bytes,
         invocation_processor_handle: InvocationProcessorHandle,
     ) {
-        debug!("Received end invocation request");
-
         let headers = headers_to_map(headers);
         let payload_value = serde_json::from_slice::<Value>(&body).unwrap_or_else(|_| json!({}));
+        debug!(
+            "Received end invocation request from headers: {headers:?}, payload_value: {payload_value:?}"
+        );
+        let request_id = extract_request_id_from_headers(&headers);
+
         if let Err(e) = invocation_processor_handle
-            .on_universal_instrumentation_end(headers, payload_value)
+            .on_universal_instrumentation_end(headers, payload_value, request_id)
             .await
         {
             error!(
@@ -256,3 +270,52 @@
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_extract_request_id_from_header() {
+        let headers = HashMap::from([(
+            "lambda-runtime-aws-request-id".to_string(),
+            "req-from-header".to_string(),
+        )]);
+
+        let result = extract_request_id_from_headers(&headers);
+
+        assert_eq!(
+            result,
+            Some("req-from-header".to_string()),
+            "Should extract request_id from lambda-runtime-aws-request-id header"
+        );
+    }
+
+    #[test]
+    fn test_extract_request_id_none_when_missing() {
+        let headers = HashMap::new();
+
+        let result = extract_request_id_from_headers(&headers);
+
+        assert_eq!(
+            result, None,
+            "Should return None when request_id header is not present"
+        );
+    }
+
+    #[test]
+    fn test_extract_request_id_lwa_proxy_header() {
+        let headers = HashMap::from([(
+            "lambda-runtime-aws-request-id".to_string(),
+            "lwa-proxy-request-id".to_string(),
+        )]);
+
+        let result = extract_request_id_from_headers(&headers);
+
+        assert_eq!(
+            result,
+            Some("lwa-proxy-request-id".to_string()),
+            "Should extract request_id from LWA proxy header"
+        );
+    }
+}
diff --git a/bottlecap/src/logs/agent.rs b/bottlecap/src/logs/agent.rs
index f18a029ca..432d030c3 100644
--- a/bottlecap/src/logs/agent.rs
+++ b/bottlecap/src/logs/agent.rs
@@ -24,12 +24,14 @@ impl LogsAgent {
         datadog_config: Arc<config::Config>,
         event_bus: Sender<Event>,
         aggregator_handle: AggregatorHandle,
+        is_managed_instance_mode: bool,
     ) -> (Self, Sender<TelemetryEvent>) {
         let processor = LogsProcessor::new(
             Arc::clone(&datadog_config),
             tags_provider,
             event_bus,
             LAMBDA_RUNTIME_SLUG.to_string(),
+            is_managed_instance_mode,
         );
 
         let (tx, rx) = mpsc::channel::<TelemetryEvent>(1000);
diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs
index ec29bcdcb..285719d50 100644
--- a/bottlecap/src/logs/lambda/processor.rs
+++ b/bottlecap/src/logs/lambda/processor.rs
@@ -7,6 +7,7 @@ use tracing::{debug, error};
 use crate::LAMBDA_RUNTIME_SLUG;
 use crate::config;
 use crate::event_bus::Event;
+use
crate::extension::telemetry::events::ReportMetrics; use crate::extension::telemetry::events::{Status, TelemetryEvent, TelemetryRecord}; use crate::lifecycle::invocation::context::Context as InvocationContext; use crate::logs::aggregator_service::AggregatorHandle; @@ -33,6 +34,8 @@ pub struct LambdaProcessor { event_bus: Sender, // Logs enabled logs_enabled: bool, + // Managed Instance mode + is_managed_instance_mode: bool, } const OOM_ERRORS: [&str; 7] = [ @@ -59,6 +62,7 @@ impl LambdaProcessor { tags_provider: Arc, datadog_config: Arc, event_bus: Sender, + is_managed_instance_mode: bool, ) -> Self { let service = datadog_config.service.clone().unwrap_or_default(); let tags = tags_provider.get_tags_string(); @@ -77,6 +81,7 @@ impl LambdaProcessor { orphan_logs: Vec::new(), ready_logs: Vec::new(), event_bus, + is_managed_instance_mode, } } @@ -187,40 +192,89 @@ impl LambdaProcessor { Some(result_status), )) }, - TelemetryRecord::PlatformReport { request_id, metrics, .. } => { // TODO: check what to do with rest of the fields + TelemetryRecord::PlatformReport { request_id, metrics, status, error_type, .. } => { if let Err(e) = self.event_bus.send(Event::Telemetry(copy)).await { error!("Failed to send PlatformReport to the main event bus: {}", e); } + match metrics { + ReportMetrics::ManagedInstance(managed_instance_metrics) => { + let (result_status, message) = match status { + Status::Timeout => ( + "error", + format!( + "REPORT RequestId: {} Runtime Duration: {:.2} ms Task timed out after {:.2} seconds", + request_id, + managed_instance_metrics.duration_ms, + managed_instance_metrics.duration_ms / 1000.0 + ) + ), + Status::Error => { + let error_detail = error_type + .as_ref() + .map_or_else(|| " Task failed with an unknown error".to_string(), |e| format!(" Task failed: {e}")); + ( + "error", + format!( + "REPORT RequestId: {} Runtime Duration: {:.2} ms{}", + request_id, + managed_instance_metrics.duration_ms, + error_detail + ) + ) + } + _ => ( + "info", + format!( + "REPORT RequestId: {} Runtime Duration: {:.2} ms", + request_id, + managed_instance_metrics.duration_ms + ) + ) + }; + + self.invocation_context.runtime_duration_ms = managed_instance_metrics.duration_ms; + // Remove the `request_id` since no more orphan logs will be processed with this one + self.invocation_context.request_id = String::new(); + + Ok(Message::new( + message, + Some(request_id), + self.function_arn.clone(), + event.time.timestamp_millis(), + Some(result_status.to_string()), + )) + } + ReportMetrics::OnDemand(metrics) => { + let mut post_runtime_duration_ms = 0.0; + // Calculate `post_runtime_duration_ms` if we've seen a `runtime_duration_ms`. + if self.invocation_context.runtime_duration_ms > 0.0 { + post_runtime_duration_ms = metrics.duration_ms - self.invocation_context.runtime_duration_ms; + } - let mut post_runtime_duration_ms = 0.0; - // Calculate `post_runtime_duration_ms` if we've seen a `runtime_duration_ms`. 
-                if self.invocation_context.runtime_duration_ms > 0.0 {
-                    post_runtime_duration_ms = metrics.duration_ms - self.invocation_context.runtime_duration_ms;
-                }
+                            let mut message = format!(
+                                "REPORT RequestId: {} Duration: {:.2} ms Runtime Duration: {:.2} ms Post Runtime Duration: {:.2} ms Billed Duration: {:.2} ms Memory Size: {} MB Max Memory Used: {} MB",
+                                request_id,
+                                metrics.duration_ms,
+                                self.invocation_context.runtime_duration_ms,
+                                post_runtime_duration_ms,
+                                metrics.billed_duration_ms,
+                                metrics.memory_size_mb,
+                                metrics.max_memory_used_mb,
+                            );
+
+                            if let Some(init_duration_ms) = metrics.init_duration_ms {
+                                message = format!("{message} Init Duration: {init_duration_ms:.2} ms");
+                            }
 
-                let mut message = format!(
-                    "REPORT RequestId: {} Duration: {:.2} ms Runtime Duration: {:.2} ms Post Runtime Duration: {:.2} ms Billed Duration: {:.2} ms Memory Size: {} MB Max Memory Used: {} MB",
-                    request_id,
-                    metrics.duration_ms,
-                    self.invocation_context.runtime_duration_ms,
-                    post_runtime_duration_ms,
-                    metrics.billed_duration_ms,
-                    metrics.memory_size_mb,
-                    metrics.max_memory_used_mb,
-                );
-
-                let init_duration_ms = metrics.init_duration_ms;
-                if let Some(init_duration_ms) = init_duration_ms {
-                    message = format!("{message} Init Duration: {init_duration_ms:.2} ms");
+                            Ok(Message::new(
+                                message,
+                                Some(request_id),
+                                self.function_arn.clone(),
+                                event.time.timestamp_millis(),
+                                None,
+                            ))
+                        }
+                    }
-
-                Ok(Message::new(
-                    message,
-                    Some(request_id),
-                    self.function_arn.clone(),
-                    event.time.timestamp_millis(),
-                    None,
-                ))
             },
             TelemetryRecord::PlatformRestoreStart { .. } => {
                 if let Err(e) = self.event_bus.send(Event::Telemetry(event)).await {
@@ -247,7 +301,10 @@ impl LambdaProcessor {
         lambda_message.lambda.request_id = match lambda_message.lambda.request_id {
             Some(request_id) => Some(request_id.clone()),
             None => {
-                if self.invocation_context.request_id.is_empty() {
+                // If there is no request_id available in the current invocation context,
+                // set it to None. The same applies in Managed Instance mode, since concurrent
+                // invocations don't allow us to infer which invocation the logs belong to.
+                if self.invocation_context.request_id.is_empty() || self.is_managed_instance_mode {
                     None
                 } else {
                     Some(self.invocation_context.request_id.clone())
@@ -290,10 +347,14 @@ impl LambdaProcessor {
             }
         };
 
-        if log.message.lambda.request_id.is_some() {
+        if log.message.lambda.request_id.is_some() || self.is_managed_instance_mode {
+            // In On-Demand mode, ship logs with a request_id.
+            // In Managed Instance mode, ship logs without a request_id immediately as well.
+            // These are inter-invocation/sandbox logs that should be aggregated without
+            // waiting to be attached to the next invocation.
Ok(log) } else { - // No request_id available, queue as orphan log + // In On-Demand mode, if no request_id is available, queue as orphan log self.orphan_logs.push(log); Err("No request_id available, queueing for later".into()) } @@ -382,7 +443,8 @@ mod tests { use std::sync::Arc; use crate::extension::telemetry::events::{ - InitPhase, InitType, ReportMetrics, RuntimeDoneMetrics, Status, + InitPhase, InitType, ManagedInstanceReportMetrics, OnDemandReportMetrics, ReportMetrics, + RuntimeDoneMetrics, Status, }; use crate::logs::aggregator_service::AggregatorService; use crate::logs::lambda::Lambda; @@ -415,6 +477,7 @@ mod tests { tags, ..config::Config::default()}), tx.clone(), + false, // On-Demand mode ); let result = processor.get_message(input.clone()).await.unwrap(); @@ -561,14 +624,15 @@ mod tests { error_type: None, status: Status::Success, request_id: "test-request-id".to_string(), - metrics: ReportMetrics { + metrics: ReportMetrics::OnDemand(OnDemandReportMetrics { duration_ms: 100.0, billed_duration_ms: 128, memory_size_mb: 256, max_memory_used_mb: 64, init_duration_ms: Some(50.0), restore_duration_ms: None - } + }), + spans: None, } }, Message { @@ -581,6 +645,106 @@ mod tests { status: "info".to_string(), }, ), + + // platform report - Managed Instance mode success + platform_report_managed_instance_success: ( + &TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 2, 30, 27).unwrap(), + record: TelemetryRecord::PlatformReport { + request_id: "test-request-id".to_string(), + metrics: ReportMetrics::ManagedInstance(ManagedInstanceReportMetrics { + duration_ms: 123.45, + }), + status: Status::Success, + error_type: None, + spans: None, + } + }, + Message { + message: "REPORT RequestId: test-request-id Runtime Duration: 123.45 ms".to_string(), + lambda: Lambda { + arn: "test-arn".to_string(), + request_id: Some("test-request-id".to_string()), + }, + timestamp: 1_673_058_627_000, + status: "info".to_string(), + }, + ), + + // platform report - managed instance mode error with error_type + platform_report_managed_instance_error: ( + &TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 2, 30, 27).unwrap(), + record: TelemetryRecord::PlatformReport { + request_id: "test-request-id".to_string(), + metrics: ReportMetrics::ManagedInstance(ManagedInstanceReportMetrics { + duration_ms: 200.0, + }), + status: Status::Error, + error_type: Some("RuntimeError".to_string()), + spans: None, + } + }, + Message { + message: "REPORT RequestId: test-request-id Runtime Duration: 200.00 ms Task failed: RuntimeError".to_string(), + lambda: Lambda { + arn: "test-arn".to_string(), + request_id: Some("test-request-id".to_string()), + }, + timestamp: 1_673_058_627_000, + status: "error".to_string(), + }, + ), + + // platform report - managed instance mode timeout + platform_report_managed_instance_timeout: ( + &TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 2, 30, 27).unwrap(), + record: TelemetryRecord::PlatformReport { + request_id: "test-request-id".to_string(), + metrics: ReportMetrics::ManagedInstance(ManagedInstanceReportMetrics { + duration_ms: 30000.0, + }), + status: Status::Timeout, + error_type: None, + spans: None, + } + }, + Message { + message: "REPORT RequestId: test-request-id Runtime Duration: 30000.00 ms Task timed out after 30.00 seconds".to_string(), + lambda: Lambda { + arn: "test-arn".to_string(), + request_id: Some("test-request-id".to_string()), + }, + timestamp: 1_673_058_627_000, + status: "error".to_string(), + }, + ), + + // platform report - managed instance 
mode error without error_type + platform_report_managed_instance_error_no_type: ( + &TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 2, 30, 27).unwrap(), + record: TelemetryRecord::PlatformReport { + request_id: "test-request-id".to_string(), + metrics: ReportMetrics::ManagedInstance(ManagedInstanceReportMetrics { + duration_ms: 150.0, + }), + status: Status::Error, + error_type: None, + spans: None, + } + }, + Message { + message: "REPORT RequestId: test-request-id Runtime Duration: 150.00 ms Task failed with an unknown error".to_string(), + lambda: Lambda { + arn: "test-arn".to_string(), + request_id: Some("test-request-id".to_string()), + }, + timestamp: 1_673_058_627_000, + status: "error".to_string(), + }, + ), } #[tokio::test] @@ -597,7 +761,8 @@ mod tests { let (tx, _) = tokio::sync::mpsc::channel(2); - let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); + let mut processor = + LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), false); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -630,7 +795,8 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); + let mut processor = + LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), false); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -676,7 +842,8 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); + let mut processor = + LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), false); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -709,7 +876,8 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); + let mut processor = + LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), false); let start_event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -736,6 +904,45 @@ mod tests { ); } + #[tokio::test] + async fn test_get_intake_log_managed_instance_mode() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + + // Set is_managed_instance_mode to true + let mut processor = + LambdaProcessor::new(tags_provider.clone(), Arc::clone(&config), tx.clone(), true); + + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Function(Value::String("test-function".to_string())), + }; + + let lambda_message = processor.get_message(event.clone()).await.unwrap(); + assert_eq!(lambda_message.lambda.request_id, None); + + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + assert_eq!(intake_log.message.lambda.request_id, None); + assert_eq!(processor.orphan_logs.len(), 0); + + assert_eq!(intake_log.source, LAMBDA_RUNTIME_SLUG.to_string()); + assert_eq!(intake_log.hostname, "test-arn".to_string()); + 
assert_eq!(intake_log.service, "test-service".to_string()); + assert_eq!(intake_log.message.message, "test-function".to_string()); + assert_eq!(intake_log.tags, tags_provider.get_tags_string()); + } + // process #[tokio::test] async fn test_process() { @@ -758,8 +965,12 @@ mod tests { aggregator_service.run().await; }); - let mut processor = - LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone()); + let mut processor = LambdaProcessor::new( + Arc::clone(&tags_provider), + Arc::clone(&config), + tx.clone(), + false, + ); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -822,8 +1033,12 @@ mod tests { aggregator_service.run().await; }); - let mut processor = - LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone()); + let mut processor = LambdaProcessor::new( + Arc::clone(&tags_provider), + Arc::clone(&config), + tx.clone(), + false, + ); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -867,7 +1082,8 @@ mod tests { aggregator_service.run().await; }); - let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); + let mut processor = + LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), false); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -909,8 +1125,12 @@ mod tests { aggregator_service.run().await; }); - let mut processor = - LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone()); + let mut processor = LambdaProcessor::new( + Arc::clone(&tags_provider), + Arc::clone(&config), + tx.clone(), + false, + ); let start_event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -1000,8 +1220,12 @@ mod tests { let (tx, _rx) = tokio::sync::mpsc::channel(2); - let mut processor = - LambdaProcessor::new(tags_provider.clone(), Arc::clone(&config), tx.clone()); + let mut processor = LambdaProcessor::new( + tags_provider.clone(), + Arc::clone(&config), + tx.clone(), + false, + ); let start_event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), record: TelemetryRecord::PlatformStart { @@ -1058,8 +1282,12 @@ mod tests { let (tx, _rx) = tokio::sync::mpsc::channel(2); - let mut processor = - LambdaProcessor::new(tags_provider.clone(), Arc::clone(&config), tx.clone()); + let mut processor = LambdaProcessor::new( + tags_provider.clone(), + Arc::clone(&config), + tx.clone(), + false, + ); let start_event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), record: TelemetryRecord::PlatformStart { @@ -1131,8 +1359,12 @@ mod tests { aggregator_service.run().await; }); - let mut processor = - LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone()); + let mut processor = LambdaProcessor::new( + Arc::clone(&tags_provider), + Arc::clone(&config), + tx.clone(), + false, + ); // First, send an extension log (orphan) that doesn't have a request_id let extension_event = TelemetryEvent { @@ -1200,4 +1432,67 @@ mod tests { .await .expect("Aggregator service task failed"); } + + #[tokio::test] + async fn test_orphan_logs_no_request_id_in_managed_instance() { + // This test verifies that in managed instance mode, logs without a request_id + // are sent immediately without being queued as orphans, and they never get + // a request_id attached even when one becomes available in the invocation context. 
+ let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + + // Create processor in managed instance mode (is_managed_instance_mode = true) + let mut processor = LambdaProcessor::new( + Arc::clone(&tags_provider), + Arc::clone(&config), + tx.clone(), + true, // Managed Instance mode + ); + + // Send a function log without a request_id (inter-invocation log) + let function_event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Function(Value::String( + "Inter-invocation log without request_id".to_string(), + )), + }; + let log1 = processor.make_log(function_event).await.unwrap(); + assert_eq!(log1.message.lambda.request_id, None); + assert_eq!(processor.orphan_logs.len(), 0); + + // Now send a PlatformStart event with a request_id to set the context + let start_event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 49).unwrap(), + record: TelemetryRecord::PlatformStart { + request_id: "test-request-id".to_string(), + version: Some("test".to_string()), + }, + }; + processor.make_log(start_event).await.unwrap(); + assert_eq!(processor.invocation_context.request_id, "test-request-id"); + + // Send another function log without explicit request_id + let another_function_event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 50).unwrap(), + record: TelemetryRecord::Function(Value::String( + "Another log after request_id available".to_string(), + )), + }; + let log2 = processor.make_log(another_function_event).await.unwrap(); + + // In managed instance mode, logs should not inherit request_id from context + assert_eq!(log2.message.lambda.request_id, None); + assert_eq!(processor.orphan_logs.len(), 0); + } } diff --git a/bottlecap/src/logs/processor.rs b/bottlecap/src/logs/processor.rs index cc3d220af..cff6df5f0 100644 --- a/bottlecap/src/logs/processor.rs +++ b/bottlecap/src/logs/processor.rs @@ -18,10 +18,16 @@ impl LogsProcessor { tags_provider: Arc, event_bus: Sender, runtime: String, + is_managed_instance_mode: bool, ) -> Self { match runtime.as_str() { LAMBDA_RUNTIME_SLUG => { - let lambda_processor = LambdaProcessor::new(tags_provider, config, event_bus); + let lambda_processor = LambdaProcessor::new( + tags_provider, + config, + event_bus, + is_managed_instance_mode, + ); LogsProcessor::Lambda(lambda_processor) } _ => panic!("Unsupported runtime: {runtime}"), diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index b300acd85..4e41e4b76 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -284,7 +284,7 @@ impl Lambda { } } } else { - debug!("Could not find data to generate network enhanced metrics"); + debug!("Could not find network offset data to generate network enhanced metrics"); } } @@ -513,51 +513,67 @@ impl Lambda { } let metric = metric::Metric::new( constants::DURATION_METRIC.into(), - MetricValue::distribution(metrics.duration_ms * constants::MS_TO_SEC), + MetricValue::distribution(metrics.duration_ms() * constants::MS_TO_SEC), self.get_dynamic_value_tags(), Some(timestamp), ); if let Err(e) = 
self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert duration metric: {}", e); } - let metric = metric::Metric::new( - constants::BILLED_DURATION_METRIC.into(), - MetricValue::distribution(metrics.billed_duration_ms as f64 * constants::MS_TO_SEC), - self.get_dynamic_value_tags(), - Some(timestamp), - ); - if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { - error!("failed to insert billed duration metric: {}", e); - } - let metric = metric::Metric::new( - constants::MAX_MEMORY_USED_METRIC.into(), - MetricValue::distribution(metrics.max_memory_used_mb as f64), - self.get_dynamic_value_tags(), - Some(timestamp), - ); - if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { - error!("failed to insert max memory used metric: {}", e); - } - let metric = metric::Metric::new( - constants::MEMORY_SIZE_METRIC.into(), - MetricValue::distribution(metrics.memory_size_mb as f64), - self.get_dynamic_value_tags(), - Some(timestamp), - ); - if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { - error!("failed to insert memory size metric: {}", e); - } - let cost_usd = - Self::calculate_estimated_cost_usd(metrics.billed_duration_ms, metrics.memory_size_mb); - let metric = metric::Metric::new( - constants::ESTIMATED_COST_METRIC.into(), - MetricValue::distribution(cost_usd), - self.get_dynamic_value_tags(), - Some(timestamp), - ); - if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { - error!("failed to insert estimated cost metric: {}", e); + match metrics { + ReportMetrics::ManagedInstance(_) => { + // In Managed Instance mode, we can't track these metrics for a given lambda invocation + // - billed duration + // - max memory used + // - memory size + // - estimated cost + } + ReportMetrics::OnDemand(metrics) => { + let metric = metric::Metric::new( + constants::BILLED_DURATION_METRIC.into(), + MetricValue::distribution( + metrics.billed_duration_ms as f64 * constants::MS_TO_SEC, + ), + self.get_dynamic_value_tags(), + Some(timestamp), + ); + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { + error!("failed to insert billed duration metric: {}", e); + } + let metric = metric::Metric::new( + constants::MAX_MEMORY_USED_METRIC.into(), + MetricValue::distribution(metrics.max_memory_used_mb as f64), + self.get_dynamic_value_tags(), + Some(timestamp), + ); + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { + error!("failed to insert max memory used metric: {}", e); + } + let metric = metric::Metric::new( + constants::MEMORY_SIZE_METRIC.into(), + MetricValue::distribution(metrics.memory_size_mb as f64), + self.get_dynamic_value_tags(), + Some(timestamp), + ); + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { + error!("failed to insert memory size metric: {}", e); + } + + let cost_usd = Self::calculate_estimated_cost_usd( + metrics.billed_duration_ms, + metrics.memory_size_mb, + ); + let metric = metric::Metric::new( + constants::ESTIMATED_COST_METRIC.into(), + MetricValue::distribution(cost_usd), + self.get_dynamic_value_tags(), + Some(timestamp), + ); + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { + error!("failed to insert estimated cost metric: {}", e); + } + } } } @@ -600,7 +616,8 @@ impl Lambda { }); } - pub fn resume_usage_metrics_monitoring(&self) { + // Reset metrics and resume monitoring for the next invocation + pub fn restart_usage_metrics_monitoring(&self) { if !self.config.enhanced_metrics { return; } @@ -613,6 +630,25 @@ impl Lambda { self.enhanced_metrics_handle.resume_monitoring(); } + 
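The On-Demand path resets and restarts monitoring on every invocation, while the Managed Instance path (the resume/pause pair just below) is gated by the processor's `active_invocations` counter so that sampling runs only while at least one invocation is in flight. A minimal sketch of that gating, assuming a hypothetical `MonitorHandle` in place of the real `enhanced_metrics_handle`:

```rust
/// Hypothetical stand-in for the background usage-metrics monitoring task handle.
struct MonitorHandle;

impl MonitorHandle {
    fn resume(&self) { /* start the background usage-sampling task */ }
    fn pause(&self) { /* stop sampling; keep accumulated peaks */ }
}

struct UsageGate {
    active_invocations: usize,
    handle: MonitorHandle,
}

impl UsageGate {
    /// On Invoke: resume monitoring only on the 0 -> 1 transition.
    fn on_invoke(&mut self) {
        self.active_invocations += 1;
        if self.active_invocations == 1 {
            self.handle.resume();
        }
    }

    /// On platform.report: pause only when the last in-flight invocation ends.
    fn on_report(&mut self) {
        self.active_invocations = self.active_invocations.saturating_sub(1);
        if self.active_invocations == 0 {
            self.handle.pause();
        }
    }
}

fn main() {
    let mut gate = UsageGate { active_invocations: 0, handle: MonitorHandle };
    gate.on_invoke(); // 0 -> 1: resumes monitoring
    gate.on_invoke(); // already running: no-op
    gate.on_report(); // one invocation still active: keep running
    gate.on_report(); // 1 -> 0: pauses monitoring
}
```

Resuming only on the 0 -> 1 transition and pausing only on the 1 -> 0 transition keeps the background sampler from thrashing under concurrent invocations.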
/// Resume monitoring without resetting metrics. Used in managed instance mode to resume monitoring between invocations. + pub fn resume_usage_metrics_monitoring(&self) { + if !self.config.enhanced_metrics { + return; + } + + debug!("Starting sandbox-level usage metrics monitoring (managed instance mode)"); + self.enhanced_metrics_handle.resume_monitoring(); + } + + /// Pause monitoring without emitting metrics. Used in managed instance mode to pause between invocations. + pub fn pause_usage_metrics_monitoring(&self) { + if !self.config.enhanced_metrics { + return; + } + + self.enhanced_metrics_handle.pause_monitoring(); + } + pub fn set_usage_enhanced_metrics(&self) { if !self.config.enhanced_metrics { return; @@ -765,6 +801,7 @@ mod tests { use super::*; use crate::config; + use crate::extension::telemetry::events::{OnDemandReportMetrics, ReportMetrics}; use dogstatsd::aggregator_service::AggregatorService; use dogstatsd::metric::EMPTY_TAGS; const PRECISION: f64 = 0.000_000_01; @@ -871,14 +908,14 @@ mod tests { ); lambda.set_post_runtime_duration_metric(100.0, now); lambda.set_report_log_metrics( - &ReportMetrics { + &ReportMetrics::OnDemand(OnDemandReportMetrics { duration_ms: 100.0, billed_duration_ms: 100, max_memory_used_mb: 128, memory_size_mb: 256, init_duration_ms: Some(50.0), restore_duration_ms: None, - }, + }), now, ); assert!( @@ -1127,14 +1164,14 @@ mod tests { async fn test_set_report_log_metrics() { let (metrics_aggr, my_config) = setup(); let lambda = Lambda::new(metrics_aggr.clone(), my_config); - let report_metrics = ReportMetrics { + let report_metrics = ReportMetrics::OnDemand(OnDemandReportMetrics { duration_ms: 100.0, billed_duration_ms: 100, max_memory_used_mb: 128, memory_size_mb: 256, init_duration_ms: Some(50.0), restore_duration_ms: None, - }; + }); let now: i64 = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") diff --git a/bottlecap/src/proc/constants.rs b/bottlecap/src/proc/constants.rs index f0d48dc69..169abd5ed 100644 --- a/bottlecap/src/proc/constants.rs +++ b/bottlecap/src/proc/constants.rs @@ -7,5 +7,7 @@ pub const VAR_LANG_BIN_PATH: &str = "/var/lang/bin"; pub const LAMBDA_NETWORK_INTERFACE: &str = "vinternal_1"; pub const LAMBDA_RUNTIME_NETWORK_INTERFACE: &str = "vint_runtime"; +pub const MANAGED_INSTANCE_NETWORK_INTERFACE: &str = "Ceth1"; + pub const LAMBDA_FILE_DESCRIPTORS_DEFAULT_LIMIT: f64 = 1024.0; pub const LAMBDA_EXECUTION_PROCESSES_DEFAULT_LIMIT: f64 = 1024.0; diff --git a/bottlecap/src/proc/mod.rs b/bottlecap/src/proc/mod.rs index 1ba308b5d..82c0faf47 100644 --- a/bottlecap/src/proc/mod.rs +++ b/bottlecap/src/proc/mod.rs @@ -8,8 +8,8 @@ use std::{ }; use constants::{ - LAMBDA_NETWORK_INTERFACE, LAMBDA_RUNTIME_NETWORK_INTERFACE, PROC_NET_DEV_PATH, PROC_PATH, - PROC_STAT_PATH, PROC_UPTIME_PATH, + LAMBDA_NETWORK_INTERFACE, LAMBDA_RUNTIME_NETWORK_INTERFACE, MANAGED_INSTANCE_NETWORK_INTERFACE, + PROC_NET_DEV_PATH, PROC_PATH, PROC_STAT_PATH, PROC_UPTIME_PATH, }; use regex::Regex; use tracing::{debug, trace}; @@ -63,6 +63,7 @@ fn get_network_data_from_path(path: &str) -> Result { if values.next().is_some_and(|interface_name| { interface_name.starts_with(LAMBDA_NETWORK_INTERFACE) || interface_name.starts_with(LAMBDA_RUNTIME_NETWORK_INTERFACE) + || interface_name.starts_with(MANAGED_INSTANCE_NETWORK_INTERFACE) // managed instance mode }) { // Read the value for received bytes if present let rx_bytes: Option = values.next().and_then(|s| s.parse().ok()); @@ -356,6 +357,7 @@ mod tests { #[test] fn test_get_network_data() { + 
// On-demand let path = "./tests/proc/net/valid_dev"; let network_data_result = get_network_data_from_path(path_from_root(path).as_str()); assert!(network_data_result.is_ok()); @@ -378,6 +380,18 @@ mod tests { let path = "./tests/proc/net/nonexistent_dev"; let network_data_result = get_network_data_from_path(path_from_root(path).as_str()); assert!(network_data_result.is_err()); + + // Managed Instance + let path = "./tests/proc/net/managed_instance_valid_dev"; + let network_data_result = get_network_data_from_path(path_from_root(path).as_str()); + assert!(network_data_result.is_ok()); + let network_data = network_data_result.unwrap(); + assert!((network_data.rx_bytes - 309_048.0).abs() < f64::EPSILON); + assert!((network_data.tx_bytes - 84870.0).abs() < f64::EPSILON); + + let path = "./tests/proc/net/managed_instance_invalid_dev"; + let network_data_result = get_network_data_from_path(path_from_root(path).as_str()); + assert!(network_data_result.is_err()); } #[test] diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs index 511740249..20fe0d6b6 100644 --- a/bottlecap/src/proxy/interceptor.rs +++ b/bottlecap/src/proxy/interceptor.rs @@ -497,6 +497,7 @@ mod tests { runtime_api: aws_lambda_runtime_api.to_string(), aws_lwa_proxy_lambda_runtime_api: Some(aws_lwa_lambda_runtime_api.to_string()), exec_wrapper: None, + initialization_type: "on-demand".into(), }); let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config))); let (invocation_processor_handle, invocation_processor_service) = diff --git a/bottlecap/src/proxy/mod.rs b/bottlecap/src/proxy/mod.rs index 912a200d1..35c8139d0 100644 --- a/bottlecap/src/proxy/mod.rs +++ b/bottlecap/src/proxy/mod.rs @@ -47,6 +47,7 @@ mod tests { runtime_api: String::new(), sandbox_init_time: Instant::now(), exec_wrapper: Some("/opt/datadog_wrapper".to_string()), + initialization_type: "on-demand".into(), }); assert!(should_start_proxy(&config, aws_config)); } @@ -61,6 +62,7 @@ mod tests { runtime_api: String::new(), sandbox_init_time: Instant::now(), exec_wrapper: None, + initialization_type: "on-demand".into(), }); assert!(should_start_proxy(&config, aws_config)); } @@ -79,6 +81,7 @@ mod tests { runtime_api: String::new(), sandbox_init_time: Instant::now(), exec_wrapper: Some("/opt/datadog_wrapper".to_string()), + initialization_type: "on-demand".into(), }); assert!(should_start_proxy(&config, aws_config)); } @@ -97,6 +100,7 @@ mod tests { runtime_api: String::new(), sandbox_init_time: Instant::now(), exec_wrapper: Some("/opt/datadog_wrapper".to_string()), + initialization_type: "on-demand".into(), }); assert!(!should_start_proxy(&config, aws_config)); } @@ -116,6 +120,7 @@ mod tests { sandbox_init_time: Instant::now(), // Datadog wrapper is not set, so we should not start the proxy exec_wrapper: Some("/opt/not_datadog".to_string()), + initialization_type: "on-demand".into(), }); assert!(!should_start_proxy(&config, aws_config)); } diff --git a/bottlecap/src/secrets/decrypt.rs b/bottlecap/src/secrets/decrypt.rs index d6cab57f0..63914b922 100644 --- a/bottlecap/src/secrets/decrypt.rs +++ b/bottlecap/src/secrets/decrypt.rs @@ -423,6 +423,7 @@ mod tests { sandbox_init_time: Instant::now(), runtime_api: String::new(), exec_wrapper: None, + initialization_type: "on-demand".into(), }), &AwsCredentials{ aws_access_key_id: "AKIDEXAMPLE".to_string(), @@ -482,6 +483,7 @@ mod tests { sandbox_init_time: Instant::now(), runtime_api: String::new(), exec_wrapper: None, + initialization_type: "on-demand".into(), }), 
&AwsCredentials{ aws_access_key_id: "AKIDEXAMPLE".to_string(), @@ -541,6 +543,7 @@ mod tests { sandbox_init_time: Instant::now(), runtime_api: String::new(), exec_wrapper: None, + initialization_type: "on-demand".into(), }), &AwsCredentials{ aws_access_key_id: "AKIDEXAMPLE".to_string(), @@ -600,6 +603,7 @@ mod tests { sandbox_init_time: Instant::now(), runtime_api: String::new(), exec_wrapper: None, + initialization_type: "on-demand".into(), }), &AwsCredentials{ aws_access_key_id: "AKIDEXAMPLE".to_string(), diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index 59aead3f1..747237eef 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -67,6 +67,7 @@ async fn test_logs() { Arc::clone(&arc_conf), bus_tx.clone(), logs_aggr_handle.clone(), + false, ); let api_key_factory = Arc::new(ApiKeyFactory::new(dd_api_key)); let logs_flusher = LogsFlusher::new(api_key_factory, logs_aggr_handle, arc_conf.clone()); diff --git a/bottlecap/tests/proc/net/managed_instance_invalid_dev b/bottlecap/tests/proc/net/managed_instance_invalid_dev new file mode 100644 index 000000000..0c9efd8cb --- /dev/null +++ b/bottlecap/tests/proc/net/managed_instance_invalid_dev @@ -0,0 +1,4 @@ +Inter-| Receive | Transmit +face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed +lo: 9875 152 0 0 0 0 0 0 9875 152 0 0 0 0 0 0 +Veth0: 520 4 0 0 0 0 0 0 520 4 0 0 0 0 0 0 diff --git a/bottlecap/tests/proc/net/managed_instance_valid_dev b/bottlecap/tests/proc/net/managed_instance_valid_dev new file mode 100644 index 000000000..290fbeae6 --- /dev/null +++ b/bottlecap/tests/proc/net/managed_instance_valid_dev @@ -0,0 +1,5 @@ +Inter-| Receive | Transmit +face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed +lo: 9875 152 0 0 0 0 0 0 9875 152 0 0 0 0 0 0 +Veth0: 520 4 0 0 0 0 0 0 520 4 0 0 0 0 0 0 +Ceth1: 309048 577 0 0 0 0 0 0 84870 696 0 0 0 0 0 0 diff --git a/local_tests/managed-instance/Dockerfile b/local_tests/managed-instance/Dockerfile new file mode 100644 index 000000000..19cc817b9 --- /dev/null +++ b/local_tests/managed-instance/Dockerfile @@ -0,0 +1,48 @@ +FROM public.ecr.aws/lambda/python:3.9 + +# Build argument for parent directory name +ARG TARGET=managed-instance + +## Set to 900 (15 minutes) for debugging - Lambda max timeout +#ENV AWS_LAMBDA_FUNCTION_TIMEOUT=900 +ENV LOG_LEVEL=debug +ENV DD_LOG_LEVEL=debug +# Enforce flushing +# ENV DD_SERVERLESS_FLUSH_STRATEGY=continuously,60000 +# ENV DD_SERVERLESS_FLUSH_STRATEGY=periodically,60000 + + +# ---- Add remote dev tooling ---- +# SSH server, LLDB, GDB, procps (ps/top), curl, git +RUN yum -y install lldb gdb gdb-gdbserver procps-ng tar gzip zip curl git shadow-utils sysvinit-tools\ + && \ + yum -y install \ + fontconfig \ + freetype \ + libXrender \ + libXext \ + libXi \ + libXtst \ + libX11 \ + alsa-lib \ + which \ + && yum clean all + +# Optional: install Rust toolchain (useful if you want to build inside the container) +RUN curl https://sh.rustup.rs -sSf | bash -s -- -y +ENV PATH=/root/.cargo/bin:$PATH + + +# Download the AWS Lambda Runtime Interface Emulator from GitHub releases +RUN curl -Lo /usr/local/bin/aws-lambda-rie https://github.com/aws/aws-lambda-runtime-interface-emulator/releases/latest/download/aws-lambda-rie-arm64 \ + && chmod +x /usr/local/bin/aws-lambda-rie + +# The first path is the location of your local agent binary +COPY 
.binaries/bottlecap-arm64 /opt/extensions/datadog-agent
+
+# Copy the lambda function to the container
+COPY local_tests/${TARGET}/index.py ${LAMBDA_TASK_ROOT}/
+
+CMD ["index.handler"]
+
+ENTRYPOINT ["/entrypoint.sh"]
\ No newline at end of file
diff --git a/local_tests/managed-instance/Makefile b/local_tests/managed-instance/Makefile
new file mode 100644
index 000000000..e2e575928
--- /dev/null
+++ b/local_tests/managed-instance/Makefile
@@ -0,0 +1,77 @@
+CONTAINER_NAME=managed-instance-lambda
+IMAGE?=$(CONTAINER_NAME):latest
+PORT?=2345
+
+# Get username once and normalize it ($(shell ...) so it expands at make time)
+USERNAME_NORMALIZED=$(shell whoami | tr -d '.')
+
+# Get current directory name for portability
+CURRENT_DIR=$(shell basename $$(pwd))
+
+help:
+	@echo "Available targets:"
+	@echo " make build - Build the bottlecap layer in debug flavor to allow attaching the process"
+	@echo " make build-release - Build the bottlecap layer in release flavor"
+	@echo " make start - Start the extension"
+	@echo " make stop - Stop the running Lambda container"
+	@echo " make status - Check the status of the Lambda container"
+	@echo " make invoke - Invoke the Lambda function"
+	@echo " make attach - Attach gdbserver to the running extension"
+	@echo " make logs - Follow the container logs"
+	@echo " make shell - Open a shell session in the running container"
+
+
+build:
+	cd ../.. && \
+	ARCHITECTURE=arm64 FIPS=0 ALPINE=0 DEBUG=1 scripts/build_bottlecap_layer.sh && \
+	docker build -f local_tests/$(CURRENT_DIR)/Dockerfile --build-arg TARGET=$(CURRENT_DIR) -t $(IMAGE) . && \
+	cd -
+
+build-release:
+	cd ../.. && \
+	ARCHITECTURE=arm64 FIPS=0 ALPINE=0 DEBUG=0 scripts/build_bottlecap_layer.sh && \
+	docker build -f local_tests/$(CURRENT_DIR)/Dockerfile --build-arg TARGET=$(CURRENT_DIR) -t $(IMAGE) . && \
+	cd -
+
+start:
+	docker run -d --name $(CONTAINER_NAME) \
+		-p 9000:8080 \
+		-p 2345:2345 \
+		--cap-add=SYS_PTRACE --security-opt seccomp=unconfined \
+		--cpus=1 \
+		-e AWS_LAMBDA_INITIALIZATION_TYPE="lambda-managed-instances" \
+		-e AWS_LAMBDA_MAX_CONCURRENCY=5 \
+		-e DD_API_KEY="$(DD_API_KEY)" \
+		-e DD_SITE="datadoghq.com" \
+		-e DD_SERVICE="dd-$(USERNAME_NORMALIZED)-test" \
+		-e DD_LOG_LEVEL="debug" \
+		-e DD_ENV="dev" \
+		--volume "$$(pwd)/entrypoint.sh:/entrypoint.sh" \
+		$(IMAGE)
+	@echo "Container '$(CONTAINER_NAME)' started. Use 'make logs' to view logs."
+
+logs:
+	docker logs -f $(CONTAINER_NAME)
+
+stop:
+	@docker stop $(CONTAINER_NAME) 2>/dev/null || true
+	@docker rm $(CONTAINER_NAME) 2>/dev/null || true
+	@echo "Container '$(CONTAINER_NAME)' stopped and removed."
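+
+# ---- Inspection and debugging helpers ----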
+ +status: + @if docker ps --filter name=$(CONTAINER_NAME) --format '{{.Names}}' | grep -q $(CONTAINER_NAME); then \ + echo "up"; \ + else \ + echo "down"; \ + fi + +# Helpful shell in a running container +shell: + docker exec -it $(CONTAINER_NAME) bash + +attach: + @docker exec -it $(CONTAINER_NAME) sh -lc 'ps aux | grep datadog-agent | grep -v grep | awk "{print \$$2}" | xargs -I{} gdbserver --attach 0.0.0.0:$(PORT) {}' + +# Trigger an invoke from the host against the RIE proxy on :9000 +invoke: + curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" -d '{}' --max-time 30 \ No newline at end of file diff --git a/local_tests/managed-instance/README.md b/local_tests/managed-instance/README.md new file mode 100644 index 000000000..00788d921 --- /dev/null +++ b/local_tests/managed-instance/README.md @@ -0,0 +1,544 @@ +# Datadog Lambda Extension - Managed Instance Mode Testing + +## Overview + +This setup enables local testing of the Datadog Lambda Extension running in **Managed Instance Mode** using the AWS Lambda Runtime Interface Emulator (RIE). Managed Instance mode is a specialized initialization mode triggered by setting `AWS_LAMBDA_INITIALIZATION_TYPE="lambda-managed-instances"`, which changes how the extension behaves during Lambda initialization. + +## What is Managed Instance Mode? + +Managed Instance mode is activated when the extension detects it's running in a specific AWS Lambda environment (EC2 capacity provider). In this mode: +- The extension initializes with different telemetry endpoints +- Uses managed-instance-specific API routes for telemetry reporting +- Generates managed-instance-specific statistics and metrics +- Operates with enhanced monitoring capabilities + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ HOST MACHINE (macOS ARM64) │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌───────────────────────────────────────────────────────────────────────┐ │ +│ │ RustRover IDE (Optional) │ │ +│ │ │ │ +│ │ • Source: bottlecap/src/bin/bottlecap/main.rs │ │ +│ │ • Symbol file: .binaries/bottlecap-arm64 (with debug_info) │ │ +│ │ • Path mapping: /tmp/dd/bottlecap → local bottlecap/ │ │ +│ │ • Remote debugging available on port 2345 │ │ +│ └────────────────┬──────────────────────────────────────────────────────┘ │ +│ │ │ +│ │ GDB Remote Protocol (localhost:2345) │ +│ ▼ │ +│ ┌────────────────────────────────┐ ┌───────────────────────────────┐ │ +│ │ Makefile Targets │ │ Port Mappings │ │ +│ │ │ │ │ │ +│ │ make build → Debug build │ │ 9000 → Lambda invocations │ │ +│ │ make build-release → Release │ │ 2345 → GDB debugging │ │ +│ │ make start → Start container│ │ │ │ +│ │ make invoke → Trigger Lambda │ │ │ │ +│ │ make logs → Tail logs │ │ │ │ +│ │ make status → Check status │ │ │ │ +│ │ make attach → Attach debugger│ │ │ │ +│ │ make shell → Container shell│ │ │ │ +│ │ make stop → Stop container │ │ │ │ +│ └────────────────────────────────┘ └───────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────┘ + │ + Port Mappings: │ 2345:2345, 9000:8080 + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ DOCKER CONTAINER: managed-instance-lambda (Linux ARM64) │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ Environment Variables: │ +│ • AWS_LAMBDA_INITIALIZATION_TYPE="lambda-managed-instances" (Managed) │ +│ • 
DD_LOG_LEVEL="debug"                                                        │
+│   • DD_SERVERLESS_FLUSH_STRATEGY="continuously,1" (optional)               │
+│                                                                            │
+│  ┌──────────────────────────────────────────────────────────────────────┐  │
+│  │          Lambda Runtime (Custom RIE for Managed Instance)            │  │
+│  │  • Listens on :8080 (exposed as host :9000)                          │  │
+│  │  • Custom RIE binary: local_tests/rie/managed-instance/arm64/rie     │  │
+│  └──────────┬──────────────────────────────────────────┬────────────────┘  │
+│             │                                          │                   │
+│             │ Extensions API                           │ Handler API       │
+│             ▼                                          ▼                   │
+│  ┌─────────────────────────────┐      ┌────────────────────────────────┐   │
+│  │      Datadog Extension      │      │        Lambda Function         │   │
+│  │   (Managed Instance Mode)   │      │        (Python Handler)        │   │
+│  │                             │      │                                │   │
+│  │  • MI-specific init         │      │  • Simple Python handler       │   │
+│  │  • Enhanced telemetry       │      │  • Returns 200 status          │   │
+│  │  • Continuous flushing      │      │                                │   │
+│  └──────────┬──────────────────┘      └────────────────────────────────┘   │
+│             │                                                              │
+│             │ Debugging (optional)                                         │
+│             ▼                                                              │
+│  ┌─────────────────────────────┐                                           │
+│  │    gdbserver (optional)     │                                           │
+│  │       0.0.0.0:2345          │                                           │
+│  │                             │                                           │
+│  │  Attached to datadog-agent  │                                           │
+│  └─────────────────────────────┘                                           │
+└─────────────────────────────────────────────────────────────────────────────┘
+```
+
+## Prerequisites
+
+1. **Docker** installed and running
+2. **Datadog API Key** for sending telemetry
+3. **RustRover IDE** (optional, only needed for debugging)
+4. **Rust toolchain** (for building the extension)
+
+## Makefile Targets
+
+| Target | Description |
+|--------|-------------|
+| `make build` | Build bottlecap in debug mode with debug symbols |
+| `make build-release` | Build bottlecap in optimized release mode |
+| `make start` | Start the container with managed instance mode enabled |
+| `make invoke` | Trigger a Lambda invocation via HTTP |
+| `make logs` | Tail container logs (Ctrl+C to exit) |
+| `make status` | Check if container is running |
+| `make attach` | Attach gdbserver to the running extension (for debugging) |
+| `make shell` | Open a bash shell inside the container |
+| `make stop` | Stop and remove the container |
+| `make help` | Show all available targets |
+
+## Quick Start Guide
+
+> **⚠️ IMPORTANT: Known Limitation with Supplied RIE Runtime**
+>
+> The custom RIE (Runtime Interface Emulator) binaries provided in `local_tests/rie/managed-instance/` have **not been updated** to support the latest managed instance mode schema version changes.
+>
+> **Before running `make build`, you MUST temporarily update the local codebase:**
+>
+> In `bottlecap/src/extension/telemetry/mod.rs` (lines 48-52), change:
+> ```rust
+> let schema_version = if managed_instance_mode {
+>     MANAGED_INSTANCE_SCHEMA_VERSION // This is the correct version
+> } else {
+>     ON_DEMAND_SCHEMA_VERSION
+> };
+> ```
+>
+> **To:**
+> ```rust
+> let schema_version = if managed_instance_mode {
+>     ON_DEMAND_SCHEMA_VERSION // Temporary workaround for outdated RIE
+> } else {
+>     ON_DEMAND_SCHEMA_VERSION
+> };
+> ```
+>
+> This workaround forces the extension to use the `ON_DEMAND_SCHEMA_VERSION` even in managed instance mode, which is compatible with the current RIE binaries. **Remember to revert this change before committing any code or deploying to production.**
+>
+> This limitation only affects local testing with RIE. In actual AWS Lambda Managed Instance environments, the correct `MANAGED_INSTANCE_SCHEMA_VERSION` should be used.
+
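+Before moving on, note that the mode detection described in "What is Managed Instance Mode?" above comes down to a single environment-variable check. A minimal sketch of that check (illustrative only, not bottlecap's exact code; the function name is made up, the variable name and value are the ones this setup exports):
+
+```rust
+/// Hedged sketch: returns true when Lambda reports the managed instance
+/// initialization type.
+fn is_managed_instance_mode() -> bool {
+    std::env::var("AWS_LAMBDA_INITIALIZATION_TYPE")
+        .map(|v| v == "lambda-managed-instances")
+        .unwrap_or(false)
+}
+```
+
+### 1. 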
Build the Container

+Build the extension and container image:
+
+```bash
+cd local_tests/managed-instance
+make build
+```
+
+This compiles bottlecap with debug symbols (for easier troubleshooting) and packages it into a Docker image with:
+- An RIE binary for local invocations (the Dockerfile downloads the standard AWS RIE; see "Custom RIE Binary" below)
+- GDB and gdbserver for optional debugging
+- Development tooling (Rust, curl, git, etc.)
+
+For production-like testing, use:
+```bash
+make build-release
+```
+
+### 2. Set Your Datadog API Key
+
+Export your Datadog API key as an environment variable:
+
+```bash
+export DD_API_KEY="your_datadog_api_key_here"
+```
+
+**Important:** The API key is required for the extension to send telemetry to Datadog. Without it, the extension will start but won't be able to flush data.
+
+### 3. Start the Container
+
+```bash
+make start
+```
+
+The container starts with:
+- **Port 9000**: Lambda invocations endpoint
+- **Port 2345**: GDB remote debugging (optional)
+- **Managed Instance mode**: Enabled via `AWS_LAMBDA_INITIALIZATION_TYPE="lambda-managed-instances"`
+- **CPU limit**: 1 CPU to simulate Lambda resource constraints
+- **Continuous flushing**: In managed instance mode the extension flushes continuously in the background by default
+
+Check status:
+```bash
+make status
+```
+
+View logs:
+```bash
+make logs
+```
+
+Look for the initialization message:
+```
+DD_EXTENSION | DEBUG | Starting Datadog Extension v88
+DD_EXTENSION | DEBUG | Managed Instance mode detected
+DD_EXTENSION | DEBUG | Datadog Next-Gen Extension ready in XXms
+```
+
+### 4. Invoke the Lambda Function
+
+Trigger a Lambda invocation:
+
+```bash
+make invoke
+```
+
+This sends a POST request to the RIE, which:
+1. Initializes the extension (first invocation only)
+2. Invokes the Python handler function
+3. Triggers the extension to process and flush telemetry
+4. Returns the function response
+
+Expected response:
+```json
+{"statusCode": 200, "body": "Hello from Lambda!"}
+```
+
+### 5. Monitor Telemetry
+
+Watch the logs to see managed instance mode in action:
+
+```bash
+make logs
+```
+
+Look for:
+- Extension initialization logs
+- Managed-instance-specific telemetry endpoints
+- Metric and trace flushing
+- Continuous flush strategy execution
+
+### 6. Stop the Container
+
+When done testing:
+
+```bash
+make stop
+```
+
+## Testing Workflow
+
+### Basic Testing
+
+```bash
+# Terminal 1: Build and start
+make build          # Build with debug symbols
+make start          # Start in background
+
+# Terminal 2: Monitor logs
+make logs           # Watch extension behavior
+
+# Terminal 1: Test invocations
+make invoke         # First invocation (initializes extension)
+make invoke         # Subsequent invocations
+make invoke         # Test continuous flushing
+
+# Check Datadog UI for:
+# - Metrics from service: dd-<username>-test
+# - Environment: dev
+# - Managed-instance-specific statistics
+
+# When done
+make stop
+```
+
+### Debugging Workflow
+
+For debugging the extension with RustRover (see [bottlecap debugging guide](../bottlecap/README.md)):
+
+**Correct Debugging Sequence:**
+
+```bash
+# Step 1: Build with debug symbols
+make build          # Must use debug build (not release)
+
+# Step 2: Start the container
+make start          # Container runs in background
+
+# Step 3: Set breakpoints in code
+# Open bottlecap/src/bin/bottlecap/main.rs in RustRover
+# Click in the gutter next to line numbers to set breakpoints
+
+# Step 4: Attach gdbserver (in a separate terminal)
+make attach         # Attach gdbserver to the running extension process
+                    # Should show: "Attached; pid = XX"
+                    #              "Listening on port 2345"
+                    # Keep this terminal running!
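+#
+# (How this works: `make attach` looks up the datadog-agent PID inside the
+#  container and runs `gdbserver --attach 0.0.0.0:$(PORT) <pid>`; port 2345
+#  is published to the host by `make start`, which is what the IDE's remote
+#  debug configuration connects to.)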
+
+# Step 5: Start debugger in RustRover IDE
+# Run → Debug 'Remote Debug Lambda Extension'
+# Wait for "Connected" message in Debug panel
+
+# Step 6: Invoke Lambda to hit breakpoints
+make invoke         # Trigger Lambda invocation - breakpoints should hit!
+```
+
+**Important Notes:**
+- Build with `make build` (NOT `make build-release`) to include debug symbols
+- The extension process must be running before `make attach` will work
+- If `make attach` fails with "process not found", the extension hasn't started yet - trigger an initial invocation first
+- Keep the `make attach` terminal running throughout your debugging session
+- Set breakpoints BEFORE starting the debugger in step 5
+
+## Environment Variables
+
+The container is configured with these environment variables:
+
+| Variable | Value | Purpose |
+|----------|-------|---------|
+| `AWS_LAMBDA_INITIALIZATION_TYPE` | `lambda-managed-instances` | Activates managed instance mode |
+| `DD_API_KEY` | From host env | Authentication for Datadog API |
+| `DD_SITE` | `datadoghq.com` | Datadog site to send data to |
+| `DD_SERVICE` | `dd-<username>-test` | Service name for telemetry (`whoami` with dots removed) |
+| `DD_ENV` | `dev` | Environment tag for telemetry |
+| `DD_LOG_LEVEL` | `debug` | Extension logging level |
+| `DD_SERVERLESS_FLUSH_STRATEGY` | unset (e.g. `continuously,60000`) | Optional flush-strategy override; commented out in the Dockerfile |
+
+You can modify these in the Makefile or override when starting the container.
+
+## Container Configuration
+
+### Resource Limits
+- **CPU**: Limited to 1 CPU (`--cpus=1`) to simulate Lambda constraints
+- **Memory**: Uses Docker default (can be limited with `--memory` flag)
+
+### Security Settings
+- **SYS_PTRACE**: Enabled for gdbserver attachment (debugging)
+- **seccomp**: Unconfined for debugging tools
+
+### Volume Mounts
+- **entrypoint.sh**: Mounted from local directory for easy modification
+
+## Custom RIE Binary
+
+The managed instance setup uses a custom RIE binary located at `local_tests/rie/managed-instance/arm64/rie`. This custom RIE may include:
+- Modified behavior for managed instance mode testing
+- Additional logging or instrumentation
+- Specialized telemetry endpoint handling
+
+Note that the Dockerfile in this directory currently downloads the standard AWS RIE from GitHub releases, so to test against the custom binary you need to copy or mount it over `/usr/local/bin/aws-lambda-rie`.
+
+## Troubleshooting
+
+### Issue: Container won't start
+
+**Solution:**
+```bash
+# Check for conflicting containers
+make status
+
+# Force cleanup and restart
+make stop
+make start
+```
+
+### Issue: Extension not in managed instance mode
+
+**Error:** Extension starts but doesn't show managed-instance-specific behavior
+
+**Solution:**
+1. Verify environment variable is set:
+```bash
+docker exec managed-instance-lambda env | grep AWS_LAMBDA_INITIALIZATION_TYPE
+# Should output: AWS_LAMBDA_INITIALIZATION_TYPE=lambda-managed-instances
+```
+
+2. 
Check logs for managed instance mode detection:
+```bash
+make logs | grep -i "managed instance"
+```
+
+### Issue: No telemetry in Datadog
+
+**Cause:** Missing or invalid DD_API_KEY
+
+**Solution:**
+```bash
+# Verify API key is set
+echo $DD_API_KEY
+
+# Set it if missing
+export DD_API_KEY="your_api_key"
+
+# Restart container
+make stop
+make start
+```
+
+### Issue: Continuous flushing not working
+
+**Solution:**
+Check the flushing strategy configuration:
+```bash
+docker exec managed-instance-lambda env | grep DD_SERVERLESS_FLUSH_STRATEGY
+```
+
+Increase logging and watch for flush events:
+```bash
+make logs | grep -i flush
+```
+
+### Issue: Cannot attach debugger
+
+**Error:** `make attach` fails to find process
+
+**Cause:** Extension hasn't started yet
+
+**Solution:**
+```bash
+# First, invoke Lambda to start extension
+make invoke
+
+# Wait for startup message in logs
+make logs | grep "Extension ready"
+
+# Then attach
+make attach
+```
+
+### Issue: Permission denied errors
+
+**Solution:**
+Ensure Docker has proper permissions:
+```bash
+# On macOS, check Docker Desktop settings
+# On Linux, ensure user is in docker group
+sudo usermod -aG docker $USER
+```
+
+## Advanced Usage
+
+### Custom Configuration
+
+Edit the Makefile to customize environment variables:
+
+```makefile
+start:
+	docker run -d --name $(CONTAINER_NAME) \
+		-p 9000:8080 \
+		-p 2345:2345 \
+		-e AWS_LAMBDA_INITIALIZATION_TYPE="lambda-managed-instances" \
+		-e DD_API_KEY="$(DD_API_KEY)" \
+		-e DD_SITE="datadoghq.com" \
+		-e DD_SERVICE="my-custom-service" \
+		-e DD_LOG_LEVEL="trace" \
+		-e DD_ENV="staging" \
+		-e DD_SERVERLESS_FLUSH_STRATEGY="end" \
+		$(IMAGE)
+```
+
+Here `DD_LOG_LEVEL="trace"` gives more verbose output and `DD_SERVERLESS_FLUSH_STRATEGY="end"` flushes at the end of each invocation. (Inline `#` comments after a trailing `\` would break the recipe, so annotations live outside the block.)
+
+### Interactive Container Shell
+
+Get a shell inside the running container:
+
+```bash
+make shell
+```
+
+Then you can:
+```bash
+# Check extension process
+ps aux | grep datadog-agent
+
+# View extension logs
+cat /tmp/datadog-agent.log
+
+# Test network connectivity
+curl https://api.datadoghq.com/api/v1/validate
+
+# Inspect RIE
+ps aux | grep aws-lambda-rie
+```
+
+### Manual Invocation with Custom Payload
+
+```bash
+curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" \
+  -d '{"test": "custom", "requestId": "12345"}' \
+  --max-time 30
+```
+
+### Rebuild After Code Changes
+
+```bash
+# Stop running container
+make stop
+
+# Rebuild with latest code
+make build
+
+# Start fresh
+make start
+make invoke
+```
+
+## Differences from Bottlecap Setup
+
+| Aspect | Bottlecap Setup | Managed Instance Setup |
+|--------|----------------|------------------------|
+| **Initialization Mode** | Standard Lambda | Managed Instance mode (`lambda-managed-instances`) |
+| **RIE Binary** | Standard RIE | Custom managed instance RIE |
+| **Telemetry Endpoints** | Standard API routes | Managed-instance-specific routes |
+| **Primary Use Case** | General debugging | Managed instance mode testing |
+| **Flushing Strategy** | Various strategies | Continuous flushing by default |
+| **Container Name** | `bottlecap-lambda` | `managed-instance-lambda` |
+
+## Key Files
+
+| File | Purpose |
+|------|---------|
+| `Dockerfile` | Container definition with the RIE and dev tooling |
+| `Makefile` | Build and test automation |
+| `index.py` | Simple Python Lambda handler |
+| `entrypoint.sh` | Container entrypoint with RIE initialization |
+| `local_tests/rie/managed-instance/arm64/rie` | Custom RIE binary for managed instance mode |
+
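+The next section points at where managed instance mode lives in the code. One piece added in this very patch is the pair of pause/resume hooks for sandbox-level usage metrics; a hedged sketch of how such hooks might be driven between invocations (the type and call sites are simplified stand-ins, illustrative only):
+
+```rust
+// Illustrative sketch mirroring the `pause_usage_metrics_monitoring` /
+// `resume_usage_metrics_monitoring` hooks added in this patch: in managed
+// instance mode, monitoring is resumed for each invocation and paused in
+// between, rather than being torn down and reset.
+struct EnhancedMetricsHandle;
+
+impl EnhancedMetricsHandle {
+    fn resume_monitoring(&self) { /* restart sampling, keep accumulated state */ }
+    fn pause_monitoring(&self) { /* stop sampling without emitting metrics */ }
+}
+
+fn run_invocation(handle: &EnhancedMetricsHandle, invoke: impl FnOnce()) {
+    handle.resume_monitoring();
+    invoke();
+    handle.pause_monitoring();
+}
+```
+
+## Managed Instance Mode Implementation
+
+The managed instance 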
mode is implemented in the bottlecap extension: + +- **Detection**: bottlecap/src/extension/telemetry/mod.rs - Checks `AWS_LAMBDA_INITIALIZATION_TYPE` +- **Telemetry**: Uses managed-instance-specific API routes for stats submission +- **Stats Generation**: Generates managed-instance-specific metrics and telemetry + +See the main [CLAUDE.md](../../CLAUDE.md) for more details on the managed instance mode implementation. + +## Additional Resources + +- **[Bottlecap Debugging Guide](../bottlecap/README.md)** - Detailed debugging setup with RustRover +- **[AWS Lambda Extensions API](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html)** +- **[Datadog Lambda Extension Documentation](https://docs.datadoghq.com/serverless/libraries_integrations/extension/)** +- **[AWS Lambda Runtime Interface Emulator](https://github.com/aws/aws-lambda-runtime-interface-emulator)** + +## Testing Checklist + +When testing managed instance mode, verify: + +- ✅ Extension starts with managed instance mode detected +- ✅ Telemetry uses managed-instance-specific endpoints +- ✅ Continuous flushing strategy is active +- ✅ Metrics appear in Datadog with correct service/env tags +- ✅ Extension handles multiple invocations correctly +- ✅ Resource constraints (CPU) don't cause issues +- ✅ Debug logging shows managed-instance-specific behavior \ No newline at end of file diff --git a/local_tests/managed-instance/entrypoint.sh b/local_tests/managed-instance/entrypoint.sh new file mode 100755 index 000000000..e43d18da1 --- /dev/null +++ b/local_tests/managed-instance/entrypoint.sh @@ -0,0 +1,8 @@ +#!/bin/sh +if [ -z "${AWS_LAMBDA_RUNTIME_API}" ]; then + echo "Running in RIE mode" + exec /usr/local/bin/aws-lambda-rie /var/runtime/bootstrap index.handler +else + echo "Running in non-RIE mode" + exec /var/runtime/bootstrap index.handler +fi diff --git a/local_tests/managed-instance/index.py b/local_tests/managed-instance/index.py new file mode 100755 index 000000000..32b617c6f --- /dev/null +++ b/local_tests/managed-instance/index.py @@ -0,0 +1,4 @@ +def handler(event, context): + return { + 'statusCode': 200, 'body': 'Hello from Lambda!' 
+ } diff --git a/scripts/sync-upstream.sh b/scripts/sync-upstream.sh new file mode 100755 index 000000000..48f52b88f --- /dev/null +++ b/scripts/sync-upstream.sh @@ -0,0 +1,215 @@ +#!/bin/bash + +# Sync Upstream Script +# This script syncs your private fork with the upstream public repository + +set -e # Exit on any error + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Configuration +UPSTREAM_REPO="DataDog/datadog-lambda-extension" +UPSTREAM_BRANCH="main" +TARGET_BRANCH="main" +UPSTREAM_URL="https://github.com/${UPSTREAM_REPO}.git" + +# Function to print colored output +print_status() { + echo -e "${BLUE}[INFO]${NC} $1" +} + +print_success() { + echo -e "${GREEN}[SUCCESS]${NC} $1" +} + +print_warning() { + echo -e "${YELLOW}[WARNING]${NC} $1" +} + +print_error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +# Function to show usage +show_usage() { + echo "Usage: $0 [OPTIONS]" + echo "" + echo "Options:" + echo " -h, --help Show this help message" + echo " -f, --force Force sync even if no new commits" + echo " -d, --dry-run Show what would be synced without making changes" + echo " -v, --verbose Show detailed output" + echo " --no-push Don't push changes to origin (just merge locally)" + echo "" + echo "Examples:" + echo " $0 # Normal sync" + echo " $0 --dry-run # See what would be synced" + echo " $0 --force # Force sync even if no new commits" + echo " $0 --no-push # Merge locally but don't push" +} + +# Parse command line arguments +DRY_RUN=false +FORCE_SYNC=false +VERBOSE=false +NO_PUSH=false + +while [[ $# -gt 0 ]]; do + case $1 in + -h|--help) + show_usage + exit 0 + ;; + -f|--force) + FORCE_SYNC=true + shift + ;; + -d|--dry-run) + DRY_RUN=true + shift + ;; + -v|--verbose) + VERBOSE=true + shift + ;; + --no-push) + NO_PUSH=true + shift + ;; + *) + print_error "Unknown option: $1" + show_usage + exit 1 + ;; + esac +done + +# Check if we're in a git repository +if ! git rev-parse --git-dir > /dev/null 2>&1; then + print_error "Not in a git repository!" + exit 1 +fi + +# Check if we're on the target branch +CURRENT_BRANCH=$(git branch --show-current) +if [ "$CURRENT_BRANCH" != "$TARGET_BRANCH" ]; then + print_warning "Not on $TARGET_BRANCH branch (currently on $CURRENT_BRANCH)" + read -p "Do you want to switch to $TARGET_BRANCH? (y/N): " -n 1 -r + echo + if [[ $REPLY =~ ^[Yy]$ ]]; then + print_status "Switching to $TARGET_BRANCH branch..." + git checkout "$TARGET_BRANCH" + else + print_error "Aborting sync. Please switch to $TARGET_BRANCH branch first." + exit 1 + fi +fi + +# Check for uncommitted changes +if ! git diff-index --quiet HEAD --; then + print_error "You have uncommitted changes. Please commit or stash them first." + git status --short + exit 1 +fi + +print_status "Starting upstream sync..." +print_status "Upstream: $UPSTREAM_REPO" +print_status "Branch: $TARGET_BRANCH" + +# Add upstream remote if it doesn't exist +if ! git remote get-url upstream > /dev/null 2>&1; then + print_status "Adding upstream remote..." + git remote add upstream "$UPSTREAM_URL" +else + # Update upstream URL in case it changed + git remote set-url upstream "$UPSTREAM_URL" +fi + +# Fetch latest changes from upstream +print_status "Fetching latest changes from upstream..." 
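+# Only the configured upstream branch is fetched here; nothing is merged
+# or pushed until the checks below pass.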
+git fetch upstream "$UPSTREAM_BRANCH" + +# Get current and upstream commit hashes +CURRENT_COMMIT=$(git rev-parse HEAD) +UPSTREAM_COMMIT=$(git rev-parse "upstream/$UPSTREAM_BRANCH") + +print_status "Current commit: $CURRENT_COMMIT" +print_status "Upstream commit: $UPSTREAM_COMMIT" + +# Check if there are new commits +if [ "$UPSTREAM_COMMIT" = "$CURRENT_COMMIT" ] && [ "$FORCE_SYNC" = false ]; then + print_success "Already up to date with upstream!" + exit 0 +fi + +# Get commit details +if [ "$UPSTREAM_COMMIT" != "$CURRENT_COMMIT" ]; then + COMMIT_COUNT=$(git rev-list --count "$CURRENT_COMMIT".."upstream/$UPSTREAM_BRANCH") + print_status "Found $COMMIT_COUNT new commit(s) to sync" + + if [ "$VERBOSE" = true ] || [ "$DRY_RUN" = true ]; then + echo "" + print_status "New commits:" + git log --oneline "$CURRENT_COMMIT".."upstream/$UPSTREAM_BRANCH" + echo "" + fi +else + print_status "Force sync requested (no new commits)" +fi + +# Dry run mode +if [ "$DRY_RUN" = true ]; then + print_warning "DRY RUN MODE - No changes will be made" + print_status "Would merge $UPSTREAM_COMMIT into $CURRENT_COMMIT" + if [ "$NO_PUSH" = false ]; then + print_status "Would push changes to origin/$TARGET_BRANCH" + fi + exit 0 +fi + +# Perform the merge +print_status "Merging upstream changes..." +if git merge "upstream/$UPSTREAM_BRANCH" --no-edit; then + print_success "Successfully merged upstream changes" +else + print_error "Merge failed due to conflicts!" + print_error "Please resolve conflicts manually and then run:" + print_error " git add ." + print_error " git commit" + if [ "$NO_PUSH" = false ]; then + print_error " git push origin $TARGET_BRANCH" + fi + exit 1 +fi + +# Push changes to origin (unless --no-push is specified) +if [ "$NO_PUSH" = false ]; then + print_status "Pushing changes to origin..." + if git push origin "$TARGET_BRANCH"; then + print_success "Successfully pushed changes to origin" + else + print_error "Failed to push changes to origin" + exit 1 + fi +else + print_warning "Skipping push to origin (--no-push specified)" +fi + +# Final summary +echo "" +print_success "Upstream sync completed successfully!" +if [ "$UPSTREAM_COMMIT" != "$CURRENT_COMMIT" ]; then + print_status "Synced $COMMIT_COUNT commit(s) from upstream" +fi +print_status "Current commit: $(git rev-parse HEAD)" +print_status "Upstream commit: $UPSTREAM_COMMIT" + +# Show recent commits +echo "" +print_status "Recent commits:" +git log --oneline -5