@@ -280,18 +280,7 @@ impl EventFormat for Event {
280
280
_ => p_timestamp. naive_utc ( ) ,
281
281
} ;
282
282
283
- let prefix = format ! (
284
- "{}.{}.minute={}.{}" ,
285
- get_schema_key( & schema) ,
286
- parsed_timestamp. format( "date=%Y-%m-%d.hour=%H" ) ,
287
- Minute :: from( parsed_timestamp) . to_slot( OBJECT_STORE_DATA_GRANULARITY ) ,
288
- custom_partition_values
289
- . iter( )
290
- . sorted_by_key( |v| v. 0 )
291
- . map( |( key, value) | format!( "{key}={value}" ) )
292
- . join( "." )
293
- ) ;
294
-
283
+ let prefix = generate_prefix ( & schema, parsed_timestamp, & custom_partition_values) ;
295
284
if let Some ( JsonPartition { batch, .. } ) = json_partitions. get_mut ( & prefix) {
296
285
batch. push ( json)
297
286
} else {
@@ -338,6 +327,24 @@ impl EventFormat for Event {
338
327
}
339
328
}
340
329
330
+ fn generate_prefix (
331
+ schema : & [ Arc < Field > ] ,
332
+ parsed_timestamp : NaiveDateTime ,
333
+ custom_partition_values : & HashMap < String , String > ,
334
+ ) -> String {
335
+ format ! (
336
+ "{}.{}.minute={}{}" ,
337
+ get_schema_key( schema) ,
338
+ parsed_timestamp. format( "date=%Y-%m-%d.hour=%H" ) ,
339
+ Minute :: from( parsed_timestamp) . to_slot( OBJECT_STORE_DATA_GRANULARITY ) ,
340
+ custom_partition_values
341
+ . iter( )
342
+ . sorted_by_key( |v| v. 0 )
343
+ . map( |( key, value) | format!( ".{key}={value}" ) )
344
+ . join( "" )
345
+ )
346
+ }
347
+
341
348
/// Extracts custom partition values from provided JSON object
342
349
/// e.g. `json: {"status": 400, "msg": "Hello, World!"}, custom_partition_list: ["status"]` returns `{"status" => 400}`
343
350
pub fn extract_custom_partition_values (
@@ -463,6 +470,7 @@ mod tests {
463
470
464
471
use arrow:: datatypes:: Int64Type ;
465
472
use arrow_array:: { ArrayRef , Float64Array , Int64Array , ListArray , StringArray } ;
473
+ use chrono:: Timelike ;
466
474
use serde_json:: json;
467
475
468
476
use super :: * ;
@@ -976,4 +984,51 @@ mod tests {
976
984
& Float64Array :: from( vec![ None , None , None , Some ( 2.0 ) ] )
977
985
) ;
978
986
}
987
+
988
#[test]
fn generate_correct_prefix_with_current_time_and_no_custom_partitioning() {
    // Fixed inputs: empty schema, 2023-10-01 12:30:00, no custom partitions.
    let fields = vec![];
    let timestamp = NaiveDate::from_ymd_opt(2023, 10, 1)
        .and_then(|date| date.and_hms_opt(12, 30, 0))
        .unwrap();
    let no_partitions = HashMap::new();

    // Without custom partitions the prefix must end right after the minute slot.
    let schema_key = get_schema_key(&fields);
    let minute_slot = Minute::from(timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY);
    let expected = format!(
        "{}.date={}.hour={:02}.minute={}",
        schema_key,
        timestamp.date(),
        timestamp.hour(),
        minute_slot,
    );

    let generated = generate_prefix(&fields, timestamp, &no_partitions);

    assert_eq!(generated, expected);
}
1009
+
1010
#[test]
fn generate_correct_prefix_with_current_time_and_custom_partitioning() {
    // Fixed inputs: empty schema, 2023-10-01 12:30:00, two custom partitions.
    let fields = vec![];
    let timestamp = NaiveDate::from_ymd_opt(2023, 10, 1)
        .and_then(|date| date.and_hms_opt(12, 30, 0))
        .unwrap();
    let partitions = HashMap::from([
        ("key1".to_string(), "value1".to_string()),
        ("key2".to_string(), "value2".to_string()),
    ]);

    // Custom partitions are expected after the minute slot, ordered by key.
    let schema_key = get_schema_key(&fields);
    let minute_slot = Minute::from(timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY);
    let expected = format!(
        "{}.date={}.hour={:02}.minute={}.key1=value1.key2=value2",
        schema_key,
        timestamp.date(),
        timestamp.hour(),
        minute_slot,
    );

    let generated = generate_prefix(&fields, timestamp, &partitions);

    assert_eq!(generated, expected);
}
979
1034
}
0 commit comments