1 change: 1 addition & 0 deletions bottlecap/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions bottlecap/Cargo.toml
@@ -39,6 +39,7 @@ base64 = { version = "0.22", default-features = false }
 rmp-serde = { version = "1.3.0", default-features = false }
 rustls = { version = "0.23.18", default-features = false, features = ["aws-lc-rs"] }
 rand = { version = "0.8", default-features = false }
+prost = { version = "0.11.6", default-features = false }
 
 [dev-dependencies]
 figment = { version = "0.10", default-features = false, features = ["yaml", "env", "test"] }
121 changes: 72 additions & 49 deletions bottlecap/src/bin/bottlecap/main.rs
@@ -32,8 +32,9 @@ use bottlecap::{
         listener::TelemetryListener,
     },
     traces::{
+        stats_aggregator::StatsAggregator,
         stats_flusher::{self, StatsFlusher},
-        stats_processor, trace_agent,
+        stats_processor, trace_agent, trace_aggregator,
         trace_flusher::{self, TraceFlusher},
         trace_processor,
     },
@@ -297,55 +298,16 @@ async fn extension_loop_active(
     };
     let mut metrics_flusher = MetricsFlusher::new(flusher_config);
 
-    let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher {
-        buffer: Arc::new(TokioMutex::new(Vec::new())),
-        config: Arc::clone(config),
-    });
-
     // Lifecycle Invocation Processor
     let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
         Arc::clone(&tags_provider),
         Arc::clone(config),
         aws_config,
         Arc::clone(&metrics_aggr),
     )));
-    let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
-        obfuscation_config: Arc::new(
-            obfuscation_config::ObfuscationConfig::new()
-                .map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?,
-        ),
-        resolved_api_key: resolved_api_key.clone(),
-    });
-
-    let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher {
-        buffer: Arc::new(TokioMutex::new(Vec::new())),
-        config: Arc::clone(config),
-        resolved_api_key: resolved_api_key.clone(),
-    });
-    let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});
-
-    let trace_flusher_clone = trace_flusher.clone();
-    let stats_flusher_clone = stats_flusher.clone();
-
-    let trace_agent = Box::new(
-        trace_agent::TraceAgent::new(
-            Arc::clone(config),
-            trace_processor.clone(),
-            trace_flusher_clone,
-            stats_processor,
-            stats_flusher_clone,
-            Arc::clone(&tags_provider),
-        )
-        .await,
-    );
-    let trace_agent_tx = trace_agent.get_sender_copy();
-
-    tokio::spawn(async move {
-        let res = trace_agent.start().await;
-        if let Err(e) = res {
-            error!("Error starting trace agent: {e:?}");
-        }
-    });
+    let (trace_agent_channel, trace_flusher, trace_processor, stats_flusher) =
+        start_trace_agent(config, resolved_api_key.clone(), &tags_provider);
 
     let lifecycle_listener = LifecycleListener {
         invocation_processor: Arc::clone(&invocation_processor),
@@ -456,7 +418,7 @@ async fn extension_loop_active(
                         config.clone(),
                         tags_provider.clone(),
                         trace_processor.clone(),
-                        trace_agent_tx.clone()
+                        trace_agent_channel.clone()
                     ).await;
                 }
                 drop(p);
@@ -471,8 +433,8 @@
             tokio::join!(
                 logs_flusher.flush(),
                 metrics_flusher.flush(),
-                trace_flusher.manual_flush(),
-                stats_flusher.manual_flush()
+                trace_flusher.flush(),
+                stats_flusher.flush()
             );
         }
 
Expand Down Expand Up @@ -507,8 +469,8 @@ async fn extension_loop_active(
tokio::join!(
logs_flusher.flush(),
metrics_flusher.flush(),
trace_flusher.manual_flush(),
stats_flusher.manual_flush()
trace_flusher.flush(),
stats_flusher.flush()
);
if matches!(flush_control.flush_strategy, FlushStrategy::Periodically(_)) {
break;
@@ -523,8 +485,8 @@
             tokio::join!(
                 logs_flusher.flush(),
                 metrics_flusher.flush(),
-                trace_flusher.manual_flush(),
-                stats_flusher.manual_flush()
+                trace_flusher.flush(),
+                stats_flusher.flush()
             );
             return Ok(());
         }
@@ -569,6 +531,67 @@ fn start_logs_agent(
     (logs_agent_channel, logs_flusher)
 }
 
+fn start_trace_agent(
+    config: &Arc<Config>,
+    resolved_api_key: String,
+    tags_provider: &Arc<TagProvider>,
+) -> (
+    Sender<datadog_trace_utils::send_data::SendData>,
+    Arc<trace_flusher::ServerlessTraceFlusher>,
+    Arc<trace_processor::ServerlessTraceProcessor>,
+    Arc<stats_flusher::ServerlessStatsFlusher>,
+) {
+    // Stats
+    let stats_aggregator = Arc::new(TokioMutex::new(StatsAggregator::default()));
+    let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
+        resolved_api_key.clone(),
+        stats_aggregator.clone(),
+        Arc::clone(config),
+    ));
+
+    let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});
+
+    // Traces
+    let trace_aggregator = Arc::new(TokioMutex::new(trace_aggregator::TraceAggregator::default()));
+    let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher {
+        aggregator: trace_aggregator.clone(),
+        config: Arc::clone(config),
+    });
+
+    let obfuscation_config = obfuscation_config::ObfuscationConfig::new()
+        .map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))
+        .expect("Failed to create obfuscation config for Trace Agent");
+
+    let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
+        obfuscation_config: Arc::new(obfuscation_config),
+        resolved_api_key,
+    });
+
+    let trace_agent = Box::new(trace_agent::TraceAgent::new(
+        Arc::clone(config),
+        trace_aggregator,
+        trace_processor.clone(),
+        stats_aggregator,
+        stats_processor,
+        Arc::clone(tags_provider),
+    ));
+    let trace_agent_channel = trace_agent.get_sender_copy();
+
+    tokio::spawn(async move {
+        let res = trace_agent.start().await;
+        if let Err(e) = res {
+            error!("Error starting trace agent: {e:?}");
+        }
+    });
+
+    (
+        trace_agent_channel,
+        trace_flusher,
+        trace_processor,
+        stats_flusher,
+    )
+}
+
 async fn start_dogstatsd(metrics_aggr: &Arc<Mutex<MetricsAggregator>>) -> CancellationToken {
     let dogstatsd_config = DogStatsDConfig {
         host: EXTENSION_HOST.to_string(),
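The refactor above shares each aggregator between a producer and a consumer: the trace agent writes into `trace_aggregator`/`stats_aggregator`, while the flushers hold clones of the same `Arc<TokioMutex<…>>` handles and drain them on flush. A minimal, self-contained sketch of that sharing pattern (the `Aggregator` type here is a hypothetical stand-in, not this PR's code):

```rust
use std::sync::Arc;
use tokio::sync::Mutex as TokioMutex;

// Hypothetical stand-in for TraceAggregator/StatsAggregator.
#[derive(Default)]
struct Aggregator {
    items: Vec<&'static str>,
}

#[tokio::main]
async fn main() {
    // One shared handle, cloned by the producer (agent) and kept by the
    // consumer (flusher), mirroring the wiring in start_trace_agent.
    let aggregator = Arc::new(TokioMutex::new(Aggregator::default()));

    let producer = Arc::clone(&aggregator);
    let handle = tokio::spawn(async move {
        // The agent side pushes processed payloads into the aggregator.
        producer.lock().await.items.push("span-batch");
    });
    handle.await.expect("producer task failed");

    // The flusher side takes whatever has accumulated, leaving the
    // aggregator empty for the next interval.
    let batch = std::mem::take(&mut aggregator.lock().await.items);
    println!("flushed {} item(s)", batch.len());
}
```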
2 changes: 1 addition & 1 deletion bottlecap/src/logs/aggregator.rs
@@ -36,7 +36,7 @@ impl Aggregator {
             max_batch_entries_size,
             max_content_size_bytes,
             max_log_size_bytes,
-            buffer: Vec::with_capacity(max_batch_entries_size),
+            buffer: Vec::with_capacity(max_content_size_bytes),
         }
     }
 
2 changes: 2 additions & 0 deletions bottlecap/src/traces/mod.rs
@@ -4,9 +4,11 @@
 pub mod context;
 pub mod propagation;
 pub mod span_pointers;
+pub mod stats_aggregator;
 pub mod stats_flusher;
 pub mod stats_processor;
 pub mod trace_agent;
+pub mod trace_aggregator;
 pub mod trace_flusher;
 pub mod trace_processor;
 
167 changes: 167 additions & 0 deletions bottlecap/src/traces/stats_aggregator.rs
@@ -0,0 +1,167 @@
use datadog_trace_protobuf::pb::ClientStatsPayload;
use prost::Message;
use std::collections::VecDeque;

/// Maximum number of entries in a stat payload.
///
/// <https://github.com/DataDog/datadog-agent/blob/996dd54337908a6511948fabd2a41420ba919a8b/pkg/trace/writer/stats.go#L35-L41>
// const MAX_BATCH_ENTRIES_SIZE: usize = 4000;

/// Approximate size an entry in a stat payload occupies.
///
/// <https://github.com/DataDog/datadog-agent/blob/996dd54337908a6511948fabd2a41420ba919a8b/pkg/trace/writer/stats.go#L33-L35>
// const MAX_ENTRY_SIZE_BYTES: usize = 375;

/// Maximum content size per payload in compressed bytes.
///
/// <https://github.com/DataDog/datadog-agent/blob/996dd54337908a6511948fabd2a41420ba919a8b/pkg/trace/writer/stats.go#L35-L41>
const MAX_CONTENT_SIZE_BYTES: usize = 3 * 1024 * 1024; // ~3MB

#[allow(clippy::module_name_repetitions)]
pub struct StatsAggregator {
    queue: VecDeque<ClientStatsPayload>,
    max_content_size_bytes: usize,
    buffer: Vec<ClientStatsPayload>,
}

impl Default for StatsAggregator {
    fn default() -> Self {
        StatsAggregator {
            queue: VecDeque::new(),
            max_content_size_bytes: MAX_CONTENT_SIZE_BYTES,
            buffer: Vec::with_capacity(MAX_CONTENT_SIZE_BYTES),
        }
    }
}

impl StatsAggregator {
    #[allow(dead_code)]
    #[allow(clippy::must_use_candidate)]
    pub fn new(max_content_size_bytes: usize) -> Self {
        StatsAggregator {
            queue: VecDeque::new(),
            max_content_size_bytes,
            buffer: Vec::with_capacity(max_content_size_bytes),
        }
    }

    pub fn add(&mut self, payload: ClientStatsPayload) {
        self.queue.push_back(payload);
    }

    pub fn get_batch(&mut self) -> Vec<ClientStatsPayload> {
        let mut batch_size = 0;

        // Fill the batch
        while batch_size < self.max_content_size_bytes {
            if let Some(payload) = self.queue.pop_front() {
                let payload_size = payload.encoded_len();

                // Put the payload back in the queue if it would overflow the batch
                if batch_size + payload_size > self.max_content_size_bytes {
                    self.queue.push_front(payload);
                    break;
                }
                batch_size += payload_size;
                self.buffer.push(payload);
            } else {
                break;
            }
        }

        std::mem::take(&mut self.buffer)
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_add() {
        let mut aggregator = StatsAggregator::default();
        let payload = ClientStatsPayload {
            hostname: "hostname".to_string(),
            env: "dev".to_string(),
            version: "version".to_string(),
            stats: vec![],
            lang: "rust".to_string(),
            tracer_version: "tracer.version".to_string(),
            runtime_id: "hash".to_string(),
            sequence: 0,
            agent_aggregation: "aggregation".to_string(),
            service: "service".to_string(),
            container_id: "container_id".to_string(),
            tags: vec![],
            git_commit_sha: "git_commit_sha".to_string(),
            image_tag: "image_tag".to_string(),
        };

        aggregator.add(payload.clone());
        assert_eq!(aggregator.queue.len(), 1);
        assert_eq!(aggregator.queue[0], payload);
    }

    #[test]
    fn test_get_batch() {
        let mut aggregator = StatsAggregator::default();
        let payload = ClientStatsPayload {
            hostname: "hostname".to_string(),
            env: "dev".to_string(),
            version: "version".to_string(),
            stats: vec![],
            lang: "rust".to_string(),
            tracer_version: "tracer.version".to_string(),
            runtime_id: "hash".to_string(),
            sequence: 0,
            agent_aggregation: "aggregation".to_string(),
            service: "service".to_string(),
            container_id: "container_id".to_string(),
            tags: vec![],
            git_commit_sha: "git_commit_sha".to_string(),
            image_tag: "image_tag".to_string(),
        };
        aggregator.add(payload.clone());
        assert_eq!(aggregator.queue.len(), 1);
        let batch = aggregator.get_batch();
        assert_eq!(batch, vec![payload]);
    }

    #[test]
    fn test_get_batch_full_entries() {
        let mut aggregator = StatsAggregator::new(230);
        // Payload below is 115 bytes
        let payload = ClientStatsPayload {
            hostname: "hostname".to_string(),
            env: "dev".to_string(),
            version: "version".to_string(),
            stats: vec![],
            lang: "rust".to_string(),
            tracer_version: "tracer.version".to_string(),
            runtime_id: "hash".to_string(),
            sequence: 0,
            agent_aggregation: "aggregation".to_string(),
            service: "service".to_string(),
            container_id: "container_id".to_string(),
            tags: vec![],
            git_commit_sha: "git_commit_sha".to_string(),
            image_tag: "image_tag".to_string(),
        };

        // Add 3 payloads
        aggregator.add(payload.clone());
        aggregator.add(payload.clone());
        aggregator.add(payload.clone());

        // The batch should only contain the first 2 payloads
        let first_batch = aggregator.get_batch();
        assert_eq!(first_batch, vec![payload.clone(), payload.clone()]);
        assert_eq!(aggregator.queue.len(), 1);

        // The second batch should only contain the last payload
        let second_batch = aggregator.get_batch();
        assert_eq!(second_batch, vec![payload]);
        assert_eq!(aggregator.queue.len(), 0);
    }
}
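For context, a consumer drains this aggregator by calling `get_batch` repeatedly: each call returns at most ~3 MiB of encoded payloads and an empty `Vec` once the queue is exhausted. A minimal sketch, assuming it runs inside the bottlecap crate; `send` is a hypothetical placeholder for the real serializer/exporter in `stats_flusher`:

```rust
use datadog_trace_protobuf::pb::ClientStatsPayload;

use crate::traces::stats_aggregator::StatsAggregator;

// Hypothetical exporter; the real flusher compresses and ships the batch.
fn send(batch: &[ClientStatsPayload]) {
    println!("flushing {} stats payload(s)", batch.len());
}

// Drain the queue in size-capped batches until nothing is left.
fn flush_all(aggregator: &mut StatsAggregator) {
    loop {
        let batch = aggregator.get_batch();
        if batch.is_empty() {
            break;
        }
        send(&batch);
    }
}
```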