Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub struct Config {
pub logs_config_compression_level: i32,
pub serverless_flush_strategy: FlushStrategy,
pub enhanced_metrics: bool,
// Flush timeout, in seconds.
pub flush_timeout: u64, // TODO: the Go agent also adds jitter to this timeout.
pub https_proxy: Option<String>,
pub capture_lambda_payload: bool,
Expand Down
20 changes: 9 additions & 11 deletions bottlecap/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@ use core::time::Duration;
use std::sync::Arc;
use tracing::error;

#[must_use]
pub fn get_client(config: Arc<config::Config>) -> reqwest::Client {
match build_client(config) {
Ok(client) => client,
Err(e) => {
error!(
"Unable to parse proxy configuration: {}, no proxy will be used",
e
);
//TODO this fallback doesn't respect the flush timeout
reqwest::Client::new()
}
}
build_client(config).unwrap_or_else(|e| {
error!(
"Unable to parse proxy configuration: {}, no proxy will be used",
e
);
// TODO: this fallback client does not respect the configured flush timeout.
reqwest::Client::new()
})
}

fn build_client(config: Arc<config::Config>) -> Result<reqwest::Client, reqwest::Error> {
Expand Down
20 changes: 16 additions & 4 deletions bottlecap/src/logs/flusher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::config;
use crate::http_client;
use crate::logs::aggregator::Aggregator;
use std::time::Instant;
use std::{
error::Error,
io::Write,
Expand Down Expand Up @@ -85,13 +86,14 @@ impl Flusher {
compression_enabled: bool,
compression_level: i32,
) {
let url = format!("{fqdn}/api/v2/logs");
if !data.is_empty() {
let url = format!("{fqdn}/api/v2/logs");
debug!("Sending logs to datadog");
let start = Instant::now();
let body = if compression_enabled {
let result = (|| -> Result<Vec<u8>, Box<dyn Error>> {
let mut encoder = Encoder::new(Vec::new(), compression_level)
.map_err(|e| Box::new(e) as Box<dyn Error>)?;

encoder
.write_all(&data)
.map_err(|e| Box::new(e) as Box<dyn Error>)?;
Expand Down Expand Up @@ -120,14 +122,24 @@ impl Flusher {
};
let resp: Result<reqwest::Response, reqwest::Error> = req.body(body).send().await;

let elapsed = start.elapsed();

match resp {
Ok(resp) => {
if resp.status() != 202 {
debug!("Failed to send logs to datadog: {}", resp.status());
debug!(
"Failed to send logs to datadog after {}ms: {}",
elapsed.as_millis(),
resp.status()
);
}
}
Err(e) => {
error!("Failed to send logs to datadog: {}", e);
error!(
"Failed to send logs to datadog after {}ms: {}",
elapsed.as_millis(),
e
);
}
}
}
Expand Down
31 changes: 16 additions & 15 deletions bottlecap/src/tags/lambda/tags.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config;
use std::collections::hash_map;
use std::collections::HashMap;
use std::env::consts::ARCH;
use std::fs;
use std::sync::Arc;
Expand Down Expand Up @@ -66,7 +66,7 @@ const RESOURCE_KEY: &str = "resource";

#[derive(Debug, Clone)]
pub struct Lambda {
tags_map: hash_map::HashMap<String, String>,
tags_map: HashMap<String, String>,
}

fn arch_to_platform<'a>() -> &'a str {
Expand All @@ -77,10 +77,10 @@ fn arch_to_platform<'a>() -> &'a str {
}

fn tags_from_env(
mut tags_map: hash_map::HashMap<String, String>,
mut tags_map: HashMap<String, String>,
config: Arc<config::Config>,
metadata: &hash_map::HashMap<String, String>,
) -> hash_map::HashMap<String, String> {
metadata: &HashMap<String, String>,
) -> HashMap<String, String> {
if metadata.contains_key(FUNCTION_ARN_KEY) {
let parts = metadata[FUNCTION_ARN_KEY].split(':').collect::<Vec<&str>>();
if parts.len() > 6 {
Expand Down Expand Up @@ -222,10 +222,10 @@ impl Lambda {
#[must_use]
pub fn new_from_config(
config: Arc<config::Config>,
metadata: &hash_map::HashMap<String, String>,
metadata: &HashMap<String, String>,
) -> Self {
Lambda {
tags_map: tags_from_env(hash_map::HashMap::new(), config, metadata),
tags_map: tags_from_env(HashMap::new(), config, metadata),
}
}

Expand All @@ -248,19 +248,19 @@ impl Lambda {
}

#[must_use]
pub fn get_tags_map(&self) -> &hash_map::HashMap<String, String> {
pub fn get_tags_map(&self) -> &HashMap<String, String> {
&self.tags_map
}

#[must_use]
pub fn get_function_tags_map(&self) -> hash_map::HashMap<String, String> {
pub fn get_function_tags_map(&self) -> HashMap<String, String> {
let tags = self
.tags_map
.iter()
.map(|(k, v)| format!("{k}:{v}"))
.collect::<Vec<String>>()
.join(",");
hash_map::HashMap::from_iter([(FUNCTION_TAGS_KEY.to_string(), tags)])
HashMap::from_iter([(FUNCTION_TAGS_KEY.to_string(), tags)])
}
}

Expand All @@ -270,13 +270,14 @@ mod tests {
use super::*;
use crate::config::Config;
use serial_test::serial;
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::path::Path;

#[test]
fn test_new_from_config() {
let metadata = hash_map::HashMap::new();
let metadata = HashMap::new();
let tags = Lambda::new_from_config(Arc::new(Config::default()), &metadata);
assert_eq!(tags.tags_map.len(), 3);
assert_eq!(
Expand All @@ -297,7 +298,7 @@ mod tests {

#[test]
fn test_new_with_function_arn_metadata() {
let mut metadata = hash_map::HashMap::new();
let mut metadata = HashMap::new();
metadata.insert(
FUNCTION_ARN_KEY.to_string(),
"arn:aws:lambda:us-west-2:123456789012:function:my-function".to_string(),
Expand All @@ -317,7 +318,7 @@ mod tests {
#[test]
#[serial] //run test serially since it sets and unsets env vars
fn test_with_lambda_env_vars() {
let mut metadata = hash_map::HashMap::new();
let mut metadata = HashMap::new();
metadata.insert(
FUNCTION_ARN_KEY.to_string(),
"arn:aws:lambda:us-west-2:123456789012:function:My-function".to_string(),
Expand Down Expand Up @@ -387,7 +388,7 @@ mod tests {

#[test]
fn test_get_function_tags_map() {
let mut metadata = hash_map::HashMap::new();
let mut metadata = HashMap::new();
metadata.insert(
FUNCTION_ARN_KEY.to_string(),
"arn:aws:lambda:us-west-2:123456789012:function:my-function".to_string(),
Expand All @@ -402,7 +403,7 @@ mod tests {
let tags = Lambda::new_from_config(config, &metadata);
let function_tags = tags.get_function_tags_map();
assert_eq!(function_tags.len(), 1);
let fn_tags_map: hash_map::HashMap<String, String> = function_tags
let fn_tags_map: HashMap<String, String> = function_tags
.get(FUNCTION_TAGS_KEY)
.unwrap()
.split(',')
Expand Down
1 change: 1 addition & 0 deletions bottlecap/src/traces/span_pointers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ mod tests {
expected_links: Option<serde_json::Value>,
}

#[allow(clippy::too_many_lines)]
#[test]
#[allow(clippy::too_many_lines)]
fn test_attach_span_pointers_to_span() {
Expand Down
40 changes: 25 additions & 15 deletions bottlecap/src/traces/stats_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use async_trait::async_trait;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, error};

use crate::config;
use crate::traces::stats_aggregator::StatsAggregator;
use datadog_trace_protobuf::pb;
use datadog_trace_utils::{config_utils::trace_stats_url, stats_utils};
use ddcommon::Endpoint;
use tracing::{debug, error};

#[async_trait]
pub trait StatsFlusher {
Expand Down Expand Up @@ -49,7 +49,7 @@ impl StatsFlusher for ServerlessStatsFlusher {
let endpoint = Endpoint {
url: hyper::Uri::from_str(&stats_url).expect("can't make URI from stats url, exiting"),
api_key: Some(api_key.clone().into()),
timeout_ms: Endpoint::DEFAULT_TIMEOUT,
timeout_ms: config.flush_timeout * 1_000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if flush_timeout is not set? Do we have a default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the DD_FLUSH_TIMEOUT or flush_timeout option in the datadog agent, where does this come from?

test_token: None,
};

Expand All @@ -60,16 +60,6 @@ impl StatsFlusher for ServerlessStatsFlusher {
}
}

async fn flush(&self) {
let mut guard = self.aggregator.lock().await;

let mut stats = guard.get_batch();
while !stats.is_empty() {
self.send(stats).await;

stats = guard.get_batch();
}
}
async fn send(&self, stats: Vec<pb::ClientStatsPayload>) {
if stats.is_empty() {
return;
Expand All @@ -88,17 +78,37 @@ impl StatsFlusher for ServerlessStatsFlusher {
}
};

match stats_utils::send_stats_payload(
let stats_url = trace_stats_url(&self.config.site);

let start = std::time::Instant::now();

let resp = stats_utils::send_stats_payload(
serialized_stats_payload,
&self.endpoint,
&self.config.api_key,
)
.await
{
.await;
let elapsed = start.elapsed();
debug!(
"Stats request to {} took {}ms",
stats_url,
elapsed.as_millis()
);
match resp {
Ok(()) => debug!("Successfully flushed stats"),
Err(e) => {
error!("Error sending stats: {e:?}");
}
};
}
/// Drains the stats aggregator and sends every pending batch.
///
/// The aggregator lock (a tokio async mutex, safe to hold across `.await`)
/// is held for the whole drain, so no new stats can be enqueued until the
/// flush completes.
// NOTE(review): assumes `get_batch()` returns an empty Vec once the
// aggregator is drained, so the loop terminates — confirm against
// StatsAggregator.
async fn flush(&self) {
let mut guard = self.aggregator.lock().await;

let mut stats = guard.get_batch();
while !stats.is_empty() {
self.send(stats).await;

stats = guard.get_batch();
}
}
}
3 changes: 3 additions & 0 deletions bottlecap/src/traces/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ impl TraceFlusher for ServerlessTraceFlusher {
if traces.is_empty() {
return;
}

let start = std::time::Instant::now();
debug!("Flushing {} traces", traces.len());

for traces in trace_utils::coalesce_send_data(traces) {
Expand All @@ -65,5 +67,6 @@ impl TraceFlusher for ServerlessTraceFlusher {
}
}
}
debug!("Flushing traces took {}ms", start.elapsed().as_millis());
}
}
32 changes: 20 additions & 12 deletions bottlecap/src/traces/trace_processor.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::config;
use crate::tags::provider;
use crate::traces::span_pointers::{attach_span_pointers_to_meta, SpanPointer};
use crate::traces::{
AWS_XRAY_DAEMON_ADDRESS_URL_PREFIX, DNS_LOCAL_HOST_ADDRESS_URL_PREFIX,
DNS_NON_ROUTABLE_ADDRESS_URL_PREFIX, INVOCATION_SPAN_RESOURCE, LAMBDA_EXTENSION_URL_PREFIX,
LAMBDA_RUNTIME_URL_PREFIX, LAMBDA_STATSD_URL_PREFIX,
};
use datadog_trace_obfuscation::obfuscate::obfuscate_span;
use datadog_trace_obfuscation::obfuscation_config;
use datadog_trace_protobuf::pb;
use datadog_trace_protobuf::pb::Span;
use datadog_trace_utils::config_utils::trace_intake_url;
use datadog_trace_utils::send_data::{RetryBackoffType, RetryStrategy};
use datadog_trace_utils::trace_utils::SendData;
use datadog_trace_utils::trace_utils::{self};
use datadog_trace_utils::tracer_header_tags;
use datadog_trace_utils::tracer_payload::{
TraceChunkProcessor, TraceCollection::V07, TracerPayloadCollection,
Expand All @@ -14,16 +25,6 @@ use ddcommon::Endpoint;
use std::str::FromStr;
use std::sync::Arc;

use crate::config;
use crate::traces::{
AWS_XRAY_DAEMON_ADDRESS_URL_PREFIX, DNS_LOCAL_HOST_ADDRESS_URL_PREFIX,
DNS_NON_ROUTABLE_ADDRESS_URL_PREFIX, INVOCATION_SPAN_RESOURCE, LAMBDA_EXTENSION_URL_PREFIX,
LAMBDA_RUNTIME_URL_PREFIX, LAMBDA_STATSD_URL_PREFIX,
};
use datadog_trace_obfuscation::obfuscate::obfuscate_span;
use datadog_trace_protobuf::pb::Span;
use datadog_trace_utils::trace_utils::{self, SendData};

#[derive(Clone)]
#[allow(clippy::module_name_repetitions)]
pub struct ServerlessTraceProcessor {
Expand Down Expand Up @@ -161,11 +162,18 @@ impl TraceProcessor for ServerlessTraceProcessor {
let endpoint = Endpoint {
url: hyper::Uri::from_str(&intake_url).expect("can't parse trace intake URL, exiting"),
api_key: Some(self.resolved_api_key.clone().into()),
timeout_ms: Endpoint::DEFAULT_TIMEOUT,
timeout_ms: config.flush_timeout * 1_000,
test_token: None,
};

SendData::new(body_size, payload, header_tags, &endpoint)
let mut send_data = SendData::new(body_size, payload, header_tags, &endpoint);
send_data.set_retry_strategy(RetryStrategy::new(
1,
100,
Comment on lines +171 to +172
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Magic numbers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't you just create it once and clone it another place?

RetryBackoffType::Exponential,
None,
));
send_data
}
}

Expand Down
Loading