Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
287571c
Update to tower 0.5.
Serendo Feb 13, 2025
7829287
Add a RetryPartial variant to RetryAction. which is a closure and imp…
Serendo Feb 13, 2025
7561898
add a changelog fragment for elasticsearch sink retry partial
Serendo Feb 14, 2025
e02e6ad
changed some threshold of the tests. But I dont quite understand how …
Serendo Feb 14, 2025
454d3a2
add newline to changelog end
Serendo Feb 15, 2025
20f2829
Merge remote-tracking branch 'origin/master' into es-retry-tower0.5
pront Mar 11, 2025
6edf77f
cargo.lock update
pront Mar 11, 2025
3b241d1
Update changelog.d/elasticsearch_sink_skip_retrying_succeeded_documen…
pront Mar 25, 2025
a6badfc
Merge branch 'vectordotdev:master' into es-retry-tower0.5
Serendo Mar 27, 2025
4a7ab30
remove unwrap for safety.
Serendo Apr 15, 2025
c3f317a
cleanup the code.
Serendo Apr 15, 2025
0034f56
Merge branch 'master' into es-retry-tower0.5
Serendo Apr 16, 2025
19cd25b
Put original events and es request builder into the ElasticsearchReq…
Serendo Apr 16, 2025
6c98e8e
merge master changes
Serendo Jun 25, 2025
0418ace
fix merge conflict
Serendo Jun 25, 2025
09530fa
Merge branch 'master' into es-retry-tower0.5
pront Jun 25, 2025
ae3d727
Update src/sinks/util/retries.rs
Serendo Jul 16, 2025
c5dfe91
Update src/sinks/elasticsearch/retry.rs
Serendo Jul 16, 2025
fb5b597
Update src/sinks/elasticsearch/retry.rs
Serendo Jul 30, 2025
3b1cac7
Merge remote-tracking branch 'origin/master' into es-retry-tower0.5
Serendo Jul 31, 2025
522feb6
as pront suggested
Serendo Jul 31, 2025
b6edcf7
Merge branch 'master' into es-retry-tower0.5
Serendo Jul 31, 2025
e972da4
Remove downcast ref and trait usage from partial retry logic
thomasqueirozb Aug 6, 2025
79e7dd3
Fix make check-clippy errors
thomasqueirozb Aug 6, 2025
abeb520
Revert "Fix make check-clippy errors"
thomasqueirozb Aug 6, 2025
3bd9c0d
Revert "Remove downcast ref and trait usage from partial retry logic"
thomasqueirozb Aug 6, 2025
5eb44e1
Merge remote-tracking branch 'origin/master' into es-retry-tower0.5
thomasqueirozb Aug 6, 2025
1b64e9e
Revert cmake bump in Cargo.lock
thomasqueirozb Aug 6, 2025
ce35e6d
cargo fmt
thomasqueirozb Aug 6, 2025
2b401f4
Address PR feedback
thomasqueirozb Aug 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ azure_storage_blobs = { version = "0.17", default-features = false, optional = t
opendal = { version = "0.45", default-features = false, features = ["native-tls", "services-webhdfs"], optional = true }

# Tower
tower = { version = "0.4.13", default-features = false, features = ["buffer", "limit", "retry", "timeout", "util", "balance", "discover"] }
tower = { version = "0.5.2", default-features = false, features = ["buffer", "limit", "retry", "timeout", "util", "balance", "discover"] }
tower-http = { version = "0.4.4", default-features = false, features = ["compression-full", "decompression-gzip", "trace"] }
# Serde
serde.workspace = true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
request_retry_partial option of elasticsearch sink's behaviour changed. Now only retriable failed requests in a bulk will be retried (instead of all requests in the bulk).

authors: Serendo
11 changes: 7 additions & 4 deletions src/sinks/aws_cloudwatch_logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,18 @@ use crate::sinks::{
};

type Svc = Buffer<
ConcurrencyLimit<
Vec<InputLogEvent>,
<ConcurrencyLimit<
RateLimit<
Retry<
FibonacciRetryPolicy<CloudwatchRetryLogic<()>>,
Buffer<Timeout<CloudwatchLogsSvc>, Vec<InputLogEvent>>,
Buffer<
Vec<InputLogEvent>,
<Timeout<CloudwatchLogsSvc> as Service<Vec<InputLogEvent>>>::Future,
>,
>,
>,
>,
Vec<InputLogEvent>,
> as Service<Vec<InputLogEvent>>>::Future,
>;

#[derive(Debug)]
Expand Down
73 changes: 60 additions & 13 deletions src/sinks/elasticsearch/retry.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use bytes::Bytes;
use http::StatusCode;
use serde::Deserialize;

use crate::{
http::HttpError,
sinks::{
elasticsearch::service::ElasticsearchResponse,
util::retries::{RetryAction, RetryLogic},
elasticsearch::service::{ElasticsearchRequest, ElasticsearchResponse},
util::retries::{RetryAction, RetryLogic, RetryPartialFunction},
},
};

Expand Down Expand Up @@ -89,6 +90,19 @@ pub struct ElasticsearchRetryLogic {
pub retry_partial: bool,
}

// construct a closure by EsRetryClosure { closure: Box::new(|req: ElasticsearchRequest| { new_req }) }
struct EsRetryClosure {
closure: Box<dyn Fn(ElasticsearchRequest) -> ElasticsearchRequest + Send + Sync>,
}

impl RetryPartialFunction for EsRetryClosure {
fn modify_request(&self, request: Box<dyn std::any::Any>) -> Box<dyn std::any::Any> {
let request = request.downcast::<ElasticsearchRequest>().unwrap();
let new_request = (self.closure)(*request);
Box::new(new_request)
}
}

impl RetryLogic for ElasticsearchRetryLogic {
type Error = HttpError;
type Response = ElasticsearchResponse;
Expand Down Expand Up @@ -127,21 +141,54 @@ impl RetryLogic for ElasticsearchRetryLogic {
// We will retry if there exists at least one item that
// failed with a retriable error.
// Those are backpressure and server errors.
if let Some((status, error)) =
let status_codes: Vec<bool> = resp
.iter_status()
.map(|(status, _)| {
status == StatusCode::TOO_MANY_REQUESTS
|| status.is_server_error()
})
.collect::<Vec<_>>();
if let Some((_status, _error)) =
resp.iter_status().find(|(status, _)| {
*status == StatusCode::TOO_MANY_REQUESTS
|| status.is_server_error()
})
{
let msg = if let Some(error) = error {
format!(
"partial error, status: {}, error type: {}, reason: {}",
status, error.err_type, error.reason
)
} else {
format!("partial error, status: {}", status)
};
return RetryAction::Retry(msg.into());
return RetryAction::RetryPartial(Box::new(EsRetryClosure {
closure: Box::new(move |req: ElasticsearchRequest| {
let byte_slice: &[u8] = req.payload.as_ref();
let string_data: &str =
std::str::from_utf8(byte_slice).unwrap();
let lines: Vec<&str> = string_data.lines().collect();
let mut grouped_lines = Vec::new();
for chunk in lines.chunks(2) {
let group = chunk.join("\n");
grouped_lines.push(group);
}
if grouped_lines.len() != status_codes.len() {
req
} else {
let payload = grouped_lines
.into_iter()
.zip(<std::vec::Vec<bool> as Clone>::clone(
&status_codes,
))
.filter(|&(_, flag)| flag)
.map(|(item, _)| item)
.collect::<Vec<_>>();
let mut req = req.clone();
// change batch_size
req.batch_size = payload.len();
// change payload
req.payload = Bytes::from(
(payload.join("\n") + "\n").into_bytes(),
);
// println!("NEW REQ <DEBUG> {:?}", req);
// TODO: need to fix some metadata here
req
}
}),
}));
}
}

Expand Down Expand Up @@ -204,7 +251,7 @@ mod tests {
event_status: EventStatus::Errored,
events_byte_size: CountByteSize(1, JsonSize::new(1)).into(),
}),
RetryAction::Retry(_)
RetryAction::RetryPartial(_)
));
}

Expand Down
Loading
Loading