@@ -56,10 +56,8 @@ pub const CURRENT_HOT_TIER_VERSION: &str = "v2";
56
56
pub struct StreamHotTier {
57
57
pub version : Option < String > ,
58
58
pub size : String ,
59
- #[ serde( skip_serializing_if = "Option::is_none" ) ]
60
- pub used_size : Option < String > ,
61
- #[ serde( skip_serializing_if = "Option::is_none" ) ]
62
- pub available_size : Option < String > ,
59
+ pub used_size : String ,
60
+ pub available_size : String ,
63
61
#[ serde( skip_serializing_if = "Option::is_none" ) ]
64
62
pub oldest_date_time_entry : Option < String > ,
65
63
}
@@ -98,12 +96,7 @@ impl HotTierManager {
98
96
if self . check_stream_hot_tier_exists ( & stream) && stream != current_stream {
99
97
let stream_hot_tier = self . get_hot_tier ( & stream) . await ?;
100
98
total_hot_tier_size += & stream_hot_tier. size . parse :: < u64 > ( ) . unwrap ( ) ;
101
- total_hot_tier_used_size += & stream_hot_tier
102
- . used_size
103
- . clone ( )
104
- . unwrap ( )
105
- . parse :: < u64 > ( )
106
- . unwrap ( ) ;
99
+ total_hot_tier_used_size += stream_hot_tier. used_size . parse :: < u64 > ( ) . unwrap ( ) ;
107
100
}
108
101
}
109
102
Ok ( ( total_hot_tier_size, total_hot_tier_used_size) )
@@ -123,8 +116,7 @@ impl HotTierManager {
123
116
if self . check_stream_hot_tier_exists ( stream) {
124
117
//delete existing hot tier if its size is less than the updated hot tier size else return error
125
118
let existing_hot_tier = self . get_hot_tier ( stream) . await ?;
126
- existing_hot_tier_used_size =
127
- existing_hot_tier. used_size . unwrap ( ) . parse :: < u64 > ( ) . unwrap ( ) ;
119
+ existing_hot_tier_used_size = existing_hot_tier. used_size . parse :: < u64 > ( ) . unwrap ( ) ;
128
120
129
121
if stream_hot_tier_size < existing_hot_tier_used_size {
130
122
return Err ( HotTierError :: ObjectStorageError ( ObjectStorageError :: Custom ( format ! (
@@ -260,12 +252,7 @@ impl HotTierManager {
260
252
/// delete the files from the hot tier directory if the available date range is outside the hot tier range
261
253
async fn process_stream ( & self , stream : String ) -> Result < ( ) , HotTierError > {
262
254
let stream_hot_tier = self . get_hot_tier ( & stream) . await ?;
263
- let mut parquet_file_size = stream_hot_tier
264
- . used_size
265
- . as_ref ( )
266
- . unwrap ( )
267
- . parse :: < u64 > ( )
268
- . unwrap ( ) ;
255
+ let mut parquet_file_size = stream_hot_tier. used_size . parse :: < u64 > ( ) . unwrap ( ) ;
269
256
270
257
let object_store = CONFIG . storage ( ) . get_object_store ( ) ;
271
258
let mut s3_manifest_file_list = object_store. list_manifest_files ( & stream) . await ?;
@@ -357,13 +344,7 @@ impl HotTierManager {
357
344
let mut file_processed = false ;
358
345
let mut stream_hot_tier = self . get_hot_tier ( stream) . await ?;
359
346
if !self . is_disk_available ( parquet_file. file_size ) . await ?
360
- || stream_hot_tier
361
- . available_size
362
- . as_ref ( )
363
- . unwrap ( )
364
- . parse :: < u64 > ( )
365
- . unwrap ( )
366
- <= parquet_file. file_size
347
+ || stream_hot_tier. available_size . parse :: < u64 > ( ) . unwrap ( ) <= parquet_file. file_size
367
348
{
368
349
if !self
369
350
. cleanup_hot_tier_old_data (
@@ -376,12 +357,7 @@ impl HotTierManager {
376
357
{
377
358
return Ok ( file_processed) ;
378
359
}
379
- * parquet_file_size = stream_hot_tier
380
- . used_size
381
- . as_ref ( )
382
- . unwrap ( )
383
- . parse :: < u64 > ( )
384
- . unwrap ( ) ;
360
+ * parquet_file_size = stream_hot_tier. used_size . parse :: < u64 > ( ) . unwrap ( ) ;
385
361
}
386
362
let parquet_file_path = RelativePathBuf :: from ( parquet_file. file_path . clone ( ) ) ;
387
363
fs:: create_dir_all ( parquet_path. parent ( ) . unwrap ( ) ) . await ?;
@@ -393,18 +369,11 @@ impl HotTierManager {
393
369
. await ?;
394
370
file. write_all ( & parquet_data) . await ?;
395
371
* parquet_file_size += parquet_file. file_size ;
396
- stream_hot_tier. used_size = Some ( parquet_file_size. to_string ( ) ) ;
397
-
398
- stream_hot_tier. available_size = Some (
399
- ( stream_hot_tier
400
- . available_size
401
- . as_ref ( )
402
- . unwrap ( )
403
- . parse :: < u64 > ( )
404
- . unwrap ( )
405
- - parquet_file. file_size )
406
- . to_string ( ) ,
407
- ) ;
372
+ stream_hot_tier. used_size = parquet_file_size. to_string ( ) ;
373
+
374
+ stream_hot_tier. available_size = ( stream_hot_tier. available_size . parse :: < u64 > ( ) . unwrap ( )
375
+ - parquet_file. file_size )
376
+ . to_string ( ) ;
408
377
self . put_hot_tier ( stream, & mut stream_hot_tier) . await ?;
409
378
file_processed = true ;
410
379
let mut hot_tier_manifest = self
@@ -614,35 +583,16 @@ impl HotTierManager {
614
583
fs:: remove_dir_all ( path_to_delete. parent ( ) . unwrap ( ) ) . await ?;
615
584
delete_empty_directory_hot_tier ( path_to_delete. parent ( ) . unwrap ( ) ) . await ?;
616
585
617
- stream_hot_tier. used_size = Some (
618
- ( stream_hot_tier
619
- . used_size
620
- . as_ref ( )
621
- . unwrap ( )
622
- . parse :: < u64 > ( )
623
- . unwrap ( )
624
- - file_size)
625
- . to_string ( ) ,
626
- ) ;
627
- stream_hot_tier. available_size = Some (
628
- ( stream_hot_tier
629
- . available_size
630
- . as_ref ( )
631
- . unwrap ( )
632
- . parse :: < u64 > ( )
633
- . unwrap ( )
634
- + file_size)
635
- . to_string ( ) ,
636
- ) ;
586
+ stream_hot_tier. used_size =
587
+ ( stream_hot_tier. used_size . parse :: < u64 > ( ) . unwrap ( ) - file_size)
588
+ . to_string ( ) ;
589
+ stream_hot_tier. available_size =
590
+ ( stream_hot_tier. available_size . parse :: < u64 > ( ) . unwrap ( ) + file_size)
591
+ . to_string ( ) ;
637
592
self . put_hot_tier ( stream, stream_hot_tier) . await ?;
638
593
delete_successful = true ;
639
594
640
- if stream_hot_tier
641
- . available_size
642
- . as_ref ( )
643
- . unwrap ( )
644
- . parse :: < u64 > ( )
645
- . unwrap ( )
595
+ if stream_hot_tier. available_size . parse :: < u64 > ( ) . unwrap ( )
646
596
<= parquet_file_size
647
597
{
648
598
continue ' loop_files;
@@ -740,8 +690,8 @@ impl HotTierManager {
740
690
let mut stream_hot_tier = StreamHotTier {
741
691
version : Some ( CURRENT_HOT_TIER_VERSION . to_string ( ) ) ,
742
692
size : INTERNAL_STREAM_HOT_TIER_SIZE_BYTES . to_string ( ) ,
743
- used_size : Some ( "0" . to_string ( ) ) ,
744
- available_size : Some ( INTERNAL_STREAM_HOT_TIER_SIZE_BYTES . to_string ( ) ) ,
693
+ used_size : "0" . to_string ( ) ,
694
+ available_size : INTERNAL_STREAM_HOT_TIER_SIZE_BYTES . to_string ( ) ,
745
695
oldest_date_time_entry : None ,
746
696
} ;
747
697
self . put_hot_tier ( INTERNAL_STREAM_NAME , & mut stream_hot_tier)
0 commit comments