Commit 7b1e9dd

feat: allow configurable duration for data push to S3 (#626)
Create the parquet file by grouping all arrow files (in staging) for the duration provided in the env variable P_STORAGE_UPLOAD_INTERVAL. Also check that the arrow files vector is not empty, then sort the arrow files and derive the parquet file key from the last file in the sorted vector. Fixes #616
1 parent 5ca9ecb commit 7b1e9dd
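
For context, here is a minimal standalone sketch of the new grouping behavior (the file names and the arrow_path_to_parquet stand-in below are hypothetical, not the server's actual code): all arrow files staged during the upload interval are sorted and grouped under a single parquet key taken from the last, i.e. newest, file, instead of one key per arrow file as before.

    use std::collections::HashMap;
    use std::path::{Path, PathBuf};

    // Hypothetical stand-in for StorageDir::arrow_path_to_parquet:
    // swap the .arrows extension for .parquet.
    fn arrow_path_to_parquet(path: &Path) -> PathBuf {
        path.with_extension("parquet")
    }

    fn main() {
        // Made-up staged arrow files for one upload interval.
        let mut arrow_files: Vec<PathBuf> = vec![
            PathBuf::from("stream.minute=02.arrows"),
            PathBuf::from("stream.minute=01.arrows"),
            PathBuf::from("stream.minute=03.arrows"),
        ];

        let mut grouped: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();

        // Post-commit behavior: sort, then derive one key from the last file.
        if !arrow_files.is_empty() {
            arrow_files.sort();
            let key = arrow_path_to_parquet(arrow_files.last().unwrap());
            for file in arrow_files {
                grouped.entry(key.clone()).or_default().push(file);
            }
        }

        // One entry: "stream.minute=03.parquet" -> all three arrow files.
        println!("{grouped:#?}");
    }

Deriving the key from the newest file means a whole interval's worth of arrow data lands in one parquet file named after the latest slice, rather than one parquet file per arrow file.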

File tree

1 file changed (+12, -7 lines)

server/src/storage/staging.rs

Lines changed: 12 additions & 7 deletions
@@ -124,6 +124,7 @@ impl StorageDir {
         // hashmap <time, vec[paths]> but exclude where hot filename matches
         let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
         let mut arrow_files = self.arrow_files();
+
         arrow_files.retain(|path| {
             !path
                 .file_name()
@@ -132,12 +133,17 @@ impl StorageDir {
                 .unwrap()
                 .ends_with(&hot_filename)
         });
-        for arrow_file_path in arrow_files {
-            let key = Self::arrow_path_to_parquet(&arrow_file_path);
-            grouped_arrow_file
-                .entry(key)
-                .or_default()
-                .push(arrow_file_path);
+
+        //check if arrow files is not empty, fetch the parquet file path from last file from sorted arrow file list
+        if !(arrow_files.is_empty()) {
+            arrow_files.sort();
+            let key = Self::arrow_path_to_parquet(arrow_files.last().unwrap());
+            for arrow_file_path in arrow_files {
+                grouped_arrow_file
+                    .entry(key.clone())
+                    .or_default()
+                    .push(arrow_file_path);
+            }
         }

         grouped_arrow_file
@@ -201,7 +207,6 @@ pub fn convert_disk_files_to_parquet(
     let record_reader = MergedReverseRecordReader::try_new(&files).unwrap();

     let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
-
     let props = parquet_writer_props().build();
     let merged_schema = record_reader.merged_schema();
     schemas.push(merged_schema.clone());
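
The diff above only covers the staging changes; the configurable interval itself comes from the P_STORAGE_UPLOAD_INTERVAL environment variable named in the commit message. Below is a minimal sketch of how such a variable might be read; the helper name, parsing code, and 60-second fallback are assumptions for illustration, not the server's actual option handling.

    use std::env;
    use std::time::Duration;

    // Hypothetical helper: the real server wires this option through its
    // CLI/config layer; the 60-second fallback is an assumption.
    fn storage_upload_interval() -> Duration {
        let secs = env::var("P_STORAGE_UPLOAD_INTERVAL")
            .ok()
            .and_then(|v| v.parse::<u64>().ok())
            .unwrap_or(60);
        Duration::from_secs(secs)
    }

    fn main() {
        // e.g. set P_STORAGE_UPLOAD_INTERVAL=120 before starting the server
        println!("pushing staged data every {:?}", storage_upload_interval());
    }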
