Skip to content

Commit bae47d3

Browse files
authored
feat(retry): Add bounded retries across etl (#369)
1 parent bf55114 commit bae47d3

31 files changed

+352
-53
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -87,6 +87,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
8787
pg_connection: pg,
8888
batch: BatchConfig { max_size: 1000, max_fill_ms: 5000 },
8989
table_error_retry_delay_ms: 10_000,
90+
table_error_retry_max_attempts: 5,
9091
max_table_sync_workers: 4,
9192
};
9293

docs/index.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -72,6 +72,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7272
pg_connection: pg_config,
7373
batch: BatchConfig { max_size: 1000, max_fill_ms: 5000 },
7474
table_error_retry_delay_ms: 10000,
75+
table_error_retry_max_attempts: 5,
7576
max_table_sync_workers: 4,
7677
};
7778

docs/tutorials/custom-implementations.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -621,6 +621,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
621621

622622
// Error handling configuration
623623
table_error_retry_delay_ms: 10000, // Wait 10s before retrying failed tables
624+
table_error_retry_max_attempts: 5, // Stop automatic retries after 5 attempts
624625
max_table_sync_workers: 2, // Use 2 workers for parallel table syncing
625626
};
626627

docs/tutorials/first-pipeline.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -116,6 +116,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
116116
max_fill_ms: 5000,
117117
},
118118
table_error_retry_delay_ms: 10000,
119+
table_error_retry_max_attempts: 5,
119120
max_table_sync_workers: 4,
120121
};
121122

etl-api/src/configs/pipeline.rs

Lines changed: 36 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,8 +7,13 @@ use crate::configs::store::Store;
77
const DEFAULT_BATCH_MAX_SIZE: usize = 100000;
88
const DEFAULT_BATCH_MAX_FILL_MS: u64 = 10000;
99
const DEFAULT_TABLE_ERROR_RETRY_DELAY_MS: u64 = 10000;
10+
const DEFAULT_TABLE_ERROR_RETRY_MAX_ATTEMPTS: u32 = 5;
1011
const DEFAULT_MAX_TABLE_SYNC_WORKERS: u16 = 4;
1112

13+
const fn default_table_error_retry_max_attempts() -> u32 {
14+
DEFAULT_TABLE_ERROR_RETRY_MAX_ATTEMPTS
15+
}
16+
1217
/// Batch processing configuration for pipelines.
1318
#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)]
1419
#[serde(rename_all = "snake_case")]
@@ -30,6 +35,9 @@ pub struct FullApiPipelineConfig {
3035
#[schema(example = 1000)]
3136
#[serde(skip_serializing_if = "Option::is_none")]
3237
pub table_error_retry_delay_ms: Option<u64>,
38+
#[schema(example = 5)]
39+
#[serde(skip_serializing_if = "Option::is_none")]
40+
pub table_error_retry_max_attempts: Option<u32>,
3341
#[schema(example = 4)]
3442
#[serde(skip_serializing_if = "Option::is_none")]
3543
pub max_table_sync_workers: Option<u16>,
@@ -44,6 +52,7 @@ impl From<StoredPipelineConfig> for FullApiPipelineConfig {
4452
max_fill_ms: Some(value.batch.max_fill_ms),
4553
}),
4654
table_error_retry_delay_ms: Some(value.table_error_retry_delay_ms),
55+
table_error_retry_max_attempts: Some(value.table_error_retry_max_attempts),
4756
max_table_sync_workers: Some(value.max_table_sync_workers),
4857
}
4958
}
@@ -60,6 +69,9 @@ pub struct PartialApiPipelineConfig {
6069
#[schema(example = 1000)]
6170
#[serde(skip_serializing_if = "Option::is_none")]
6271
pub table_error_retry_delay_ms: Option<u64>,
72+
#[schema(example = 5)]
73+
#[serde(skip_serializing_if = "Option::is_none")]
74+
pub table_error_retry_max_attempts: Option<u32>,
6375
#[schema(example = 4)]
6476
#[serde(skip_serializing_if = "Option::is_none")]
6577
pub max_table_sync_workers: Option<u16>,
@@ -70,6 +82,8 @@ pub struct StoredPipelineConfig {
7082
pub publication_name: String,
7183
pub batch: BatchConfig,
7284
pub table_error_retry_delay_ms: u64,
85+
#[serde(default = "default_table_error_retry_max_attempts")]
86+
pub table_error_retry_max_attempts: u32,
7387
pub max_table_sync_workers: u16,
7488
}
7589

@@ -85,6 +99,7 @@ impl StoredPipelineConfig {
8599
pg_connection: pg_connection_config,
86100
batch: self.batch,
87101
table_error_retry_delay_ms: self.table_error_retry_delay_ms,
102+
table_error_retry_max_attempts: self.table_error_retry_max_attempts,
88103
max_table_sync_workers: self.max_table_sync_workers,
89104
}
90105
}
@@ -107,6 +122,10 @@ impl StoredPipelineConfig {
107122
self.table_error_retry_delay_ms = value;
108123
}
109124

125+
if let Some(value) = partial.table_error_retry_max_attempts {
126+
self.table_error_retry_max_attempts = value;
127+
}
128+
110129
if let Some(value) = partial.max_table_sync_workers {
111130
self.max_table_sync_workers = value;
112131
}
@@ -134,6 +153,9 @@ impl From<FullApiPipelineConfig> for StoredPipelineConfig {
134153
table_error_retry_delay_ms: value
135154
.table_error_retry_delay_ms
136155
.unwrap_or(DEFAULT_TABLE_ERROR_RETRY_DELAY_MS),
156+
table_error_retry_max_attempts: value
157+
.table_error_retry_max_attempts
158+
.unwrap_or(DEFAULT_TABLE_ERROR_RETRY_MAX_ATTEMPTS),
137159
max_table_sync_workers: value
138160
.max_table_sync_workers
139161
.unwrap_or(DEFAULT_MAX_TABLE_SYNC_WORKERS),
@@ -155,6 +177,7 @@ mod tests {
155177
max_fill_ms: 5000,
156178
},
157179
table_error_retry_delay_ms: 2000,
180+
table_error_retry_max_attempts: 7,
158181
max_table_sync_workers: 4,
159182
};
160183

