Skip to content

Commit c2f6769

Browse files
author
Devdutt Shenoi
committed
refactor: drop to flush
1 parent fbd3a18 commit c2f6769

File tree

2 files changed

+16
-13
lines changed

2 files changed

+16
-13
lines changed

src/parseable/staging/writer.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818
*/
1919

2020
use std::{
21-
collections::{HashMap, HashSet}, fs::{File, OpenOptions}, io::BufWriter, path::PathBuf, sync::Arc
21+
collections::{HashMap, HashSet},
22+
fs::{File, OpenOptions},
23+
io::BufWriter,
24+
path::PathBuf,
25+
sync::Arc,
2226
};
2327

2428
use arrow_array::RecordBatch;
@@ -70,6 +74,12 @@ impl DiskWriter {
7074
}
7175
}
7276

77+
impl Drop for DiskWriter {
78+
fn drop(&mut self) {
79+
self.finish();
80+
}
81+
}
82+
7383
/// Structure to keep recordbatches in memory.
7484
///
7585
/// Any new schema is updated in the schema map.

src/parseable/streams.rs

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -325,18 +325,11 @@ impl Stream {
325325
}
326326

327327
pub fn flush(&self) {
328-
let mut disk_writers = {
329-
let mut writer = self.writer.lock().unwrap();
330-
// Flush memory
331-
writer.mem.clear();
332-
// Take schema -> disk writer mapping
333-
std::mem::take(&mut writer.disk)
334-
};
335-
336-
// Flush disk
337-
for writer in disk_writers.values_mut() {
338-
writer.finish();
339-
}
328+
let mut writer = self.writer.lock().unwrap();
329+
// Flush memory
330+
writer.mem.clear();
331+
// Drop DiskWirters to flush all streams in memory
332+
drop(std::mem::take(&mut writer.disk))
340333
}
341334

342335
fn parquet_writer_props(

0 commit comments

Comments
 (0)