Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
287571c
Update to tower 0.5.
Serendo Feb 13, 2025
7829287
Add a RetryPartial variant to RetryAction, which is a closure and imp…
Serendo Feb 13, 2025
7561898
add a changelog fragment for elasticsearch sink retry partial
Serendo Feb 14, 2025
e02e6ad
changed some thresholds of the tests. But I don't quite understand how …
Serendo Feb 14, 2025
454d3a2
add newline to changelog end
Serendo Feb 15, 2025
20f2829
Merge remote-tracking branch 'origin/master' into es-retry-tower0.5
pront Mar 11, 2025
6edf77f
cargo.lock update
pront Mar 11, 2025
3b241d1
Update changelog.d/elasticsearch_sink_skip_retrying_succeeded_documen…
pront Mar 25, 2025
a6badfc
Merge branch 'vectordotdev:master' into es-retry-tower0.5
Serendo Mar 27, 2025
4a7ab30
remove unwrap for safety.
Serendo Apr 15, 2025
c3f317a
cleanup the code.
Serendo Apr 15, 2025
0034f56
Merge branch 'master' into es-retry-tower0.5
Serendo Apr 16, 2025
19cd25b
Put original events and es request builder into the ElasticsearchReq…
Serendo Apr 16, 2025
6c98e8e
merge master changes
Serendo Jun 25, 2025
0418ace
fix merge conflict
Serendo Jun 25, 2025
09530fa
Merge branch 'master' into es-retry-tower0.5
pront Jun 25, 2025
ae3d727
Update src/sinks/util/retries.rs
Serendo Jul 16, 2025
c5dfe91
Update src/sinks/elasticsearch/retry.rs
Serendo Jul 16, 2025
fb5b597
Update src/sinks/elasticsearch/retry.rs
Serendo Jul 30, 2025
3b1cac7
Merge remote-tracking branch 'origin/master' into es-retry-tower0.5
Serendo Jul 31, 2025
522feb6
as pront suggested
Serendo Jul 31, 2025
b6edcf7
Merge branch 'master' into es-retry-tower0.5
Serendo Jul 31, 2025
e972da4
Remove downcast ref and trait usage from partial retry logic
thomasqueirozb Aug 6, 2025
79e7dd3
Fix make check-clippy errors
thomasqueirozb Aug 6, 2025
abeb520
Revert "Fix make check-clippy errors"
thomasqueirozb Aug 6, 2025
3bd9c0d
Revert "Remove downcast ref and trait usage from partial retry logic"
thomasqueirozb Aug 6, 2025
5eb44e1
Merge remote-tracking branch 'origin/master' into es-retry-tower0.5
thomasqueirozb Aug 6, 2025
1b64e9e
Revert cmake bump in Cargo.lock
thomasqueirozb Aug 6, 2025
ce35e6d
cargo fmt
thomasqueirozb Aug 6, 2025
2b401f4
Address PR feedback
thomasqueirozb Aug 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `request_retry_partial` behavior for the `elasticsearch` sink was changed. Now only the failed retriable requests in a bulk will be retried (instead of all requests in the bulk).

authors: Serendo
26 changes: 16 additions & 10 deletions src/sinks/aws_cloudwatch_logs/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,34 @@ use crate::aws::is_retriable_error;
use crate::sinks::{aws_cloudwatch_logs::service::CloudwatchError, util::retries::RetryLogic};

