Skip to content

Commit e1fd004

Browse files
committed
Remove TimedConsumer from parquet-concat since it needs a stricter invocation
After some experimentation with real-world scenarios, the parquet-concat Lambda cannot operate as effectively with "BUFFER MORE" semantics the way the file-loader lambda can. Instead, parquet-concat should be invoked with its 10 messages (the batch size maximum) from the FIFO, use those to concat as quickly as possible, and then exit. Since it doesn't have the same parallelism restrictions that file-loader does — it isn't appending to a Delta table — it's better to crank up concurrent executions to the extent possible. This change also utilizes the S3EventRecord file size when it is available, reducing the number of S3 round-trips when reading the input parquet files. Signed-off-by: R. Tyler Croy <rtyler@buoyantdata.com>
1 parent cd54919 commit e1fd004

File tree

1 file changed

+17
-73
lines changed

1 file changed

+17
-73
lines changed

lambdas/parquet-concat/src/main.rs

Lines changed: 17 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -60,22 +60,12 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
6060
let output_bucket = std::env::var("OUTPUT_BUCKET").expect("Failed to get `OUTPUT_BUCKET`");
6161
let input_prefix = std::env::var("INPUT_PREFIX").unwrap_or_default();
6262
debug!("Receiving event: {event:?}");
63-
let region = Region::new(GLOBAL);
6463
let mut records = extract_records_from(vec![event.payload])?;
65-
6664
info!("Processing {} bucket notifications", records.len());
67-
let fn_start_since_epoch_ms: u128 = SystemTime::now()
68-
.duration_since(UNIX_EPOCH)
69-
.expect("Failed to get a system time after unix epoch")
70-
.as_millis();
71-
// Millis to allow for consuming more messages
72-
let more_deadline_ms = ((event.context.deadline as u128) - fn_start_since_epoch_ms) / 2;
7365

7466
let mut last_writer: Option<AsyncArrowWriter<ParquetObjectWriter>> = None;
7567
let mut last_dir: Option<String> = None;
7668

77-
let config = aws_config::load_from_env().await;
78-
7969
// Input store to fetch parquet files from S3
8070
let input_store: Arc<dyn ObjectStore> = Arc::new(
8171
object_store::aws::AmazonS3Builder::from_env()
@@ -91,74 +81,24 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
9181
.build()
9282
.expect("Output object store failed to build"),
9383
);
94-
let consumer_config = ConsumerConfig {
95-
retrieval_max: 1,
96-
..Default::default()
97-
};
98-
info!("TimedConsumer configuration being used: {consumer_config:?}");
99-
100-
let mut consumer = TimedConsumer::new(
101-
consumer_config,
102-
&config,
103-
Duration::from_millis(
104-
more_deadline_ms
105-
.try_into()
106-
.expect("Failed to compute a 64-bit deadline"),
107-
),
108-
);
109-
110-
if let Ok(bytes_to_consume) = std::env::var("BUFFER_MORE_MBYTES_ALLOWED") {
111-
consumer.set_memory_limit(
112-
region,
113-
bytes_to_consume
114-
.parse::<usize>()
115-
.expect("BUFFER_MORE_MBYTES_ALLOWED must be parseable as a uint64")
116-
* 1024
117-
* 1024,
118-
);
119-
}
120-
121-
loop {
122-
let next_up = consumer.next().await?;
123-
124-
if let Some(batch) = next_up {
125-
info!(
126-
"Buffered an additional {} more messages from SQS",
127-
batch.len()
128-
);
129-
// When pulling messages separately, convert them to a Lambda SqsEvent like structure for
130-
// easier reuse of deserialization codec
131-
let event = SqsEvent {
132-
records: batch.into_iter().map(convert_from_sqs).collect(),
133-
};
134-
records.extend(extract_records_from(vec![event])?);
135-
}
136-
137-
if records.is_empty() {
138-
trace!("No more records to process at the moment, see ya later!");
139-
break;
140-
}
14184

142-
// Each iteration we'll drain the records to make sure everything is _gotten_ before
143-
// continuing
144-
process_records(
145-
records.drain(..),
146-
&input_store,
147-
&output_store,
148-
&output_prefix,
149-
&input_prefix,
150-
&mut last_writer,
151-
&mut last_dir,
152-
)
153-
.await?;
154-
}
85+
// Each iteration we'll drain the records to make sure everything is _gotten_ before
86+
// continuing
87+
process_records(
88+
records.drain(..),
89+
&input_store,
90+
&output_store,
91+
&output_prefix,
92+
&input_prefix,
93+
&mut last_writer,
94+
&mut last_dir,
95+
)
96+
.await?;
15597

15698
if let Some(writer) = last_writer {
15799
info!("Finalizing last writer");
158100
writer.close().await?;
159101
}
160-
debug!("Flushing consumer to wrap up the execution");
161-
consumer.flush().await?;
162102

163103
Ok(())
164104
}
@@ -178,7 +118,11 @@ async fn process_records(
178118
RecordType::Parquet => {
179119
debug!("Preparing to load file {file_record:?}");
180120
let location: Path = file_record.s3.object.url_decoded_key.unwrap().into();
181-
let object_reader = ParquetObjectReader::new(input_store.clone(), location.clone());
121+
let mut object_reader =
122+
ParquetObjectReader::new(input_store.clone(), location.clone());
123+
if let Some(file_size) = file_record.s3.object.size {
124+
object_reader = object_reader.with_file_size(file_size.try_into()?);
125+
}
182126
let mut reader = ParquetRecordBatchStreamBuilder::new(object_reader)
183127
.await?
184128
.with_batch_size(10_000)

0 commit comments

Comments
 (0)