Skip to content

Commit d9bef7e

Browse files
SerendoprontCopilotthomasqueirozb
authored
feat(elasticsearch sink): fix partial retry logic (#22431)
* Update to tower 0.5. - Adjustments of Buffer type definition in Tower 0.5 - As policy is directly modified in Tower 0.5, removed the policy part in the RetryPolicyFuture. - Changed the args to mutable for the retry function and clone_request function, as changed in Tower 0.5. - Changed the RetryPolicyFuture Output to (). This is also changed in Tower 0.5. - Modified some tests as the policy is now modified in the advance function. * Add a RetryPartial variant to RetryAction, which is a closure that implements a RetryPartialFunction trait to modify the old request. In the elasticsearch sink, when there are errors in the response, the response's status_codes are kept in a closure which is later used by the modify_request func in the RetryPartialFunction trait. * add a changelog fragment for elasticsearch sink retry partial * changed some thresholds of the tests. But I don't quite understand how these stats ranges are related to the test params. * add newline to changelog end * cargo.lock update * Update changelog.d/elasticsearch_sink_skip_retrying_succeeded_documents.feature.md * remove unwrap for safety. * cleanup the code. * Put original events and es request builder into the ElasticsearchRequest. Now the requests to be retried are constructed from the filtered original events. The metadata should be correct now. * fix merge conflict * Update src/sinks/util/retries.rs change error message to warn message Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update src/sinks/elasticsearch/retry.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update src/sinks/elasticsearch/retry.rs Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com> * as pront suggested * Remove downcast ref and trait usage from partial retry logic * Fix make check-clippy errors * Revert "Fix make check-clippy errors" This reverts commit 79e7dd3. * Revert "Remove downcast ref and trait usage from partial retry logic" This reverts commit e972da4. 
* Revert cmake bump in Cargo.lock * cargo fmt * Address PR feedback --------- Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Thomas <thomas.schneider@datadoghq.com>
1 parent a9c4c16 commit d9bef7e

File tree

6 files changed

+131
-26
lines changed

6 files changed

+131
-26
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
The `request_retry_partial` behavior for the `elasticsearch` sink was changed. Now only the failed retriable requests in a bulk request are retried (instead of all requests in the bulk).
2+
3+
authors: Serendo

src/sinks/elasticsearch/encoder.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::{
1919
},
2020
};
2121

22-
#[derive(Serialize)]
22+
#[derive(Serialize, Clone, Debug)]
2323
pub enum DocumentVersionType {
2424
External,
2525
ExternalGte,
@@ -34,20 +34,20 @@ impl DocumentVersionType {
3434
}
3535
}
3636

37-
#[derive(Serialize)]
37+
#[derive(Serialize, Clone, Debug)]
3838
pub struct DocumentVersion {
3939
pub kind: DocumentVersionType,
4040
pub value: u64,
4141
}
4242

43-
#[derive(Serialize)]
43+
#[derive(Serialize, Clone, Debug)]
4444
pub enum DocumentMetadata {
4545
WithoutId,
4646
Id(String),
4747
IdAndVersion(String, DocumentVersion),
4848
}
4949

50-
#[derive(Serialize)]
50+
#[derive(Serialize, Clone, Debug)]
5151
pub struct ProcessedEvent {
5252
pub index: String,
5353
pub bulk_action: BulkAction,

src/sinks/elasticsearch/request_builder.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub struct Metadata {
2626
finalizers: EventFinalizers,
2727
batch_size: usize,
2828
events_byte_size: JsonSize,
29+
original_events: Vec<ProcessedEvent>,
2930
}
3031

3132
impl RequestBuilder<Vec<ProcessedEvent>> for ElasticsearchRequestBuilder {
@@ -60,6 +61,7 @@ impl RequestBuilder<Vec<ProcessedEvent>> for ElasticsearchRequestBuilder {
6061
finalizers: events.take_finalizers(),
6162
batch_size: events.len(),
6263
events_byte_size,
64+
original_events: events.clone(),
6365
};
6466
(es_metadata, metadata_builder, events)
6567
}
@@ -76,6 +78,8 @@ impl RequestBuilder<Vec<ProcessedEvent>> for ElasticsearchRequestBuilder {
7678
batch_size: es_metadata.batch_size,
7779
events_byte_size: es_metadata.events_byte_size,
7880
metadata,
81+
original_events: es_metadata.original_events,
82+
elasticsearch_request_builder: self.clone(),
7983
}
8084
}
8185
}

