Merged
Changes from 5 commits
15 changes: 12 additions & 3 deletions bottlecap/src/bin/bottlecap/main.rs
@@ -15,7 +15,8 @@ use bottlecap::{
event_bus::bus::EventBus,
events::Event,
lifecycle::{
flush_control::FlushControl, invocation::processor::Processor as InvocationProcessor,
flush_control::FlushControl,
invocation::{context::ContextBuffer, processor::Processor as InvocationProcessor},
listener::Listener as LifecycleListener,
},
logger,
@@ -303,15 +304,21 @@ async fn extension_loop_active(
let mut metrics_flusher = MetricsFlusher::new(flusher_config);

// Lifecycle Invocation Processor
let context_buffer = Arc::new(Mutex::new(ContextBuffer::default()));
let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
Arc::clone(&tags_provider),
Arc::clone(config),
aws_config,
Arc::clone(&metrics_aggr),
Arc::clone(&context_buffer),
)));

let (trace_agent_channel, trace_flusher, trace_processor, stats_flusher) =
start_trace_agent(config, resolved_api_key.clone(), &tags_provider);
let (trace_agent_channel, trace_flusher, trace_processor, stats_flusher) = start_trace_agent(
config,
resolved_api_key.clone(),
&tags_provider,
context_buffer.clone(),
);

let lifecycle_listener = LifecycleListener {
invocation_processor: Arc::clone(&invocation_processor),
@@ -539,6 +546,7 @@ fn start_trace_agent(
config: &Arc<Config>,
resolved_api_key: String,
tags_provider: &Arc<TagProvider>,
context_buffer: Arc<Mutex<ContextBuffer>>,
) -> (
Sender<datadog_trace_utils::send_data::SendData>,
Arc<trace_flusher::ServerlessTraceFlusher>,
@@ -578,6 +586,7 @@
stats_aggregator,
stats_processor,
Arc::clone(tags_provider),
context_buffer,
));
let trace_agent_channel = trace_agent.get_sender_copy();

62 changes: 49 additions & 13 deletions bottlecap/src/lifecycle/invocation/context.rs
@@ -1,6 +1,7 @@
use crate::metrics::enhanced::lambda::EnhancedMetricData;
use std::collections::VecDeque;

use datadog_trace_protobuf::pb::Span;
use tracing::debug;

#[derive(Debug, Clone, PartialEq)]
@@ -10,6 +11,7 @@ pub struct Context {
pub init_duration_ms: f64,
pub start_time: i64,
pub enhanced_metric_data: Option<EnhancedMetricData>,
pub tracer_span: Option<Span>,
}

impl Context {
@@ -20,13 +22,33 @@ impl Context {
init_duration_ms: f64,
start_time: i64,
enhanced_metric_data: Option<EnhancedMetricData>,
tracer_span: Option<Span>,
) -> Self {
Context {
request_id,
runtime_duration_ms,
init_duration_ms,
start_time,
enhanced_metric_data,
tracer_span,
}
}

#[must_use]
pub fn from_request_id(request_id: &str) -> Self {
Context::new(request_id.to_owned(), 0f64, 0f64, 0, None, None)
}
}

impl Default for Context {
Contributor: Do we only need this for tests? If so, can we wrap it in a test macro? Defaults can cause subtle bugs.

Contributor Author: We do use it for the logs processor, where the context starts out as the default empty value every time.

fn default() -> Self {
Context {
request_id: String::new(),
runtime_duration_ms: 0f64,
init_duration_ms: 0f64,
start_time: 0,
enhanced_metric_data: None,
tracer_span: None,
}
}
}
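
For reference, the test-gated variant suggested in the thread above would look like the sketch below (illustrative only; it was not adopted, since the logs processor relies on Default at runtime):

        // Hypothetical: restrict Default to test builds so production code
        // cannot accidentally construct an empty Context.
        #[cfg(test)]
        impl Default for Context {
            fn default() -> Self {
                Context {
                    request_id: String::new(),
                    runtime_duration_ms: 0f64,
                    init_duration_ms: 0f64,
                    start_time: 0,
                    enhanced_metric_data: None,
                    tracer_span: None,
                }
            }
        }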
@@ -95,8 +117,8 @@ impl ContextBuffer {

/// Creates a new `Context` and adds it to the buffer.
///
pub fn create_context(&mut self, request_id: String) {
self.insert(Context::new(request_id, 0f64, 0f64, 0, None));
pub fn create_context(&mut self, request_id: &str) {
self.insert(Context::from_request_id(request_id));
}

/// Adds the init duration to a `Context` in the buffer.
@@ -159,6 +181,20 @@
}
}

/// Adds the tracer span to a `Context` in the buffer.
///
pub fn add_tracer_span(&mut self, request_id: &String, tracer_span: Option<Span>) {
if let Some(context) = self
.buffer
.iter_mut()
.find(|context| context.request_id == *request_id)
{
context.tracer_span = tracer_span;
} else {
debug!("Could not add tracer span - context not found");
}
}

/// Returns the size of the buffer.
///
#[must_use]
Expand Down Expand Up @@ -188,20 +224,20 @@ mod tests {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0f64, 0f64, 0, None);
let context = Context::from_request_id(&request_id);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);

let request_id_2 = String::from("2");
let context = Context::new(request_id_2.clone(), 0f64, 0f64, 0, None);
let context = Context::from_request_id(&request_id_2);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 2);
assert_eq!(buffer.get(&request_id_2).unwrap(), &context);

// This should replace the first context
let request_id_3 = String::from("3");
let context = Context::new(request_id_3.clone(), 0f64, 0f64, 0, None);
let context = Context::from_request_id(&request_id_3);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 2);
assert_eq!(buffer.get(&request_id_3).unwrap(), &context);
@@ -215,13 +251,13 @@
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0f64, 0f64, 0, None);
let context = Context::from_request_id(&request_id);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);

let request_id_2 = String::from("2");
let context = Context::new(request_id_2.clone(), 0f64, 0f64, 0, None);
let context = Context::from_request_id(&request_id_2);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 2);
assert_eq!(buffer.get(&request_id_2).unwrap(), &context);
@@ -242,13 +278,13 @@
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0f64, 0f64, 0, None);
let context = Context::from_request_id(&request_id);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);

let request_id_2 = String::from("2");
let context = Context::new(request_id_2.clone(), 0f64, 0f64, 0, None);
let context = Context::from_request_id(&request_id_2);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 2);
assert_eq!(buffer.get(&request_id_2).unwrap(), &context);
@@ -263,7 +299,7 @@
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0f64, 0f64, 0, None);
let context = Context::from_request_id(&request_id);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);
@@ -277,7 +313,7 @@
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0f64, 0f64, 0, None);
let context = Context::from_request_id(&request_id);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);
@@ -291,7 +327,7 @@
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0f64, 0f64, 0, None);
let context = Context::from_request_id(&request_id);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);
@@ -307,7 +343,7 @@
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0f64, 0f64, 0, None);
let context = Context::from_request_id(&request_id);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);
47 changes: 33 additions & 14 deletions bottlecap/src/lifecycle/invocation/processor.rs
@@ -50,7 +50,7 @@ pub const DATADOG_INVOCATION_ERROR_KEY: &str = "x-datadog-invocation-error";

pub struct Processor {
// Buffer containing context of the previous 5 invocations
pub context_buffer: ContextBuffer,
pub context_buffer: Arc<Mutex<ContextBuffer>>,
Contributor: Why is this public? I tried removing the pub and running the tests, and they still pass.

Considering that the Processor is already guarded by an Arc<Mutex>, I am wondering why we wrap the buffer in another mutex and expose it outside.

Contributor Author: I can remove the pub, and I'd like to remove the lock altogether, yet I cannot find a way to get the desired outcome without it.

The lifecycle processor is just one component with access to the context buffer; the trace processor needs access too. Both need it because, if the tracer happens to send a top-level span, we have to add it to the current context for the given request id. That way, whenever a platform runtime done event arrives, we can attach the tracer's top-level span information onto the span we create. WDYT?
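
The sharing pattern the author describes, as a minimal self-contained sketch (the struct names here are illustrative stand-ins, not the actual bottlecap types):

        use std::sync::{Arc, Mutex};

        #[derive(Default)]
        struct ContextBuffer;

        // Both processors hold a clone of the same Arc, so either one can
        // update the buffer entry for a given request id.
        struct InvocationProcessor { context_buffer: Arc<Mutex<ContextBuffer>> }
        struct TraceProcessor { context_buffer: Arc<Mutex<ContextBuffer>> }

        fn wire_up() -> (InvocationProcessor, TraceProcessor) {
            let buffer = Arc::new(Mutex::new(ContextBuffer::default()));
            (
                InvocationProcessor { context_buffer: Arc::clone(&buffer) },
                TraceProcessor { context_buffer: Arc::clone(&buffer) },
            )
        }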

// Helper to infer span information
inferrer: SpanInferrer,
// Current invocation span
@@ -78,6 +78,7 @@ impl Processor {
config: Arc<config::Config>,
aws_config: &AwsConfig,
metrics_aggregator: Arc<Mutex<MetricsAggregator>>,
context_buffer: Arc<Mutex<ContextBuffer>>,
) -> Self {
let service = config.service.clone().unwrap_or(String::from("aws.lambda"));
let resource = tags_provider
@@ -87,7 +88,7 @@
let propagator = DatadogCompositePropagator::new(Arc::clone(&config));

Processor {
context_buffer: ContextBuffer::default(),
context_buffer,
inferrer: SpanInferrer::new(config.service_mapping.clone()),
span: create_empty_span(String::from("aws.lambda"), resource, service),
cold_start_span: None,
@@ -107,7 +108,8 @@
self.reset_state();
self.set_init_tags();

self.context_buffer.create_context(request_id.clone());
let mut context_buffer = self.context_buffer.lock().expect("lock poisoned");
context_buffer.create_context(&request_id);
if self.config.enhanced_metrics {
// Collect offsets for network and cpu metrics
let network_offset: Option<NetworkData> = proc::get_network_data().ok();
@@ -130,9 +132,9 @@
tmp_chan_tx,
process_chan_tx,
});
self.context_buffer
.add_enhanced_metric_data(&request_id, enhanced_metric_offsets);
context_buffer.add_enhanced_metric_data(&request_id, enhanced_metric_offsets);
}
drop(context_buffer);

// Increment the invocation metric
self.enhanced_metrics.increment_invocation_metric();
@@ -164,7 +166,8 @@
let mut cold_start = false;

// If it's empty, then we are in a cold start
if self.context_buffer.is_empty() {
let context_buffer = self.context_buffer.lock().expect("lock poisoned");
if context_buffer.is_empty() {
let now = Instant::now();
let time_since_sandbox_init = now.duration_since(self.aws_config.sandbox_init_time);
if time_since_sandbox_init.as_millis() > PROACTIVE_INITIALIZATION_THRESHOLD_MS.into() {
@@ -178,6 +181,7 @@
self.enhanced_metrics.set_runtime_tag(&runtime);
self.runtime = Some(runtime);
}
drop(context_buffer);

if proactive_initialization {
self.span.meta.insert(
@@ -244,12 +248,14 @@
.as_nanos()
.try_into()
.unwrap_or_default();
self.context_buffer.add_start_time(&request_id, start_time);
let mut context_buffer = self.context_buffer.lock().expect("lock poisoned");
context_buffer.add_start_time(&request_id, start_time);
self.span.start = start_time;
}

#[allow(clippy::too_many_arguments)]
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::await_holding_lock)] // we are dropping the lock before awaiting
pub async fn on_platform_runtime_done(
&mut self,
request_id: &String,
Expand All @@ -260,8 +266,8 @@ impl Processor {
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
trace_agent_tx: Sender<SendData>,
) {
self.context_buffer
.add_runtime_duration(request_id, duration_ms);
let mut context_buffer = self.context_buffer.lock().expect("lock poisoned");
context_buffer.add_runtime_duration(request_id, duration_ms);

// Set the runtime duration metric
self.enhanced_metrics
@@ -289,7 +295,7 @@
}
}

if let Some(context) = self.context_buffer.get(request_id) {
if let Some(context) = context_buffer.get(request_id) {
// `round` is intentionally meant to be a whole integer
self.span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64;
self.span
@@ -299,6 +305,10 @@
// - language
// - metrics tags (for asm)

if let Some(tracer_span) = &context.tracer_span {
self.span.meta.extend(tracer_span.meta.clone());
}

if let Some(offsets) = &context.enhanced_metric_data {
self.enhanced_metrics.set_cpu_utilization_enhanced_metrics(
offsets.cpu_offset.clone(),
@@ -310,6 +320,7 @@
_ = offsets.process_chan_tx.send(());
}
}
drop(context_buffer);
Contributor: Are we sure we need the manual drop here? Doesn't the scope opened at 307 release it by 581? Is the drop necessary? If we add new code after this, we have to re-lock, right?

Contributor Author: Yeah, since we only await at the very end, I decided to take the lock at the last place we modify the context_buffer; in reality, I'm dropping it to be 100% sure that nothing strange happens. I'll add a comment noting that we drop here because we await later, at 332.

Contributor: A more idiomatic approach is to create

        let context_for_request = {
            let mut context_buffer = self.context_buffer.lock().expect("lock poisoned");
            context_buffer.add_runtime_duration(request_id, metrics.duration_ms);
            context_buffer.get(request_id).cloned()
        };

at line 278 and

        if let Some(context) = context_for_request {

at line 311.

This clearly drops the lock when it is no longer needed. We have a number of useless drop() calls here and there in the code.

But, as in the previous comment, I would avoid the lock altogether.

Contributor Author: Instead, since I don't need to assign it to anything, I think I can refactor this by moving the first call down to where we actually use get(...), and then use a scope to drop the lock directly 🤔
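
A minimal self-contained sketch of the scoped-lock pattern under discussion (the names are illustrative): the guard is dropped at the end of the inner block, so the subsequent .await never holds the mutex.

        use std::sync::{Arc, Mutex};

        #[derive(Clone, Default)]
        struct Context { runtime_duration_ms: f64 }

        async fn flush(_context: Context) { /* send data downstream */ }

        async fn on_runtime_done(buffer: Arc<Mutex<Vec<Context>>>) {
            // The guard lives only inside this block and is dropped at the
            // closing brace, before any await point.
            let context = {
                let guard = buffer.lock().expect("lock poisoned");
                guard.last().cloned()
            };

            if let Some(context) = context {
                // Safe to await: the mutex is no longer held.
                flush(context).await;
            }
        }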


if let Some(trigger_tags) = self.inferrer.get_trigger_tags() {
self.span.meta.extend(trigger_tags);
@@ -355,13 +366,14 @@
dropped_p0_spans: 0,
};

let send_data = trace_processor.process_traces(
let send_data: SendData = trace_processor.process_traces(
config.clone(),
tags_provider.clone(),
header_tags,
vec![traces],
body_size,
self.inferrer.span_pointers.clone(),
self.context_buffer.clone(),
);

if let Err(e) = trace_agent_tx.send(send_data).await {
@@ -380,7 +392,8 @@
// Set the report log metrics
self.enhanced_metrics.set_report_log_metrics(&metrics);

if let Some(context) = self.context_buffer.get(request_id) {
let context_buffer = self.context_buffer.lock().expect("lock poisoned");
if let Some(context) = context_buffer.get(request_id) {
if context.runtime_duration_ms != 0.0 {
let post_runtime_duration_ms = metrics.duration_ms - context.runtime_duration_ms;

@@ -544,7 +557,7 @@
tags = DatadogHeaderPropagator::extract_tags(headers);
}

// We should always use the generated trace id from the tracer
// We should always use the generated span id from the tracer
if let Some(header) = headers.get(DATADOG_SPAN_ID_KEY) {
self.span.span_id = header.parse::<u64>().unwrap_or(0);
}
@@ -654,7 +667,13 @@
Aggregator::new(EMPTY_TAGS, 1024).expect("failed to create aggregator"),
));

Processor::new(tags_provider, config, &aws_config, metrics_aggregator)
Processor::new(
tags_provider,
config,
&aws_config,
metrics_aggregator,
Arc::new(Mutex::new(ContextBuffer::default())),
)
}

#[test]