Skip to content

Commit 4c8aa57

Browse files
committed
Made the values buffer optional and initialize it lazily on read, to avoid allocating one extra buffer at the end
1 parent 49b3244 commit 4c8aa57

File tree

1 file changed

+15
-7
lines changed
  • parquet/src/arrow/record_reader

1 file changed

+15
-7
lines changed

parquet/src/arrow/record_reader/mod.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ pub(crate) type ColumnReader<CV> =
5050
pub struct GenericRecordReader<V, CV> {
5151
column_desc: ColumnDescPtr,
5252

53-
values: V,
53+
/// Values buffer, lazily initialized on first read to avoid
54+
/// allocating a buffer that may never be used (e.g., after the last batch)
55+
values: Option<V>,
5456
def_levels: Option<DefinitionLevelBuffer>,
5557
rep_levels: Option<Vec<i16>>,
5658
column_reader: Option<ColumnReader<CV>>,
@@ -79,7 +81,7 @@ where
7981
let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);
8082

8183
Self {
82-
values: V::with_capacity(capacity),
84+
values: None, // Lazily initialized on first read
8385
def_levels,
8486
rep_levels,
8587
column_reader: None,
@@ -176,9 +178,9 @@ where
176178
/// Returns currently stored buffer data.
177179
/// The side effect is similar to `consume_def_levels`.
178180
pub fn consume_record_data(&mut self) -> V {
179-
// Replace the buffer with a new one that has the same capacity
180-
// This avoids reallocations on subsequent batches
181-
std::mem::replace(&mut self.values, V::with_capacity(self.capacity_hint))
181+
// Take the buffer, leaving None. The next read will lazily allocate a new buffer.
182+
// This avoids allocating a buffer that may never be used (e.g., after the last batch).
183+
self.values.take().unwrap_or_else(|| V::with_capacity(0))
182184
}
183185

184186
/// Returns currently stored null bitmap data for nullable columns.
@@ -222,20 +224,26 @@ where
222224
self.capacity_hint = batch_size;
223225
}
224226

227+
// Lazily allocate the buffer if it is absent (never read yet, or taken by `consume_record_data`)
228+
let capacity_hint = self.capacity_hint;
229+
let values = self
230+
.values
231+
.get_or_insert_with(|| V::with_capacity(capacity_hint));
232+
225233
let (records_read, values_read, levels_read) =
226234
self.column_reader.as_mut().unwrap().read_records(
227235
batch_size,
228236
self.def_levels.as_mut(),
229237
self.rep_levels.as_mut(),
230-
&mut self.values,
238+
values,
231239
)?;
232240

233241
if values_read < levels_read {
234242
let def_levels = self.def_levels.as_ref().ok_or_else(|| {
235243
general_err!("Definition levels should exist when data is less than levels!")
236244
})?;
237245

238-
self.values.pad_nulls(
246+
values.pad_nulls(
239247
self.num_values,
240248
values_read,
241249
levels_read,

0 commit comments

Comments
 (0)