@@ -70,6 +70,20 @@ static ARROWS_NAME_STRUCTURE: Lazy<Regex> = Lazy::new(|| {
70
70
Regex :: new ( r"^[[:alnum:]]+\.(?P<front>\S+)\.\d+\.data\.arrows$" ) . expect ( "Validated regex" )
71
71
} ) ;
72
72
73
+ fn arrow_path_to_parquet ( path : & Path , random_string : & str ) -> Option < PathBuf > {
74
+ let filename = path. file_name ( ) . unwrap ( ) . to_str ( ) . unwrap ( ) ;
75
+ let filename = ARROWS_NAME_STRUCTURE
76
+ . captures ( filename)
77
+ . and_then ( |c| c. get ( 1 ) ) ?
78
+ . as_str ( ) ;
79
+ let filename_with_random_number = format ! ( "{filename}.data.{random_string}.arrows" ) ;
80
+ let mut parquet_path = path. to_owned ( ) ;
81
+ parquet_path. set_file_name ( filename_with_random_number) ;
82
+ parquet_path. set_extension ( "parquet" ) ;
83
+
84
+ Some ( parquet_path)
85
+ }
86
+
73
87
#[ derive( Debug , thiserror:: Error ) ]
74
88
#[ error( "Stream not found: {0}" ) ]
75
89
pub struct StreamNotFound ( pub String ) ;
@@ -222,12 +236,13 @@ impl Stream {
222
236
& arrow_file_path, self . stream_name
223
237
) ;
224
238
remove_file ( & arrow_file_path) . unwrap ( ) ;
225
- } else {
226
- let key = Self :: arrow_path_to_parquet ( & arrow_file_path, & random_string) ;
239
+ } else if let Some ( key) = arrow_path_to_parquet ( & arrow_file_path, & random_string) {
227
240
grouped_arrow_file
228
241
. entry ( key)
229
242
. or_default ( )
230
243
. push ( arrow_file_path) ;
244
+ } else {
245
+ warn ! ( "Unexpected arrows file: {}" , arrow_file_path. display( ) ) ;
231
246
}
232
247
}
233
248
grouped_arrow_file
@@ -286,21 +301,6 @@ impl Stream {
286
301
}
287
302
}
288
303
289
- fn arrow_path_to_parquet ( path : & Path , random_string : & str ) -> PathBuf {
290
- let filename = path. file_name ( ) . unwrap ( ) . to_str ( ) . unwrap ( ) ;
291
- let filename = ARROWS_NAME_STRUCTURE
292
- . captures ( filename)
293
- . unwrap ( )
294
- . get ( 1 )
295
- . unwrap ( )
296
- . as_str ( ) ;
297
- let filename_with_random_number = format ! ( "{filename}.data.{random_string}.arrows" ) ;
298
- let mut parquet_path = path. to_owned ( ) ;
299
- parquet_path. set_file_name ( filename_with_random_number) ;
300
- parquet_path. set_extension ( "parquet" ) ;
301
- parquet_path
302
- }
303
-
304
304
/// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
305
305
pub fn prepare_parquet ( & self , shutdown_signal : bool ) -> Result < ( ) , StagingError > {
306
306
info ! (
0 commit comments