Skip to content

Commit 0b3c886

Browse files
chore(sinks): remove downcast and trait in sink retry code (#23543)
* Reapply "Remove downcast ref and trait usage from partial retry logic" This reverts commit 3bd9c0d. * Reapply "Fix make check-clippy errors" This reverts commit abeb520. * Use Fn instead of FnOnce
1 parent a1c1fc3 commit 0b3c886

File tree

36 files changed

+183
-136
lines changed

36 files changed

+183
-136
lines changed

src/sinks/aws_cloudwatch_logs/retry.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,34 @@ use crate::aws::is_retriable_error;
99
use crate::sinks::{aws_cloudwatch_logs::service::CloudwatchError, util::retries::RetryLogic};
1010

1111
#[derive(Debug)]
12-
pub struct CloudwatchRetryLogic<T> {
13-
phantom: PhantomData<T>,
12+
pub struct CloudwatchRetryLogic<Request, Response> {
13+
request: PhantomData<Request>,
14+
response: PhantomData<Response>,
1415
}
15-
impl<T> CloudwatchRetryLogic<T> {
16-
pub const fn new() -> CloudwatchRetryLogic<T> {
16+
impl<Request, Response> CloudwatchRetryLogic<Request, Response> {
17+
pub const fn new() -> CloudwatchRetryLogic<Request, Response> {
1718
CloudwatchRetryLogic {
18-
phantom: PhantomData,
19+
request: PhantomData,
20+
response: PhantomData,
1921
}
2022
}
2123
}
2224

23-
impl<T> Clone for CloudwatchRetryLogic<T> {
25+
impl<Request, Response> Clone for CloudwatchRetryLogic<Request, Response> {
2426
fn clone(&self) -> Self {
2527
CloudwatchRetryLogic {
26-
phantom: PhantomData,
28+
request: PhantomData,
29+
response: PhantomData,
2730
}
2831
}
2932
}
3033

31-
impl<T: Send + Sync + 'static> RetryLogic for CloudwatchRetryLogic<T> {
34+
impl<Request: Send + Sync + 'static, Response: Send + Sync + 'static> RetryLogic
35+
for CloudwatchRetryLogic<Request, Response>
36+
{
3237
type Error = CloudwatchError;
33-
type Response = T;
38+
type Request = Request;
39+
type Response = Response;
3440

3541
// TODO this match may not be necessary given the logic in `is_retriable_error()`
3642
#[allow(clippy::cognitive_complexity)] // long, but just a hair over our limit
@@ -84,7 +90,7 @@ mod test {
8490

8591
#[test]
8692
fn test_throttle_retry() {
87-
let retry_logic: CloudwatchRetryLogic<()> = CloudwatchRetryLogic::new();
93+
let retry_logic: CloudwatchRetryLogic<(), ()> = CloudwatchRetryLogic::new();
8894

8995
let meta_err = aws_smithy_types::error::ErrorMetadata::builder()
9096
.code("ThrottlingException")

src/sinks/aws_cloudwatch_logs/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type Svc = Buffer<
5050
<ConcurrencyLimit<
5151
RateLimit<
5252
Retry<
53-
FibonacciRetryPolicy<CloudwatchRetryLogic<()>>,
53+
FibonacciRetryPolicy<CloudwatchRetryLogic<Vec<InputLogEvent>, ()>>,
5454
Buffer<
5555
Vec<InputLogEvent>,
5656
<Timeout<CloudwatchLogsSvc> as Service<Vec<InputLogEvent>>>::Future,

src/sinks/aws_cloudwatch_metrics/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ struct CloudWatchMetricsRetryLogic;
201201

202202
impl RetryLogic for CloudWatchMetricsRetryLogic {
203203
type Error = SdkError<PutMetricDataError>;
204+
type Request = PartitionInnerBuffer<Vec<Metric>, String>;
204205
type Response = ();
205206

206207
fn is_retriable_error(&self, error: &Self::Error) -> bool {

src/sinks/aws_kinesis/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ where
9999
R: Send + 'static,
100100
RR: Record + Record<T = R> + Clone + Send + Sync + Unpin + 'static,
101101
E: Send + 'static,
102-
RT: RetryLogic<Response = KinesisResponse> + Default,
102+
RT: RetryLogic<Request = BatchKinesisRequest<RR>, Response = KinesisResponse> + Default,
103103
{
104104
let request_limits = config.request.into_settings();
105105

src/sinks/aws_kinesis/firehose/config.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::{
1616
},
1717
};
1818

19+
use super::sink::BatchKinesisRequest;
1920
use super::{
2021
build_sink,
2122
record::{KinesisFirehoseClient, KinesisFirehoseRecord},
@@ -173,6 +174,7 @@ struct KinesisRetryLogic {
173174

174175
impl RetryLogic for KinesisRetryLogic {
175176
type Error = SdkError<KinesisError, HttpResponse>;
177+
type Request = BatchKinesisRequest<KinesisFirehoseRecord>;
176178
type Response = KinesisResponse;
177179

178180
fn is_retriable_error(&self, error: &Self::Error) -> bool {
@@ -187,7 +189,7 @@ impl RetryLogic for KinesisRetryLogic {
187189
is_retriable_error(error)
188190
}
189191

190-
fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
192+
fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
191193
if response.failure_count > 0 && self.retry_partial {
192194
let msg = format!("partial error count {}", response.failure_count);
193195
RetryAction::Retry(msg.into())

src/sinks/aws_kinesis/streams/config.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::{
1515
},
1616
};
1717

18+
use super::sink::BatchKinesisRequest;
1819
use super::{
1920
build_sink,
2021
record::{KinesisStreamClient, KinesisStreamRecord},
@@ -170,6 +171,7 @@ struct KinesisRetryLogic {
170171

171172
impl RetryLogic for KinesisRetryLogic {
172173
type Error = SdkError<KinesisError, HttpResponse>;
174+
type Request = BatchKinesisRequest<KinesisStreamRecord>;
173175
type Response = KinesisResponse;
174176

175177
fn is_retriable_error(&self, error: &Self::Error) -> bool {
@@ -190,7 +192,7 @@ impl RetryLogic for KinesisRetryLogic {
190192
is_retriable_error(error)
191193
}
192194

193-
fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
195+
fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
194196
if response.failure_count > 0 && self.retry_partial {
195197
let msg = format!("partial error count {}", response.failure_count);
196198
RetryAction::Retry(msg.into())

src/sinks/aws_s_s/retry.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::marker::PhantomData;
22

33
use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
44

5-
use super::service::SendMessageResponse;
5+
use super::{request_builder::SendMessageEntry, service::SendMessageResponse};
66
use crate::{aws::is_retriable_error, sinks::util::retries::RetryLogic};
77

88
#[derive(Debug)]
@@ -26,6 +26,7 @@ where
2626
E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
2727
{
2828
type Error = SdkError<E, HttpResponse>;
29+
type Request = SendMessageEntry;
2930
type Response = SendMessageResponse;
3031

3132
fn is_retriable_error(&self, error: &Self::Error) -> bool {

src/sinks/azure_common/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ pub struct AzureBlobRetryLogic;
5757

5858
impl RetryLogic for AzureBlobRetryLogic {
5959
type Error = HttpError;
60+
type Request = AzureBlobRequest;
6061
type Response = AzureBlobResponse;
6162

6263
fn is_retriable_error(&self, error: &Self::Error) -> bool {

src/sinks/clickhouse/service.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,19 @@ use snafu::ResultExt;
2323

2424
#[derive(Debug, Default, Clone)]
2525
pub struct ClickhouseRetryLogic {
26-
inner: HttpRetryLogic,
26+
inner: HttpRetryLogic<HttpRequest<PartitionKey>>,
2727
}
2828

2929
impl RetryLogic for ClickhouseRetryLogic {
3030
type Error = HttpError;
31+
type Request = HttpRequest<PartitionKey>;
3132
type Response = HttpResponse;
3233

3334
fn is_retriable_error(&self, error: &Self::Error) -> bool {
3435
self.inner.is_retriable_error(error)
3536
}
3637

37-
fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
38+
fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
3839
match response.http_response.status() {
3940
StatusCode::INTERNAL_SERVER_ERROR => {
4041
let body = response.http_response.body();

src/sinks/clickhouse/sink.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ where
8383

8484
/// PartitionKey used to partition events by (database, table) pair.
8585
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
86-
pub(super) struct PartitionKey {
86+
pub struct PartitionKey {
8787
pub database: String,
8888
pub table: String,
8989
pub format: Format,

0 commit comments

Comments
 (0)