src/sinks/elasticsearch/retry.rs

Lines changed: 75 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
1-
use http::StatusCode;
2-
use serde::Deserialize;
3-
1+
use crate::sinks::{
2+
elasticsearch::encoder::ProcessedEvent,
3+
util::{metadata::RequestMetadataBuilder, request_builder::RequestBuilder},
4+
};
45
use crate::{
6+
event::Finalizable,
57
http::HttpError,
68
sinks::{
7-
elasticsearch::service::ElasticsearchResponse,
8-
util::retries::{RetryAction, RetryLogic},
9+
elasticsearch::service::{ElasticsearchRequest, ElasticsearchResponse},
10+
util::retries::{RetryAction, RetryLogic, RetryPartialFunction},
911
},
1012
};
13+
use http::StatusCode;
14+
use serde::Deserialize;
15+
use vector_lib::json_size::JsonSize;
16+
use vector_lib::EstimatedJsonEncodedSizeOf;
1117

1218
#[derive(Deserialize, Debug)]
1319
struct EsResultResponse {
@@ -86,6 +92,23 @@ pub struct ElasticsearchRetryLogic {
8692
pub retry_partial: bool,
8793
}
8894

95+
// Construct one with `EsRetryClosure { closure: Box::new(|req: ElasticsearchRequest| { new_req }) }`.
96+
struct EsRetryClosure {
97+
closure: Box<dyn Fn(ElasticsearchRequest) -> ElasticsearchRequest + Send + Sync>,
98+
}
99+
100+
impl RetryPartialFunction for EsRetryClosure {
101+
fn modify_request(&self, request: Box<dyn std::any::Any>) -> Box<dyn std::any::Any> {
102+
match request.downcast::<ElasticsearchRequest>() {
103+
Ok(request) => {
104+
let new_request = (self.closure)(*request);
105+
Box::new(new_request)
106+
}
107+
Err(request) => request,
108+
}
109+
}
110+
}
111+
89112
impl RetryLogic for ElasticsearchRetryLogic {
90113
type Error = HttpError;
91114
type Response = ElasticsearchResponse;
@@ -124,21 +147,57 @@ impl RetryLogic for ElasticsearchRetryLogic {
124147
// We will retry if there exists at least one item that
125148
// failed with a retriable error.
126149
// Those are backpressure and server errors.
127-
if let Some((status, error)) =
150+
let status_codes: Vec<bool> = resp
151+
.iter_status()
152+
.map(|(status, _)| {
153+
status == StatusCode::TOO_MANY_REQUESTS
154+
|| status.is_server_error()
155+
})
156+
.collect();
157+
if let Some((_status, _error)) =
128158
resp.iter_status().find(|(status, _)| {
129159
*status == StatusCode::TOO_MANY_REQUESTS
130160
|| status.is_server_error()
131161
})
132162
{
133-
let msg = if let Some(error) = error {
134-
format!(
135-
"partial error, status: {}, error type: {}, reason: {}",
136-
status, error.err_type, error.reason
137-
)
138-
} else {
139-
format!("partial error, status: {status}")
140-
};
141-
return RetryAction::Retry(msg.into());
163+
return RetryAction::RetryPartial(Box::new(EsRetryClosure {
164+
closure: Box::new(move |req: ElasticsearchRequest| {
165+
let mut failed_events: Vec<ProcessedEvent> = req
166+
.original_events
167+
.clone()
168+
.into_iter()
169+
.zip(status_codes.iter())
170+
.filter(|(_, &flag)| flag)
171+
.map(|(item, _)| item)
172+
.collect();
173+
let finalizers = failed_events.take_finalizers();
174+
let batch_size = failed_events.len();
175+
let events_byte_size = failed_events
176+
.iter()
177+
.map(|x| x.log.estimated_json_encoded_size_of())
178+
.fold(JsonSize::zero(), |a, b| a + b);
179+
let encode_result = match req
180+
.elasticsearch_request_builder
181+
.encode_events(failed_events.clone())
182+
{
183+
Ok(s) => s,
184+
Err(_) => return req,
185+
};
186+
let metadata_builder =
187+
RequestMetadataBuilder::from_events(&failed_events);
188+
let metadata = metadata_builder.build(&encode_result);
189+
ElasticsearchRequest {
190+
payload: encode_result.into_payload(),
191+
finalizers,
192+
batch_size,
193+
events_byte_size,
194+
metadata,
195+
original_events: failed_events,
196+
elasticsearch_request_builder: req
197+
.elasticsearch_request_builder,
198+
}
199+
}),
200+
}));
142201
}
143202
}
144203

@@ -201,7 +260,7 @@ mod tests {
201260
event_status: EventStatus::Errored,
202261
events_byte_size: CountByteSize(1, JsonSize::new(1)).into(),
203262
}),
204-
RetryAction::Retry(_)
263+
RetryAction::RetryPartial(_)
205264
));
206265
}
207266

