@@ -262,7 +262,7 @@ impl EventFormat for Event {
262
262
_ => p_timestamp. naive_utc ( ) ,
263
263
} ;
264
264
265
- let rb = Self :: into_recordbatch (
265
+ let batch = Self :: into_recordbatch (
266
266
p_timestamp,
267
267
vec ! [ json] ,
268
268
schema. clone ( ) ,
@@ -272,7 +272,7 @@ impl EventFormat for Event {
272
272
schema_version,
273
273
) ?;
274
274
275
- let schema = rb . schema ( ) ;
275
+ let schema = batch . schema ( ) ;
276
276
let mut key = get_schema_key ( & schema. fields ) ;
277
277
if time_partition. is_some ( ) {
278
278
let parsed_timestamp_to_min = parsed_timestamp. format ( "%Y%m%dT%H%M" ) . to_string ( ) ;
@@ -283,13 +283,21 @@ impl EventFormat for Event {
283
283
key. push_str ( & format ! ( "&{k}={v}" ) ) ;
284
284
}
285
285
286
- let entry = partitions. entry ( key) . or_insert ( PartitionEvent {
287
- rb : RecordBatch :: new_empty ( schema. clone ( ) ) ,
288
- parsed_timestamp,
289
- custom_partition_values,
290
- } ) ;
291
-
292
- entry. rb = concat_batches ( & schema, [ & entry. rb , & rb] ) ?;
286
+ match partitions. get_mut ( & key) {
287
+ Some ( PartitionEvent { rb, .. } ) => {
288
+ * rb = concat_batches ( & schema, [ & rb, & batch] ) ?;
289
+ }
290
+ _ => {
291
+ partitions. insert (
292
+ key,
293
+ PartitionEvent {
294
+ rb : batch,
295
+ parsed_timestamp,
296
+ custom_partition_values,
297
+ } ,
298
+ ) ;
299
+ }
300
+ }
293
301
}
294
302
295
303
Ok ( super :: Event {
0 commit comments