Skip to content

Commit 376b4dd

Browse files
authored
chore: refactor logs flusher (#562)
* fix clippy await-holding-lock warning * refactor logs flusher * use futures join_all instead of tokio task JoinSet
1 parent 3fbeca4 commit 376b4dd

File tree

4 files changed

+102
-93
lines changed

4 files changed

+102
-93
lines changed

bottlecap/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bottlecap/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ rustls = { version = "0.23.18", default-features = false, features = ["aws-lc-rs
4141
rand = { version = "0.8", default-features = false }
4242
prost = { version = "0.11.6", default-features = false }
4343
zstd = { version = "0.13.2", default-features = false }
44+
futures = { version = "0.3.31", default-features = false}
4445

4546
[dev-dependencies]
4647
figment = { version = "0.10", default-features = false, features = ["yaml", "env", "test"] }

bottlecap/src/logs/aggregator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use tracing::warn;
33

44
use crate::logs::constants;
55

6+
#[derive(Debug, Clone)]
67
pub struct Aggregator {
78
messages: VecDeque<String>,
89
max_batch_entries_size: usize,

bottlecap/src/logs/flusher.rs

Lines changed: 99 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,24 @@
11
use crate::config;
22
use crate::http_client;
33
use crate::logs::aggregator::Aggregator;
4+
use futures::future::join_all;
5+
use reqwest::header::HeaderMap;
46
use std::time::Instant;
57
use std::{
68
error::Error,
79
io::Write,
810
sync::{Arc, Mutex},
911
};
10-
use tokio::task::JoinSet;
1112
use tracing::{debug, error};
1213
use zstd::stream::write::Encoder;
1314

15+
#[derive(Debug, Clone)]
1416
pub struct Flusher {
15-
api_key: String,
1617
fqdn_site: String,
1718
client: reqwest::Client,
1819
aggregator: Arc<Mutex<Aggregator>>,
1920
config: Arc<config::Config>,
21+
headers: HeaderMap,
2022
}
2123

2224
#[inline]
@@ -25,7 +27,6 @@ pub fn build_fqdn_logs(site: String) -> String {
2527
format!("https://http-intake.logs.{site}")
2628
}
2729

28-
#[allow(clippy::await_holding_lock)]
2930
impl Flusher {
3031
pub fn new(
3132
api_key: String,
@@ -34,113 +35,118 @@ impl Flusher {
3435
config: Arc<config::Config>,
3536
) -> Self {
3637
let client = http_client::get_client(config.clone());
38+
let mut headers = HeaderMap::new();
39+
headers.insert(
40+
"DD-API-KEY",
41+
api_key.clone().parse().expect("failed to parse header"),
42+
);
43+
headers.insert(
44+
"DD-PROTOCOL",
45+
"agent-json".parse().expect("failed to parse header"),
46+
);
47+
headers.insert(
48+
"Content-Type",
49+
"application/json".parse().expect("failed to parse header"),
50+
);
51+
52+
if config.logs_config_use_compression {
53+
headers.insert(
54+
"Content-Encoding",
55+
"zstd".parse().expect("failed to parse header"),
56+
);
57+
}
58+
3759
Flusher {
38-
api_key,
3960
fqdn_site: site,
4061
client,
4162
aggregator,
4263
config,
64+
headers,
4365
}
4466
}
4567
pub async fn flush(&self) {
46-
let mut guard = self.aggregator.lock().expect("lock poisoned");
47-
let mut set = JoinSet::new();
48-
49-
let mut logs = guard.get_batch();
50-
while !logs.is_empty() {
51-
let api_key = self.api_key.clone();
52-
let site = self.fqdn_site.clone();
53-
let cloned_client = self.client.clone();
54-
let cloned_use_compression = self.config.logs_config_use_compression;
55-
let cloned_compression_level = self.config.logs_config_compression_level;
56-
set.spawn(async move {
57-
Self::send(
58-
cloned_client,
59-
api_key,
60-
site,
61-
logs,
62-
cloned_use_compression,
63-
cloned_compression_level,
64-
)
65-
.await;
66-
});
67-
logs = guard.get_batch();
68-
}
69-
drop(guard);
70-
while let Some(res) = set.join_next().await {
71-
match res {
72-
Ok(()) => (),
73-
Err(e) => {
74-
error!("Failed to wait for request sending {}", e);
75-
}
68+
let logs_batches = {
69+
let mut guard = self.aggregator.lock().expect("lock poisoned");
70+
let mut batches = Vec::new();
71+
let mut current_batch = guard.get_batch();
72+
73+
while !current_batch.is_empty() {
74+
batches.push(current_batch);
75+
current_batch = guard.get_batch();
76+
}
77+
78+
batches
79+
};
80+
81+
let futures = logs_batches.into_iter().filter(|b| !b.is_empty()).map(|b| {
82+
let req = self.create_request(b);
83+
Self::send(req)
84+
});
85+
86+
let results = join_all(futures).await;
87+
88+
for result in results {
89+
if let Err(e) = result {
90+
debug!("Failed to send logs: {}", e);
7691
}
7792
}
7893
}
7994

80-
#[allow(clippy::unwrap_used)]
81-
async fn send(
82-
client: reqwest::Client,
83-
api_key: String,
84-
fqdn: String,
85-
data: Vec<u8>,
86-
compression_enabled: bool,
87-
compression_level: i32,
88-
) {
89-
if !data.is_empty() {
90-
let url = format!("{fqdn}/api/v2/logs");
91-
let start = Instant::now();
92-
let body = if compression_enabled {
93-
let result = (|| -> Result<Vec<u8>, Box<dyn Error>> {
94-
let mut encoder = Encoder::new(Vec::new(), compression_level)
95-
.map_err(|e| Box::new(e) as Box<dyn Error>)?;
96-
encoder
97-
.write_all(&data)
98-
.map_err(|e| Box::new(e) as Box<dyn Error>)?;
99-
100-
encoder.finish().map_err(|e| Box::new(e) as Box<dyn Error>)
101-
})();
102-
103-
if let Ok(compressed_data) = result {
104-
compressed_data
105-
} else {
106-
debug!("Failed to compress data, sending uncompressed data");
107-
data
108-
}
109-
} else {
110-
data
111-
};
112-
let req = client
113-
.post(&url)
114-
.header("DD-API-KEY", api_key)
115-
.header("DD-PROTOCOL", "agent-json")
116-
.header("Content-Type", "application/json");
117-
let req = if compression_enabled {
118-
req.header("Content-Encoding", "zstd")
119-
} else {
120-
req
121-
};
122-
let resp: Result<reqwest::Response, reqwest::Error> = req.body(body).send().await;
123-
124-
let elapsed = start.elapsed();
125-
126-
match resp {
127-
Ok(resp) => {
128-
if resp.status() != 202 {
129-
debug!(
130-
"Failed to send logs to datadog after {}ms: {}",
131-
elapsed.as_millis(),
132-
resp.status()
133-
);
134-
}
135-
}
136-
Err(e) => {
137-
error!(
95+
fn create_request(&self, data: Vec<u8>) -> reqwest::RequestBuilder {
96+
let url = format!("{}/api/v2/logs", self.fqdn_site);
97+
let body = self.compress(data);
98+
self.client
99+
.post(&url)
100+
.headers(self.headers.clone())
101+
.body(body)
102+
}
103+
104+
async fn send(req: reqwest::RequestBuilder) -> Result<(), Box<dyn Error>> {
105+
let time = Instant::now();
106+
let resp = req.send().await;
107+
let elapsed = time.elapsed();
108+
109+
match resp {
110+
Ok(resp) => {
111+
if resp.status() != 202 {
112+
debug!(
138113
"Failed to send logs to datadog after {}ms: {}",
139114
elapsed.as_millis(),
140-
e
115+
resp.status()
141116
);
142117
}
143118
}
119+
Err(e) => {
120+
error!(
121+
"Failed to send logs to datadog after {}ms: {}",
122+
elapsed.as_millis(),
123+
e
124+
);
125+
126+
return Err(Box::new(e));
127+
}
144128
}
129+
130+
Ok(())
131+
}
132+
133+
fn compress(&self, data: Vec<u8>) -> Vec<u8> {
134+
if !self.config.logs_config_use_compression {
135+
return data;
136+
}
137+
138+
match self.encode(&data) {
139+
Ok(compressed_data) => compressed_data,
140+
Err(e) => {
141+
debug!("Failed to compress data: {}", e);
142+
data
143+
}
144+
}
145+
}
146+
147+
fn encode(&self, data: &[u8]) -> Result<Vec<u8>, Box<dyn Error>> {
148+
let mut encoder = Encoder::new(Vec::new(), self.config.logs_config_compression_level)?;
149+
encoder.write_all(data)?;
150+
encoder.finish().map_err(|e| Box::new(e) as Box<dyn Error>)
145151
}
146152
}

0 commit comments

Comments
 (0)