Skip to content

Commit 09c4e31

Browse files
committed
Elasticsearch retries
1 parent f7c7190 commit 09c4e31

File tree

2 files changed

+29
-12
lines changed

2 files changed

+29
-12
lines changed

graph/src/log/elastic.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use serde_json::json;
1616
use slog::*;
1717
use slog_async;
1818

19+
use crate::util::futures::retry;
20+
1921
/// General configuration parameters for Elasticsearch logging.
2022
#[derive(Clone, Debug)]
2123
pub struct ElasticLoggingConfig {
@@ -140,6 +142,8 @@ pub struct ElasticDrainConfig {
140142
pub custom_id_value: String,
141143
/// The batching interval.
142144
pub flush_interval: Duration,
145+
/// Maximum retries in case of error.
146+
pub max_retries: usize,
143147
}
144148

145149
/// An slog `Drain` for logging to Elasticsearch.
@@ -187,6 +191,7 @@ impl ElasticDrain {
187191
let logs = self.logs.clone();
188192
let config = self.config.clone();
189193
let mut interval = tokio::time::interval(self.config.flush_interval);
194+
let max_retries = self.config.max_retries;
190195

191196
crate::task_spawn::spawn(async move {
192197
loop {
@@ -261,7 +266,6 @@ impl ElasticDrain {
261266

262267
// Send batch of logs to Elasticsearch
263268
let client = Client::new();
264-
let logger_for_err = flush_logger.clone();
265269

266270
let header = match config.general.username {
267271
Some(username) => client
@@ -272,19 +276,24 @@ impl ElasticDrain {
272276
.post(batch_url)
273277
.header(CONTENT_TYPE, "application/json"),
274278
};
275-
header
276-
.body(batch_body)
277-
.send()
278-
.and_then(|response| async { response.error_for_status() })
279-
.map_ok(|_| ())
280-
.unwrap_or_else(move |e| {
279+
280+
retry("send logs to elasticsearch", &flush_logger)
281+
.limit(max_retries)
282+
.timeout_secs(30)
283+
.run(move || {
284+
header
285+
.try_clone()
286+
.unwrap() // Unwrap: Request body not yet set
287+
.body(batch_body.clone())
288+
.send()
289+
.and_then(|response| async { response.error_for_status() })
290+
.map_ok(|_| ())
291+
})
292+
.await
293+
.unwrap_or_else(|e| {
281294
// Log if there was a problem sending the logs
282-
error!(
283-
logger_for_err,
284-
"Failed to send logs to Elasticsearch: {}", e
285-
);
295+
error!(flush_logger, "Failed to send logs to Elasticsearch: {}", e);
286296
})
287-
.await;
288297
}
289298
});
290299
}

graph/src/log/factory.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ lazy_static::lazy_static! {
1111
.unwrap_or("5".into())
1212
.parse::<u64>()
1313
.expect("invalid GRAPH_ELASTIC_SEARCH_FLUSH_INTERVAL_SECS"));
14+
15+
static ref ES_MAX_RETRIES: usize =
16+
std::env::var("GRAPH_ELASTIC_SEARCH_MAX_RETRIES")
17+
.unwrap_or("5".into())
18+
.parse::<usize>()
19+
.expect("invalid GRAPH_ELASTIC_SEARCH_MAX_RETRIES");
1420
}
1521

1622
/// Configuration for component-specific logging to Elasticsearch.
@@ -73,6 +79,7 @@ impl LoggerFactory {
7379
custom_id_key: String::from("componentId"),
7480
custom_id_value: component.to_string(),
7581
flush_interval: *ES_FLUSH_INTERVAL,
82+
max_retries: *ES_MAX_RETRIES,
7683
},
7784
term_logger.clone(),
7885
),
@@ -102,6 +109,7 @@ impl LoggerFactory {
102109
custom_id_key: String::from("subgraphId"),
103110
custom_id_value: loc.hash.to_string(),
104111
flush_interval: *ES_FLUSH_INTERVAL,
112+
max_retries: *ES_MAX_RETRIES,
105113
},
106114
term_logger.clone(),
107115
),

0 commit comments

Comments (0)