Skip to content

Commit 3fbeca4

Browse files
authored
Svls 6036 respect timeouts (#537)
* log shipping times
* set flush timeout for traces
* remove retries
* fix conflicts
* address comments
1 parent 496845d commit 3fbeca4

File tree

7 files changed

+89
-57
lines changed

7 files changed

+89
-57
lines changed

bottlecap/src/config/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ pub struct Config {
104104
pub logs_config_compression_level: i32,
105105
pub serverless_flush_strategy: FlushStrategy,
106106
pub enhanced_metrics: bool,
107+
//flush timeout in seconds
107108
pub flush_timeout: u64, //TODO go agent adds jitter too
108109
pub https_proxy: Option<String>,
109110
pub capture_lambda_payload: bool,

bottlecap/src/http_client.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,16 @@ use core::time::Duration;
33
use std::sync::Arc;
44
use tracing::error;
55

6+
#[must_use]
67
pub fn get_client(config: Arc<config::Config>) -> reqwest::Client {
7-
match build_client(config) {
8-
Ok(client) => client,
9-
Err(e) => {
10-
error!(
11-
"Unable to parse proxy configuration: {}, no proxy will be used",
12-
e
13-
);
14-
//TODO this fallback doesn't respect the flush timeout
15-
reqwest::Client::new()
16-
}
17-
}
8+
build_client(config).unwrap_or_else(|e| {
9+
error!(
10+
"Unable to parse proxy configuration: {}, no proxy will be used",
11+
e
12+
);
13+
//TODO this fallback doesn't respect the flush timeout
14+
reqwest::Client::new()
15+
})
1816
}
1917

2018
fn build_client(config: Arc<config::Config>) -> Result<reqwest::Client, reqwest::Error> {

bottlecap/src/logs/flusher.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::config;
22
use crate::http_client;
33
use crate::logs::aggregator::Aggregator;
4+
use std::time::Instant;
45
use std::{
56
error::Error,
67
io::Write,
@@ -85,13 +86,13 @@ impl Flusher {
8586
compression_enabled: bool,
8687
compression_level: i32,
8788
) {
88-
let url = format!("{fqdn}/api/v2/logs");
8989
if !data.is_empty() {
90+
let url = format!("{fqdn}/api/v2/logs");
91+
let start = Instant::now();
9092
let body = if compression_enabled {
9193
let result = (|| -> Result<Vec<u8>, Box<dyn Error>> {
9294
let mut encoder = Encoder::new(Vec::new(), compression_level)
9395
.map_err(|e| Box::new(e) as Box<dyn Error>)?;
94-
9596
encoder
9697
.write_all(&data)
9798
.map_err(|e| Box::new(e) as Box<dyn Error>)?;
@@ -120,14 +121,24 @@ impl Flusher {
120121
};
121122
let resp: Result<reqwest::Response, reqwest::Error> = req.body(body).send().await;
122123

124+
let elapsed = start.elapsed();
125+
123126
match resp {
124127
Ok(resp) => {
125128
if resp.status() != 202 {
126-
debug!("Failed to send logs to datadog: {}", resp.status());
129+
debug!(
130+
"Failed to send logs to datadog after {}ms: {}",
131+
elapsed.as_millis(),
132+
resp.status()
133+
);
127134
}
128135
}
129136
Err(e) => {
130-
error!("Failed to send logs to datadog: {}", e);
137+
error!(
138+
"Failed to send logs to datadog after {}ms: {}",
139+
elapsed.as_millis(),
140+
e
141+
);
131142
}
132143
}
133144
}

bottlecap/src/tags/lambda/tags.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::config;
2-
use std::collections::hash_map;
2+
use std::collections::HashMap;
33
use std::env::consts::ARCH;
44
use std::fs;
55
use std::sync::Arc;
@@ -66,7 +66,7 @@ const RESOURCE_KEY: &str = "resource";
6666

6767
#[derive(Debug, Clone)]
6868
pub struct Lambda {
69-
tags_map: hash_map::HashMap<String, String>,
69+
tags_map: HashMap<String, String>,
7070
}
7171

7272
fn arch_to_platform<'a>() -> &'a str {
@@ -77,10 +77,10 @@ fn arch_to_platform<'a>() -> &'a str {
7777
}
7878

7979
fn tags_from_env(
80-
mut tags_map: hash_map::HashMap<String, String>,
80+
mut tags_map: HashMap<String, String>,
8181
config: Arc<config::Config>,
82-
metadata: &hash_map::HashMap<String, String>,
83-
) -> hash_map::HashMap<String, String> {
82+
metadata: &HashMap<String, String>,
83+
) -> HashMap<String, String> {
8484
if metadata.contains_key(FUNCTION_ARN_KEY) {
8585
let parts = metadata[FUNCTION_ARN_KEY].split(':').collect::<Vec<&str>>();
8686
if parts.len() > 6 {
@@ -222,10 +222,10 @@ impl Lambda {
222222
#[must_use]
223223
pub fn new_from_config(
224224
config: Arc<config::Config>,
225-
metadata: &hash_map::HashMap<String, String>,
225+
metadata: &HashMap<String, String>,
226226
) -> Self {
227227
Lambda {
228-
tags_map: tags_from_env(hash_map::HashMap::new(), config, metadata),
228+
tags_map: tags_from_env(HashMap::new(), config, metadata),
229229
}
230230
}
231231

@@ -248,19 +248,19 @@ impl Lambda {
248248
}
249249

250250
#[must_use]
251-
pub fn get_tags_map(&self) -> &hash_map::HashMap<String, String> {
251+
pub fn get_tags_map(&self) -> &HashMap<String, String> {
252252
&self.tags_map
253253
}
254254

255255
#[must_use]
256-
pub fn get_function_tags_map(&self) -> hash_map::HashMap<String, String> {
256+
pub fn get_function_tags_map(&self) -> HashMap<String, String> {
257257
let tags = self
258258
.tags_map
259259
.iter()
260260
.map(|(k, v)| format!("{k}:{v}"))
261261
.collect::<Vec<String>>()
262262
.join(",");
263-
hash_map::HashMap::from_iter([(FUNCTION_TAGS_KEY.to_string(), tags)])
263+
HashMap::from_iter([(FUNCTION_TAGS_KEY.to_string(), tags)])
264264
}
265265
}
266266

@@ -270,13 +270,14 @@ mod tests {
270270
use super::*;
271271
use crate::config::Config;
272272
use serial_test::serial;
273+
use std::collections::HashMap;
273274
use std::fs::File;
274275
use std::io::Write;
275276
use std::path::Path;
276277

277278
#[test]
278279
fn test_new_from_config() {
279-
let metadata = hash_map::HashMap::new();
280+
let metadata = HashMap::new();
280281
let tags = Lambda::new_from_config(Arc::new(Config::default()), &metadata);
281282
assert_eq!(tags.tags_map.len(), 3);
282283
assert_eq!(
@@ -297,7 +298,7 @@ mod tests {
297298

298299
#[test]
299300
fn test_new_with_function_arn_metadata() {
300-
let mut metadata = hash_map::HashMap::new();
301+
let mut metadata = HashMap::new();
301302
metadata.insert(
302303
FUNCTION_ARN_KEY.to_string(),
303304
"arn:aws:lambda:us-west-2:123456789012:function:my-function".to_string(),
@@ -317,7 +318,7 @@ mod tests {
317318
#[test]
318319
#[serial] //run test serially since it sets and unsets env vars
319320
fn test_with_lambda_env_vars() {
320-
let mut metadata = hash_map::HashMap::new();
321+
let mut metadata = HashMap::new();
321322
metadata.insert(
322323
FUNCTION_ARN_KEY.to_string(),
323324
"arn:aws:lambda:us-west-2:123456789012:function:My-function".to_string(),
@@ -387,7 +388,7 @@ mod tests {
387388

388389
#[test]
389390
fn test_get_function_tags_map() {
390-
let mut metadata = hash_map::HashMap::new();
391+
let mut metadata = HashMap::new();
391392
metadata.insert(
392393
FUNCTION_ARN_KEY.to_string(),
393394
"arn:aws:lambda:us-west-2:123456789012:function:my-function".to_string(),
@@ -402,7 +403,7 @@ mod tests {
402403
let tags = Lambda::new_from_config(config, &metadata);
403404
let function_tags = tags.get_function_tags_map();
404405
assert_eq!(function_tags.len(), 1);
405-
let fn_tags_map: hash_map::HashMap<String, String> = function_tags
406+
let fn_tags_map: HashMap<String, String> = function_tags
406407
.get(FUNCTION_TAGS_KEY)
407408
.unwrap()
408409
.split(',')

bottlecap/src/traces/stats_flusher.rs

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ use async_trait::async_trait;
55
use std::str::FromStr;
66
use std::sync::Arc;
77
use tokio::sync::Mutex;
8-
use tracing::{debug, error};
98

109
use crate::config;
1110
use crate::traces::stats_aggregator::StatsAggregator;
1211
use datadog_trace_protobuf::pb;
1312
use datadog_trace_utils::{config_utils::trace_stats_url, stats_utils};
1413
use ddcommon::Endpoint;
14+
use tracing::{debug, error};
1515

1616
#[async_trait]
1717
pub trait StatsFlusher {
@@ -49,7 +49,7 @@ impl StatsFlusher for ServerlessStatsFlusher {
4949
let endpoint = Endpoint {
5050
url: hyper::Uri::from_str(&stats_url).expect("can't make URI from stats url, exiting"),
5151
api_key: Some(api_key.clone().into()),
52-
timeout_ms: Endpoint::DEFAULT_TIMEOUT,
52+
timeout_ms: config.flush_timeout * 1_000,
5353
test_token: None,
5454
};
5555

@@ -60,16 +60,6 @@ impl StatsFlusher for ServerlessStatsFlusher {
6060
}
6161
}
6262

63-
async fn flush(&self) {
64-
let mut guard = self.aggregator.lock().await;
65-
66-
let mut stats = guard.get_batch();
67-
while !stats.is_empty() {
68-
self.send(stats).await;
69-
70-
stats = guard.get_batch();
71-
}
72-
}
7363
async fn send(&self, stats: Vec<pb::ClientStatsPayload>) {
7464
if stats.is_empty() {
7565
return;
@@ -88,17 +78,37 @@ impl StatsFlusher for ServerlessStatsFlusher {
8878
}
8979
};
9080

91-
match stats_utils::send_stats_payload(
81+
let stats_url = trace_stats_url(&self.config.site);
82+
83+
let start = std::time::Instant::now();
84+
85+
let resp = stats_utils::send_stats_payload(
9286
serialized_stats_payload,
9387
&self.endpoint,
9488
&self.config.api_key,
9589
)
96-
.await
97-
{
90+
.await;
91+
let elapsed = start.elapsed();
92+
debug!(
93+
"Stats request to {} took {}ms",
94+
stats_url,
95+
elapsed.as_millis()
96+
);
97+
match resp {
9898
Ok(()) => debug!("Successfully flushed stats"),
9999
Err(e) => {
100100
error!("Error sending stats: {e:?}");
101101
}
102+
};
103+
}
104+
async fn flush(&self) {
105+
let mut guard = self.aggregator.lock().await;
106+
107+
let mut stats = guard.get_batch();
108+
while !stats.is_empty() {
109+
self.send(stats).await;
110+
111+
stats = guard.get_batch();
102112
}
103113
}
104114
}

bottlecap/src/traces/trace_flusher.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ impl TraceFlusher for ServerlessTraceFlusher {
5050
if traces.is_empty() {
5151
return;
5252
}
53+
54+
let start = std::time::Instant::now();
5355
debug!("Flushing {} traces", traces.len());
5456

5557
for traces in trace_utils::coalesce_send_data(traces) {
@@ -65,5 +67,6 @@ impl TraceFlusher for ServerlessTraceFlusher {
6567
}
6668
}
6769
}
70+
debug!("Flushing traces took {}ms", start.elapsed().as_millis());
6871
}
6972
}

bottlecap/src/traces/trace_processor.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,22 @@
11
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use crate::config;
45
use crate::tags::provider;
56
use crate::traces::span_pointers::{attach_span_pointers_to_meta, SpanPointer};
7+
use crate::traces::{
8+
AWS_XRAY_DAEMON_ADDRESS_URL_PREFIX, DNS_LOCAL_HOST_ADDRESS_URL_PREFIX,
9+
DNS_NON_ROUTABLE_ADDRESS_URL_PREFIX, INVOCATION_SPAN_RESOURCE, LAMBDA_EXTENSION_URL_PREFIX,
10+
LAMBDA_RUNTIME_URL_PREFIX, LAMBDA_STATSD_URL_PREFIX,
11+
};
12+
use datadog_trace_obfuscation::obfuscate::obfuscate_span;
613
use datadog_trace_obfuscation::obfuscation_config;
714
use datadog_trace_protobuf::pb;
15+
use datadog_trace_protobuf::pb::Span;
816
use datadog_trace_utils::config_utils::trace_intake_url;
17+
use datadog_trace_utils::send_data::{RetryBackoffType, RetryStrategy};
18+
use datadog_trace_utils::trace_utils::SendData;
19+
use datadog_trace_utils::trace_utils::{self};
920
use datadog_trace_utils::tracer_header_tags;
1021
use datadog_trace_utils::tracer_payload::{
1122
TraceChunkProcessor, TraceCollection::V07, TracerPayloadCollection,
@@ -14,16 +25,6 @@ use ddcommon::Endpoint;
1425
use std::str::FromStr;
1526
use std::sync::Arc;
1627

17-
use crate::config;
18-
use crate::traces::{
19-
AWS_XRAY_DAEMON_ADDRESS_URL_PREFIX, DNS_LOCAL_HOST_ADDRESS_URL_PREFIX,
20-
DNS_NON_ROUTABLE_ADDRESS_URL_PREFIX, INVOCATION_SPAN_RESOURCE, LAMBDA_EXTENSION_URL_PREFIX,
21-
LAMBDA_RUNTIME_URL_PREFIX, LAMBDA_STATSD_URL_PREFIX,
22-
};
23-
use datadog_trace_obfuscation::obfuscate::obfuscate_span;
24-
use datadog_trace_protobuf::pb::Span;
25-
use datadog_trace_utils::trace_utils::{self, SendData};
26-
2728
#[derive(Clone)]
2829
#[allow(clippy::module_name_repetitions)]
2930
pub struct ServerlessTraceProcessor {
@@ -161,11 +162,18 @@ impl TraceProcessor for ServerlessTraceProcessor {
161162
let endpoint = Endpoint {
162163
url: hyper::Uri::from_str(&intake_url).expect("can't parse trace intake URL, exiting"),
163164
api_key: Some(self.resolved_api_key.clone().into()),
164-
timeout_ms: Endpoint::DEFAULT_TIMEOUT,
165+
timeout_ms: config.flush_timeout * 1_000,
165166
test_token: None,
166167
};
167168

168-
SendData::new(body_size, payload, header_tags, &endpoint)
169+
let mut send_data = SendData::new(body_size, payload, header_tags, &endpoint);
170+
send_data.set_retry_strategy(RetryStrategy::new(
171+
1,
172+
100,
173+
RetryBackoffType::Exponential,
174+
None,
175+
));
176+
send_data
169177
}
170178
}
171179

0 commit comments

Comments
 (0)