Skip to content

Commit f21d965

Browse files
authored
Return empty parquet upload result when the buffer is empty that we d… (#478)
* Return an empty parquet upload result when the buffer is empty and nothing was actually uploaded, so that the parquet gap detector does not use excessive memory keeping its map large until there is an actual upload. * lint * d
1 parent b3875a3 commit f21d965

File tree

4 files changed

+40
-7
lines changed

4 files changed

+40
-7
lines changed

rust/processor/src/bq_analytics/generic_parquet_processor.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,23 @@ where
178178
// This is to cover the case when interval duration has passed but buffer is empty
179179
if self.buffer.is_empty() {
180180
debug!("Buffer is empty, skipping upload.");
181+
182+
let parquet_processing_result = ParquetProcessingResult {
183+
start_version: -1, // this is to indicate that nothing was actually uploaded
184+
end_version: -1,
185+
last_transaction_timestamp: None,
186+
txn_version_to_struct_count: None,
187+
parquet_processed_structs: None,
188+
table_name: ParquetType::TABLE_NAME.to_string(),
189+
};
190+
191+
self.gap_detector_sender
192+
.send(ProcessingResult::ParquetProcessingResult(
193+
parquet_processing_result,
194+
))
195+
.await
196+
.expect("[Parser] Failed to send versions to gap detector");
197+
181198
return Ok(());
182199
}
183200
let start_version = self

rust/processor/src/gap_detectors/parquet_gap_detector.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use crate::gap_detectors::{GapDetectorResult, GapDetectorTrait, ProcessingResult};
55
use ahash::{AHashMap, AHashSet};
6-
use anyhow::{Context, Result};
6+
use anyhow::Result;
77
use std::{
88
cmp::{max, min},
99
sync::{Arc, Mutex},
@@ -134,9 +134,24 @@ impl GapDetectorTrait for ParquetFileGapDetectorInner {
134134
ProcessingResult::ParquetProcessingResult(r) => r,
135135
_ => panic!("Invalid result type"),
136136
};
137-
let parquet_processed_structs = result
138-
.parquet_processed_structs
139-
.context("Missing parquet processed transactions")?;
137+
138+
let parquet_processed_structs = result.parquet_processed_structs.unwrap_or_else(|| {
139+
info!("Interval duration has passed, but there are no structs to process.");
140+
AHashMap::new()
141+
});
142+
143+
if result.start_version == -1 {
144+
// meaning we didn't really upload anything but we still would like to update the map to reduce memory usage.
145+
self.update_next_version_to_process(self.max_version, &result.table_name);
146+
return Ok(GapDetectorResult::ParquetFileGapDetectorResult(
147+
ParquetFileGapDetectorResult {
148+
next_version_to_process: self.next_version_to_process as u64,
149+
num_gaps: (self.max_version - self.next_version_to_process) as u64,
150+
last_transaction_timestamp: result.last_transaction_timestamp,
151+
},
152+
));
153+
}
154+
140155
info!(
141156
start_version = result.start_version,
142157
end_version = result.end_version,

rust/processor/src/processors/parquet_processors/parquet_ans_processor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ impl Debug for ParquetAnsProcessor {
8282
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
8383
write!(
8484
f,
85-
"ParquetAnsProcessor {{ capacity of trnasactions channel: {:?}}}",
85+
"ParquetAnsProcessor {{ capacity of ans_primary_name_v2 channel: {:?}}}",
8686
&self.ans_primary_name_v2_sender.capacity()
8787
)
8888
}

rust/processor/src/processors/parquet_processors/parquet_default_processor.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,10 @@ pub fn process_transactions(
270270
for detail in wsc_details {
271271
match detail {
272272
WriteSetChangeDetail::Module(module) => {
273-
move_modules.push(module.clone());
273+
let txn_version = module.txn_version;
274+
move_modules.push(module);
274275
transaction_version_to_struct_count
275-
.entry(module.txn_version)
276+
.entry(txn_version)
276277
.and_modify(|e| *e += 1)
277278
.or_insert(1);
278279
},

0 commit comments

Comments
 (0)