File tree Expand file tree Collapse file tree 1 file changed +11
-3
lines changed Expand file tree Collapse file tree 1 file changed +11
-3
lines changed Original file line number Diff line number Diff line change @@ -325,8 +325,7 @@ impl Stream {
325
325
// if yes, then merge them and save
326
326
327
327
if let Some ( mut schema) = schema {
328
- let static_schema_flag = self . get_static_schema_flag ( ) ;
329
- if !static_schema_flag {
328
+ if !self . get_static_schema_flag ( ) {
330
329
// schema is dynamic, read from staging and merge if present
331
330
332
331
// need to add something before .schema to make the file have an extension of type `schema`
@@ -477,7 +476,16 @@ impl Stream {
477
476
. set ( arrow_files. len ( ) as i64 ) ;
478
477
479
478
for file in & arrow_files {
480
- let file_size = file. metadata ( ) . unwrap ( ) . len ( ) ;
479
+ let file_size = match file. metadata ( ) {
480
+ Ok ( meta) => meta. len ( ) ,
481
+ Err ( err) => {
482
+ error ! (
483
+ "Looks like the file ({}) was removed; Error = {err}" ,
484
+ file. display( )
485
+ ) ;
486
+ continue ;
487
+ }
488
+ } ;
481
489
482
490
metrics:: STORAGE_SIZE
483
491
. with_label_values ( & [ "staging" , & self . stream_name , ARROW_FILE_EXTENSION ] )
You can’t perform that action at this time.
0 commit comments