Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub struct Config {
pub logs_config_compression_level: i32,
pub serverless_flush_strategy: FlushStrategy,
pub enhanced_metrics: bool,
//flush timeout in seconds
pub flush_timeout: u64, //TODO go agent adds jitter too
pub https_proxy: Option<String>,
pub capture_lambda_payload: bool,
Expand Down
20 changes: 9 additions & 11 deletions bottlecap/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@ use core::time::Duration;
use std::sync::Arc;
use tracing::error;

#[must_use]
pub fn get_client(config: Arc<config::Config>) -> reqwest::Client {
match build_client(config) {
Ok(client) => client,
Err(e) => {
error!(
"Unable to parse proxy configuration: {}, no proxy will be used",
e
);
//TODO this fallback doesn't respect the flush timeout
reqwest::Client::new()
}
}
build_client(config).unwrap_or_else(|e| {
error!(
"Unable to parse proxy configuration: {}, no proxy will be used",
e
);
//TODO this fallback doesn't respect the flush timeout
reqwest::Client::new()
})
}

fn build_client(config: Arc<config::Config>) -> Result<reqwest::Client, reqwest::Error> {
Expand Down
19 changes: 15 additions & 4 deletions bottlecap/src/logs/flusher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::config;
use crate::http_client;
use crate::logs::aggregator::Aggregator;
use std::time::Instant;
use std::{
error::Error,
io::Write,
Expand Down Expand Up @@ -85,13 +86,13 @@ impl Flusher {
compression_enabled: bool,
compression_level: i32,
) {
let url = format!("{fqdn}/api/v2/logs");
if !data.is_empty() {
let url = format!("{fqdn}/api/v2/logs");
let start = Instant::now();
let body = if compression_enabled {
let result = (|| -> Result<Vec<u8>, Box<dyn Error>> {
let mut encoder = Encoder::new(Vec::new(), compression_level)
.map_err(|e| Box::new(e) as Box<dyn Error>)?;

encoder
.write_all(&data)
.map_err(|e| Box::new(e) as Box<dyn Error>)?;
Expand Down Expand Up @@ -120,14 +121,24 @@ impl Flusher {
};
let resp: Result<reqwest::Response, reqwest::Error> = req.body(body).send().await;

let elapsed = start.elapsed();

match resp {
Ok(resp) => {
if resp.status() != 202 {
debug!("Failed to send logs to datadog: {}", resp.status());
debug!(
"Failed to send logs to datadog after {}ms: {}",
elapsed.as_millis(),
resp.status()
);
}
}
Err(e) => {
error!("Failed to send logs to datadog: {}", e);
error!(
"Failed to send logs to datadog after {}ms: {}",
elapsed.as_millis(),
e
);
}
}
}
Expand Down
31 changes: 16 additions & 15 deletions bottlecap/src/tags/lambda/tags.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config;
use std::collections::hash_map;
use std::collections::HashMap;
use std::env::consts::ARCH;
use std::fs;
use std::sync::Arc;
Expand Down Expand Up @@ -66,7 +66,7 @@ const RESOURCE_KEY: &str = "resource";

#[derive(Debug, Clone)]
pub struct Lambda {
tags_map: hash_map::HashMap<String, String>,
tags_map: HashMap<String, String>,
}

fn arch_to_platform<'a>() -> &'a str {
Expand All @@ -77,10 +77,10 @@ fn arch_to_platform<'a>() -> &'a str {
}

fn tags_from_env(
mut tags_map: hash_map::HashMap<String, String>,
mut tags_map: HashMap<String, String>,
config: Arc<config::Config>,
metadata: &hash_map::HashMap<String, String>,
) -> hash_map::HashMap<String, String> {
metadata: &HashMap<String, String>,
) -> HashMap<String, String> {
if metadata.contains_key(FUNCTION_ARN_KEY) {
let parts = metadata[FUNCTION_ARN_KEY].split(':').collect::<Vec<&str>>();
if parts.len() > 6 {
Expand Down Expand Up @@ -222,10 +222,10 @@ impl Lambda {
#[must_use]
pub fn new_from_config(
config: Arc<config::Config>,
metadata: &hash_map::HashMap<String, String>,
metadata: &HashMap<String, String>,
) -> Self {
Lambda {
tags_map: tags_from_env(hash_map::HashMap::new(), config, metadata),
tags_map: tags_from_env(HashMap::new(), config, metadata),
}
}

Expand All @@ -248,19 +248,19 @@ impl Lambda {
}

#[must_use]
pub fn get_tags_map(&self) -> &hash_map::HashMap<String, String> {
pub fn get_tags_map(&self) -> &HashMap<String, String> {
&self.tags_map
}

#[must_use]
pub fn get_function_tags_map(&self) -> hash_map::HashMap<String, String> {
pub fn get_function_tags_map(&self) -> HashMap<String, String> {
let tags = self
.tags_map
.iter()
.map(|(k, v)| format!("{k}:{v}"))
.collect::<Vec<String>>()
.join(",");
hash_map::HashMap::from_iter([(FUNCTION_TAGS_KEY.to_string(), tags)])
HashMap::from_iter([(FUNCTION_TAGS_KEY.to_string(), tags)])
}
}

Expand All @@ -270,13 +270,14 @@ mod tests {
use super::*;
use crate::config::Config;
use serial_test::serial;
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::path::Path;

#[test]
fn test_new_from_config() {
let metadata = hash_map::HashMap::new();
let metadata = HashMap::new();
let tags = Lambda::new_from_config(Arc::new(Config::default()), &metadata);
assert_eq!(tags.tags_map.len(), 3);
assert_eq!(
Expand All @@ -297,7 +298,7 @@ mod tests {

#[test]
fn test_new_with_function_arn_metadata() {
let mut metadata = hash_map::HashMap::new();
let mut metadata = HashMap::new();
metadata.insert(
FUNCTION_ARN_KEY.to_string(),
"arn:aws:lambda:us-west-2:123456789012:function:my-function".to_string(),
Expand All @@ -317,7 +318,7 @@ mod tests {
#[test]
#[serial] //run test serially since it sets and unsets env vars
fn test_with_lambda_env_vars() {
let mut metadata = hash_map::HashMap::new();
let mut metadata = HashMap::new();
metadata.insert(
FUNCTION_ARN_KEY.to_string(),
"arn:aws:lambda:us-west-2:123456789012:function:My-function".to_string(),
Expand Down Expand Up @@ -387,7 +388,7 @@ mod tests {

#[test]
fn test_get_function_tags_map() {
let mut metadata = hash_map::HashMap::new();
let mut metadata = HashMap::new();
metadata.insert(
FUNCTION_ARN_KEY.to_string(),
"arn:aws:lambda:us-west-2:123456789012:function:my-function".to_string(),
Expand All @@ -402,7 +403,7 @@ mod tests {
let tags = Lambda::new_from_config(config, &metadata);
let function_tags = tags.get_function_tags_map();
assert_eq!(function_tags.len(), 1);
let fn_tags_map: hash_map::HashMap<String, String> = function_tags
let fn_tags_map: HashMap<String, String> = function_tags
.get(FUNCTION_TAGS_KEY)
.unwrap()
.split(',')
Expand Down
40 changes: 25 additions & 15 deletions bottlecap/src/traces/stats_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use async_trait::async_trait;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, error};

use crate::config;
use crate::traces::stats_aggregator::StatsAggregator;
use datadog_trace_protobuf::pb;
use datadog_trace_utils::{config_utils::trace_stats_url, stats_utils};
use ddcommon::Endpoint;
use tracing::{debug, error};

#[async_trait]
pub trait StatsFlusher {
Expand Down Expand Up @@ -49,7 +49,7 @@ impl StatsFlusher for ServerlessStatsFlusher {
let endpoint = Endpoint {
url: hyper::Uri::from_str(&stats_url).expect("can't make URI from stats url, exiting"),
api_key: Some(api_key.clone().into()),
timeout_ms: Endpoint::DEFAULT_TIMEOUT,
timeout_ms: config.flush_timeout * 1_000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if flush_timeout is not set? Do we have a default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the DD_FLUSH_TIMEOUT or flush_timeout option in the datadog agent, where does this come from?

test_token: None,
};

Expand All @@ -60,16 +60,6 @@ impl StatsFlusher for ServerlessStatsFlusher {
}
}

async fn flush(&self) {
let mut guard = self.aggregator.lock().await;

let mut stats = guard.get_batch();
while !stats.is_empty() {
self.send(stats).await;

stats = guard.get_batch();
}
}
async fn send(&self, stats: Vec<pb::ClientStatsPayload>) {
if stats.is_empty() {
return;
Expand All @@ -88,17 +78,37 @@ impl StatsFlusher for ServerlessStatsFlusher {
}
};

match stats_utils::send_stats_payload(
let stats_url = trace_stats_url(&self.config.site);

let start = std::time::Instant::now();

let resp = stats_utils::send_stats_payload(
serialized_stats_payload,
&self.endpoint,
&self.config.api_key,
)
.await
{
.await;
let elapsed = start.elapsed();
debug!(
"Stats request to {} took {}ms",
stats_url,
elapsed.as_millis()
);
match resp {
Ok(()) => debug!("Successfully flushed stats"),
Err(e) => {
error!("Error sending stats: {e:?}");
}
};
}
async fn flush(&self) {
let mut guard = self.aggregator.lock().await;

let mut stats = guard.get_batch();
while !stats.is_empty() {
self.send(stats).await;

stats = guard.get_batch();
}
}
}
3 changes: 3 additions & 0 deletions bottlecap/src/traces/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ impl TraceFlusher for ServerlessTraceFlusher {
if traces.is_empty() {
return;
}

let start = std::time::Instant::now();
debug!("Flushing {} traces", traces.len());

for traces in trace_utils::coalesce_send_data(traces) {
Expand All @@ -65,5 +67,6 @@ impl TraceFlusher for ServerlessTraceFlusher {
}
}
}
debug!("Flushing traces took {}ms", start.elapsed().as_millis());
}
}
32 changes: 20 additions & 12 deletions bottlecap/src/traces/trace_processor.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::config;
use crate::tags::provider;
use crate::traces::span_pointers::{attach_span_pointers_to_meta, SpanPointer};
use crate::traces::{
AWS_XRAY_DAEMON_ADDRESS_URL_PREFIX, DNS_LOCAL_HOST_ADDRESS_URL_PREFIX,
DNS_NON_ROUTABLE_ADDRESS_URL_PREFIX, INVOCATION_SPAN_RESOURCE, LAMBDA_EXTENSION_URL_PREFIX,
LAMBDA_RUNTIME_URL_PREFIX, LAMBDA_STATSD_URL_PREFIX,
};
use datadog_trace_obfuscation::obfuscate::obfuscate_span;
use datadog_trace_obfuscation::obfuscation_config;
use datadog_trace_protobuf::pb;
use datadog_trace_protobuf::pb::Span;
use datadog_trace_utils::config_utils::trace_intake_url;
use datadog_trace_utils::send_data::{RetryBackoffType, RetryStrategy};
use datadog_trace_utils::trace_utils::SendData;
use datadog_trace_utils::trace_utils::{self};
use datadog_trace_utils::tracer_header_tags;
use datadog_trace_utils::tracer_payload::{
TraceChunkProcessor, TraceCollection::V07, TracerPayloadCollection,
Expand All @@ -14,16 +25,6 @@ use ddcommon::Endpoint;
use std::str::FromStr;
use std::sync::Arc;

use crate::config;
use crate::traces::{
AWS_XRAY_DAEMON_ADDRESS_URL_PREFIX, DNS_LOCAL_HOST_ADDRESS_URL_PREFIX,
DNS_NON_ROUTABLE_ADDRESS_URL_PREFIX, INVOCATION_SPAN_RESOURCE, LAMBDA_EXTENSION_URL_PREFIX,
LAMBDA_RUNTIME_URL_PREFIX, LAMBDA_STATSD_URL_PREFIX,
};
use datadog_trace_obfuscation::obfuscate::obfuscate_span;
use datadog_trace_protobuf::pb::Span;
use datadog_trace_utils::trace_utils::{self, SendData};

#[derive(Clone)]
#[allow(clippy::module_name_repetitions)]
pub struct ServerlessTraceProcessor {
Expand Down Expand Up @@ -161,11 +162,18 @@ impl TraceProcessor for ServerlessTraceProcessor {
let endpoint = Endpoint {
url: hyper::Uri::from_str(&intake_url).expect("can't parse trace intake URL, exiting"),
api_key: Some(self.resolved_api_key.clone().into()),
timeout_ms: Endpoint::DEFAULT_TIMEOUT,
timeout_ms: config.flush_timeout * 1_000,
test_token: None,
};

SendData::new(body_size, payload, header_tags, &endpoint)
let mut send_data = SendData::new(body_size, payload, header_tags, &endpoint);
send_data.set_retry_strategy(RetryStrategy::new(
1,
100,
Comment on lines +171 to +172
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Magic numbers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't you just create it once and clone it another place?

RetryBackoffType::Exponential,
None,
));
send_data
}
}

Expand Down
Loading