@@ -167,6 +190,10 @@ mod tests {
167190
config.table_error_retry_delay_ms,
168191
deserialized.table_error_retry_delay_ms
169192
);
193+
assert_eq!(
194+
config.table_error_retry_max_attempts,
195+
deserialized.table_error_retry_max_attempts
196+
);
170197
assert_eq!(
171198
config.max_table_sync_workers,
172199
deserialized.max_table_sync_workers
@@ -179,6 +206,7 @@ mod tests {
179206
publication_name: "test_publication".to_string(),
180207
batch: None,
181208
table_error_retry_delay_ms: None,
209+
table_error_retry_max_attempts: None,
182210
max_table_sync_workers: None,
183211
};
184212

@@ -194,6 +222,7 @@ mod tests {
194222
publication_name: "test_publication".to_string(),
195223
batch: None,
196224
table_error_retry_delay_ms: None,
225+
table_error_retry_max_attempts: None,
197226
max_table_sync_workers: None,
198227
};
199228

@@ -205,6 +234,10 @@ mod tests {
205234
stored.table_error_retry_delay_ms,
206235
DEFAULT_TABLE_ERROR_RETRY_DELAY_MS
207236
);
237+
assert_eq!(
238+
stored.table_error_retry_max_attempts,
239+
DEFAULT_TABLE_ERROR_RETRY_MAX_ATTEMPTS
240+
);
208241
assert_eq!(
209242
stored.max_table_sync_workers,
210243
DEFAULT_MAX_TABLE_SYNC_WORKERS
@@ -220,6 +253,7 @@ mod tests {
220253
max_fill_ms: 2000,
221254
},
222255
table_error_retry_delay_ms: 1000,
256+
table_error_retry_max_attempts: 3,
223257
max_table_sync_workers: 2,
224258
};
225259

@@ -230,6 +264,7 @@ mod tests {
230264
max_fill_ms: Some(8000),
231265
}),
232266
table_error_retry_delay_ms: Some(5000),
267+
table_error_retry_max_attempts: Some(9),
233268
max_table_sync_workers: None,
234269
};
235270

@@ -239,6 +274,7 @@ mod tests {
239274
assert_eq!(stored.batch.max_size, 1000);
240275
assert_eq!(stored.batch.max_fill_ms, 8000);
241276
assert_eq!(stored.table_error_retry_delay_ms, 5000);
277+
assert_eq!(stored.table_error_retry_max_attempts, 9);
242278
assert_eq!(stored.max_table_sync_workers, 2);
243279
}
244280
}

etl-api/tests/pipelines.rs

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -793,6 +793,24 @@ async fn pipeline_config_can_be_updated() {
793793
.expect("failed to deserialize response");
794794
insta::assert_debug_snapshot!(response.config);
795795

796+
// Act
797+
let update_request = UpdatePipelineConfigRequest {
798+
config: partially_updated_optional_pipeline_config(
799+
ConfigUpdateType::TableErrorRetryMaxAttempts(12),
800+
),
801+
};
802+
let response = app
803+
.update_pipeline_config(&tenant_id, pipeline_id, &update_request)
804+
.await;
805+
806+
// Assert
807+
assert!(response.status().is_success());
808+
let response: UpdatePipelineConfigResponse = response
809+
.json()
810+
.await
811+
.expect("failed to deserialize response");
812+
insta::assert_debug_snapshot!(response.config);
813+
796814
// Act
797815
let update_request = UpdatePipelineConfigRequest {
798816
config: partially_updated_optional_pipeline_config(ConfigUpdateType::MaxTableSyncWorkers(

etl-api/tests/snapshots/destinations_pipelines__an_existing_destination_and_pipeline_can_be_updated-2.snap

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,9 @@ FullApiPipelineConfig {
1717
table_error_retry_delay_ms: Some(
1818
20000,
1919
),
20+
table_error_retry_max_attempts: Some(
21+
10,
22+
),
2023
max_table_sync_workers: Some(
2124
4,
2225
),

etl-api/tests/snapshots/destinations_pipelines__destination_and_pipeline_can_be_created-2.snap

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,9 @@ FullApiPipelineConfig {
1717
table_error_retry_delay_ms: Some(
1818
10000,
1919
),
20+
table_error_retry_max_attempts: Some(
21+
5,
22+
),
2023
max_table_sync_workers: Some(
2124
2,
2225
),

etl-api/tests/snapshots/pipelines__all_pipelines_can_be_read-2.snap

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,9 @@ FullApiPipelineConfig {
1717
table_error_retry_delay_ms: Some(
1818
20000,
1919
),
20+
table_error_retry_max_attempts: Some(
21+
10,
22+
),
2023
max_table_sync_workers: Some(
2124
4,
2225
),

etl-api/tests/snapshots/pipelines__all_pipelines_can_be_read.snap

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,9 @@ FullApiPipelineConfig {
1717
table_error_retry_delay_ms: Some(
1818
10000,
1919
),
20+
table_error_retry_max_attempts: Some(
21+
5,
22+
),
2023
max_table_sync_workers: Some(
2124
2,
2225
),

0 commit comments

Comments
 (0)