Skip to content

Commit da78e06

Browse files
nikhilsinhaparseable and Devdutt Shenoi authored
fix: ignore invalid/corrupt staging files (#990)
* fix: ignore invalid/corrupt staging files. Issue: if an invalid/corrupt arrow file is present in the staging directory of an ingestor, the server panics in the sync flow and does not sync the other files to the backend store. This is a two-fold problem: 1. find the reason behind the corruption of an arrow file; 2. discard the invalid/corrupt arrow file and let the server sync the other files, reporting the name or path of the corrupt file for further analysis. This PR solves problem 2: the server logs the path of the corrupt file and ignores it, so that the other files do not get stuck in the staging directory. --------- Co-authored-by: Devdutt Shenoi <[email protected]>
1 parent 7be5d7e commit da78e06

File tree

3 files changed

+22
-7
lines changed

3 files changed

+22
-7
lines changed

server/src/metadata.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -410,9 +410,12 @@ impl StreamInfo {
410410

411411
fn update_schema_from_staging(stream_name: &str, current_schema: Schema) -> Schema {
412412
let staging_files = StorageDir::new(stream_name).arrow_files();
413-
let schema = MergedRecordReader::try_new(&staging_files)
414-
.unwrap()
415-
.merged_schema();
413+
let record_reader = MergedRecordReader::try_new(&staging_files).unwrap();
414+
if record_reader.readers.is_empty() {
415+
return current_schema;
416+
}
417+
418+
let schema = record_reader.merged_schema();
416419

417420
Schema::try_merge(vec![schema, current_schema]).unwrap()
418421
}

server/src/storage/staging.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,9 @@ pub fn convert_disk_files_to_parquet(
260260
}
261261

262262
let record_reader = MergedReverseRecordReader::try_new(&files).unwrap();
263+
if record_reader.readers.is_empty() {
264+
continue;
265+
}
263266
let merged_schema = record_reader.merged_schema();
264267
let mut index_time_partition: usize = 0;
265268
if let Some(time_partition) = time_partition.as_ref() {

server/src/utils/arrow/merged_reader.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,13 @@ impl MergedRecordReader {
4949
log::error!("Invalid file detected, removing it: {:?}", file);
5050
fs::remove_file(file).unwrap();
5151
} else {
52-
let reader = StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None)
53-
.map_err(|_| ())?;
52+
let Ok(reader) =
53+
StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None)
54+
else {
55+
log::error!("Invalid file detected, ignoring it: {:?}", file);
56+
continue;
57+
};
58+
5459
readers.push(reader);
5560
}
5661
}
@@ -77,9 +82,13 @@ impl MergedReverseRecordReader {
7782
pub fn try_new(files: &[PathBuf]) -> Result<Self, ()> {
7883
let mut readers = Vec::with_capacity(files.len());
7984
for file in files {
80-
let reader =
85+
let Ok(reader) =
8186
utils::arrow::reverse_reader::get_reverse_reader(File::open(file).unwrap())
82-
.map_err(|_| ())?;
87+
else {
88+
log::error!("Invalid file detected, ignoring it: {:?}", file);
89+
continue;
90+
};
91+
8392
readers.push(reader);
8493
}
8594

0 commit comments

Comments
 (0)