Skip to content

Commit 461d8b2

Browse files
committed
Add more robust retry configuration for parquet-concat
1 parent 25788a2 commit 461d8b2

File tree

3 files changed

+22
-12
lines changed

3 files changed

+22
-12
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ members = [
77
resolver = "3"
88

99
[workspace.package]
10-
version = "1.7.1"
10+
version = "1.7.2"
1111
edition = "2024"
1212
keywords = ["deltalake", "parquet", "lambda", "delta", "sqs"]
1313
homepage = "https://github.com/buoyant-data/oxbow"

lambdas/parquet-concat/README.adoc

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,9 @@ toc::[]
3737
|
3838
| S3 prefix/path where consolidated files will be stored
3939

40-
| `BUFFER_MORE_QUEUE_URL`
41-
|
42-
| SQS queue URL for buffering additional messages beyond the initial Lambda trigger
43-
44-
| `BUFFER_MORE_MBYTES_ALLOWED`
45-
|
46-
| Memory limit in MB for processing additional messages (optional)
40+
| `MAX_RETRIES`
41+
| `20`
42+
| Maximum number of retries for interactions with S3
4743

4844
| `RUST_LOG`
4945
| `error`

lambdas/parquet-concat/src/main.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,26 @@
33
///
44
use aws_lambda_events::event::sqs::SqsEvent;
55
use aws_lambda_events::s3::S3EventRecord;
6-
use lambda_runtime::tracing::{debug, error, info, trace};
6+
use lambda_runtime::tracing::{debug, error, info};
77
use lambda_runtime::{Error, LambdaEvent, run, service_fn, tracing};
8+
use object_store::{BackoffConfig, RetryConfig};
89
use parquet::arrow::async_reader::{
910
ParquetObjectReader, ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder,
1011
};
1112
use parquet::arrow::async_writer::{AsyncArrowWriter, ParquetObjectWriter};
1213
use parquet::basic::Compression;
1314
use parquet::file::properties::WriterProperties;
14-
use stats_alloc::{INSTRUMENTED_SYSTEM, Region, StatsAlloc};
15+
use stats_alloc::{INSTRUMENTED_SYSTEM, StatsAlloc};
1516

1617
use oxbow_lambda_shared::*;
17-
use oxbow_sqs::{ConsumerConfig, TimedConsumer};
1818
use tokio_stream::StreamExt;
1919

2020
use deltalake::{ObjectStore, Path};
2121
use std::alloc::System;
2222
use std::env;
2323
use std::iter::Iterator;
2424
use std::sync::Arc;
25-
use std::time::{Duration, SystemTime, UNIX_EPOCH};
25+
use std::time::Duration;
2626
use uuid::Uuid;
2727

2828
/// The setting of the global allocator here with stats alloc is intended to allow the
@@ -66,10 +66,23 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
6666
let mut last_writer: Option<AsyncArrowWriter<ParquetObjectWriter>> = None;
6767
let mut last_dir: Option<String> = None;
6868

69+
let max_retries: usize = std::env::var("MAX_RETRIES").unwrap_or("20".to_string()).parse()?;
70+
let init_backoff_ms: u64 = std::env::var("INIT_BACKOFF_MS").unwrap_or("500".to_string()).parse()?;
71+
72+
let retry = RetryConfig {
73+
backoff: BackoffConfig {
74+
init_backoff: Duration::from_millis(init_backoff_ms),
75+
..Default::default()
76+
},
77+
max_retries,
78+
..Default::default()
79+
};
80+
6981
// Input store to fetch parquet files from S3
7082
let input_store: Arc<dyn ObjectStore> = Arc::new(
7183
object_store::aws::AmazonS3Builder::from_env()
7284
.with_bucket_name(&input_bucket)
85+
.with_retry(retry.clone())
7386
.build()
7487
.expect("Input object store failed to build"),
7588
);
@@ -78,6 +91,7 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
7891
let output_store: Arc<dyn ObjectStore> = Arc::new(
7992
object_store::aws::AmazonS3Builder::from_env()
8093
.with_bucket_name(&output_bucket)
94+
.with_retry(retry)
8195
.build()
8296
.expect("Output object store failed to build"),
8397
);

0 commit comments

Comments (0)