#[derive(Debug)]
pub struct CloudwatchRetryLogic<T> {
phantom: PhantomData<T>,
pub struct CloudwatchRetryLogic<Request, Response> {
request: PhantomData<Request>,
response: PhantomData<Response>,
}
impl<T> CloudwatchRetryLogic<T> {
pub const fn new() -> CloudwatchRetryLogic<T> {
impl<Request, Response> CloudwatchRetryLogic<Request, Response> {
pub const fn new() -> CloudwatchRetryLogic<Request, Response> {
CloudwatchRetryLogic {
phantom: PhantomData,
request: PhantomData,
response: PhantomData,
}
}
}

impl<T> Clone for CloudwatchRetryLogic<T> {
impl<Request, Response> Clone for CloudwatchRetryLogic<Request, Response> {
fn clone(&self) -> Self {
CloudwatchRetryLogic {
phantom: PhantomData,
request: PhantomData,
response: PhantomData,
}
}
}

impl<T: Send + Sync + 'static> RetryLogic for CloudwatchRetryLogic<T> {
impl<Request: Send + Sync + 'static, Response: Send + Sync + 'static> RetryLogic
for CloudwatchRetryLogic<Request, Response>
{
type Error = CloudwatchError;
type Response = T;
type Request = Request;
type Response = Response;

// TODO this match may not be necessary given the logic in `is_retriable_error()`
#[allow(clippy::cognitive_complexity)] // long, but just a hair over our limit
Expand Down Expand Up @@ -84,7 +90,7 @@ mod test {

#[test]
fn test_throttle_retry() {
let retry_logic: CloudwatchRetryLogic<()> = CloudwatchRetryLogic::new();
let retry_logic: CloudwatchRetryLogic<(), ()> = CloudwatchRetryLogic::new();

let meta_err = aws_smithy_types::error::ErrorMetadata::builder()
.code("ThrottlingException")
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_cloudwatch_logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Svc = Buffer<
<ConcurrencyLimit<
RateLimit<
Retry<
FibonacciRetryPolicy<CloudwatchRetryLogic<()>>,
FibonacciRetryPolicy<CloudwatchRetryLogic<Vec<InputLogEvent>, ()>>,
Buffer<
Vec<InputLogEvent>,
<Timeout<CloudwatchLogsSvc> as Service<Vec<InputLogEvent>>>::Future,
Expand Down
1 change: 1 addition & 0 deletions src/sinks/aws_cloudwatch_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ struct CloudWatchMetricsRetryLogic;

impl RetryLogic for CloudWatchMetricsRetryLogic {
type Error = SdkError<PutMetricDataError>;
type Request = PartitionInnerBuffer<Vec<Metric>, String>;
type Response = ();

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_kinesis/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
R: Send + 'static,
RR: Record + Record<T = R> + Clone + Send + Sync + Unpin + 'static,
E: Send + 'static,
RT: RetryLogic<Response = KinesisResponse> + Default,
RT: RetryLogic<Request = BatchKinesisRequest<RR>, Response = KinesisResponse> + Default,
{
let request_limits = config.request.into_settings();

Expand Down
4 changes: 3 additions & 1 deletion src/sinks/aws_kinesis/firehose/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
},
};

use super::sink::BatchKinesisRequest;
use super::{
build_sink,
record::{KinesisFirehoseClient, KinesisFirehoseRecord},
Expand Down Expand Up @@ -173,6 +174,7 @@ struct KinesisRetryLogic {

impl RetryLogic for KinesisRetryLogic {
type Error = SdkError<KinesisError, HttpResponse>;
type Request = BatchKinesisRequest<KinesisFirehoseRecord>;
type Response = KinesisResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand All @@ -187,7 +189,7 @@ impl RetryLogic for KinesisRetryLogic {
is_retriable_error(error)
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
if response.failure_count > 0 && self.retry_partial {
let msg = format!("partial error count {}", response.failure_count);
RetryAction::Retry(msg.into())
Expand Down
4 changes: 3 additions & 1 deletion src/sinks/aws_kinesis/streams/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
},
};

use super::sink::BatchKinesisRequest;
use super::{
build_sink,
record::{KinesisStreamClient, KinesisStreamRecord},
Expand Down Expand Up @@ -170,6 +171,7 @@ struct KinesisRetryLogic {

impl RetryLogic for KinesisRetryLogic {
type Error = SdkError<KinesisError, HttpResponse>;
type Request = BatchKinesisRequest<KinesisStreamRecord>;
type Response = KinesisResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand All @@ -190,7 +192,7 @@ impl RetryLogic for KinesisRetryLogic {
is_retriable_error(error)
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
if response.failure_count > 0 && self.retry_partial {
let msg = format!("partial error count {}", response.failure_count);
RetryAction::Retry(msg.into())
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/aws_s_s/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::marker::PhantomData;

use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};

use super::service::SendMessageResponse;
use super::{request_builder::SendMessageEntry, service::SendMessageResponse};
use crate::{aws::is_retriable_error, sinks::util::retries::RetryLogic};

#[derive(Debug)]
Expand All @@ -26,6 +26,7 @@ where
E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
{
type Error = SdkError<E, HttpResponse>;
type Request = SendMessageEntry;
type Response = SendMessageResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand Down
1 change: 1 addition & 0 deletions src/sinks/azure_common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub struct AzureBlobRetryLogic;

impl RetryLogic for AzureBlobRetryLogic {
type Error = HttpError;
type Request = AzureBlobRequest;
type Response = AzureBlobResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand Down
5 changes: 3 additions & 2 deletions src/sinks/clickhouse/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@ use snafu::ResultExt;

#[derive(Debug, Default, Clone)]
pub struct ClickhouseRetryLogic {
inner: HttpRetryLogic,
inner: HttpRetryLogic<HttpRequest<PartitionKey>>,
}

impl RetryLogic for ClickhouseRetryLogic {
type Error = HttpError;
type Request = HttpRequest<PartitionKey>;
type Response = HttpResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
self.inner.is_retriable_error(error)
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
match response.http_response.status() {
StatusCode::INTERNAL_SERVER_ERROR => {
let body = response.http_response.body();
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/clickhouse/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ where

/// PartitionKey used to partition events by (database, table) pair.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub(super) struct PartitionKey {
pub struct PartitionKey {
pub database: String,
pub table: String,
pub format: Format,
Expand Down
1 change: 1 addition & 0 deletions src/sinks/databend/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct DatabendRetryLogic;

impl RetryLogic for DatabendRetryLogic {
type Error = DatabendError;
type Request = DatabendRequest;
type Response = DatabendResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand Down
1 change: 1 addition & 0 deletions src/sinks/datadog/logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct LogApiRetry;

impl RetryLogic for LogApiRetry {
type Error = DatadogApiError;
type Request = LogApiRequest;
type Response = LogApiResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand Down
1 change: 1 addition & 0 deletions src/sinks/datadog/metrics/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct DatadogMetricsRetryLogic;

impl RetryLogic for DatadogMetricsRetryLogic {
type Error = DatadogApiError;
type Request = DatadogMetricsRequest;
type Response = DatadogMetricsResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/datadog/traces/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ pub struct TraceApiRetry;

impl RetryLogic for TraceApiRetry {
type Error = HttpError;
type Request = TraceApiRequest;
type Response = TraceApiResponse;

fn is_retriable_error(&self, _error: &Self::Error) -> bool {
true
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
let status = response.status_code;
match status {
// Use the same status code/retry policy as the Trace agent, additionally retrying
Expand Down
8 changes: 4 additions & 4 deletions src/sinks/elasticsearch/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
},
};

#[derive(Serialize)]
#[derive(Serialize, Clone, Debug)]
pub enum DocumentVersionType {
External,
ExternalGte,
Expand All @@ -34,20 +34,20 @@ impl DocumentVersionType {
}
}

#[derive(Serialize)]
#[derive(Serialize, Clone, Debug)]
pub struct DocumentVersion {
pub kind: DocumentVersionType,
pub value: u64,
}

#[derive(Serialize)]
#[derive(Serialize, Clone, Debug)]
pub enum DocumentMetadata {
WithoutId,
Id(String),
IdAndVersion(String, DocumentVersion),
}

#[derive(Serialize)]
#[derive(Serialize, Clone, Debug)]
pub struct ProcessedEvent {
pub index: String,
pub bulk_action: BulkAction,
Expand Down
4 changes: 4 additions & 0 deletions src/sinks/elasticsearch/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct Metadata {
finalizers: EventFinalizers,
batch_size: usize,
events_byte_size: JsonSize,
original_events: Vec<ProcessedEvent>,
}

impl RequestBuilder<Vec<ProcessedEvent>> for ElasticsearchRequestBuilder {
Expand Down Expand Up @@ -60,6 +61,7 @@ impl RequestBuilder<Vec<ProcessedEvent>> for ElasticsearchRequestBuilder {
finalizers: events.take_finalizers(),
batch_size: events.len(),
events_byte_size,
original_events: events.clone(),
};
(es_metadata, metadata_builder, events)
}
Expand All @@ -76,6 +78,8 @@ impl RequestBuilder<Vec<ProcessedEvent>> for ElasticsearchRequestBuilder {
batch_size: es_metadata.batch_size,
events_byte_size: es_metadata.events_byte_size,
metadata,
original_events: es_metadata.original_events,
elasticsearch_request_builder: self.clone(),
}
}
}
Loading
Loading