src/sinks/elasticsearch/service.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@ use super::{ElasticsearchCommon, ElasticsearchConfig};
1919
use crate::{
2020
event::{EventFinalizers, EventStatus, Finalizable},
2121
http::HttpClient,
22-
sinks::util::{
23-
auth::Auth,
24-
http::{HttpBatchService, RequestConfig},
25-
Compression, ElementCount,
22+
sinks::{
23+
elasticsearch::{encoder::ProcessedEvent, request_builder::ElasticsearchRequestBuilder},
24+
util::{
25+
auth::Auth,
26+
http::{HttpBatchService, RequestConfig},
27+
Compression, ElementCount,
28+
},
2629
},
2730
};
2831

@@ -33,6 +36,8 @@ pub struct ElasticsearchRequest {
3336
pub batch_size: usize,
3437
pub events_byte_size: JsonSize,
3538
pub metadata: RequestMetadata,
39+
pub original_events: Vec<ProcessedEvent>, //store original_events for reconstruct request when retrying
40+
pub elasticsearch_request_builder: ElasticsearchRequestBuilder,
3641
}
3742

3843
impl ByteSizeOf for ElasticsearchRequest {

src/sinks/util/retries.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,18 @@ use crate::Error;
1717
pub enum RetryAction {
1818
/// Indicate that this request should be retried with a reason
1919
Retry(Cow<'static, str>),
20+
/// Indicate that a portion of this request should be retried with a generic function
21+
RetryPartial(Box<dyn RetryPartialFunction>),
2022
/// Indicate that this request should not be retried with a reason
2123
DontRetry(Cow<'static, str>),
2224
/// Indicate that this request should not be retried but the request was successful
2325
Successful,
2426
}
2527

28+
pub trait RetryPartialFunction {
29+
fn modify_request(&self, request: Box<dyn std::any::Any>) -> Box<dyn std::any::Any>;
30+
}
31+
2632
pub trait RetryLogic: Clone + Send + Sync + 'static {
2733
type Error: std::error::Error + Send + Sync + 'static;
2834
type Response;
@@ -141,7 +147,7 @@ where
141147

142148
// NOTE: in the error cases- `Error` and `EventsDropped` internal events are emitted by the
143149
// driver, so only need to log here.
144-
fn retry(&mut self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
150+
fn retry(&mut self, req: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
145151
match result {
146152
Ok(response) => match self.logic.should_retry_response(response) {
147153
RetryAction::Retry(reason) => {
@@ -157,6 +163,34 @@ where
157163
Some(self.build_retry())
158164
}
159165

166+
RetryAction::RetryPartial(rebuild_request_fn) => {
167+
if self.remaining_attempts == 0 {
168+
error!(
169+
message =
170+
"OK/retry response but retries exhausted; dropping the request.",
171+
internal_log_rate_limit = true,
172+
);
173+
return None;
174+
}
175+
let output = rebuild_request_fn.modify_request(Box::new(req.clone()));
176+
if let Ok(output) = output.downcast::<Req>() {
177+
*req = *output;
178+
error!(
179+
message = "OK/retrying partial after response.",
180+
internal_log_rate_limit = true
181+
);
182+
Some(self.build_retry())
183+
} else {
184+
// unlikely to go here.
185+
error!(
186+
message =
187+
"OK/retry response but invalid request; dropping the request.",
188+
internal_log_rate_limit = true,
189+
);
190+
None
191+
}
192+
}
193+
160194
RetryAction::DontRetry(reason) => {
161195
error!(message = "Not retriable; dropping the request.", reason = ?reason, internal_log_rate_limit = true);
162196
None
@@ -220,7 +254,7 @@ impl Future for RetryPolicyFuture {
220254

221255
impl RetryAction {
222256
pub const fn is_retryable(&self) -> bool {
223-
matches!(self, RetryAction::Retry(_))
257+
matches!(self, RetryAction::Retry(_) | RetryAction::RetryPartial(_))
224258
}
225259

226260
pub const fn is_not_retryable(&self) -> bool {

0 commit comments

Comments
 (0)