@@ -60,8 +60,8 @@ pub const CURRENT_HOT_TIER_VERSION: &str = "v2";
60
60
pub struct StreamHotTier {
61
61
pub version : String ,
62
62
pub size : u64 ,
63
- pub used_size : Option < u64 > ,
64
- pub available_size : Option < u64 > ,
63
+ pub used_size : u64 ,
64
+ pub available_size : u64 ,
65
65
pub oldest_date_time_entry : Option < String > ,
66
66
}
67
67
@@ -99,7 +99,7 @@ impl HotTierManager {
99
99
if self . check_stream_hot_tier_exists ( & stream) && stream != current_stream {
100
100
let stream_hot_tier = self . get_hot_tier ( & stream) . await ?;
101
101
total_hot_tier_size += stream_hot_tier. size ;
102
- total_hot_tier_used_size += stream_hot_tier. used_size . as_ref ( ) . unwrap ( ) ;
102
+ total_hot_tier_used_size += stream_hot_tier. used_size ;
103
103
}
104
104
}
105
105
Ok ( ( total_hot_tier_size, total_hot_tier_used_size) )
@@ -118,7 +118,7 @@ impl HotTierManager {
118
118
if self . check_stream_hot_tier_exists ( stream) {
119
119
//delete existing hot tier if its size is less than the updated hot tier size else return error
120
120
let existing_hot_tier = self . get_hot_tier ( stream) . await ?;
121
- existing_hot_tier_used_size = * existing_hot_tier. used_size . as_ref ( ) . unwrap ( ) ;
121
+ existing_hot_tier_used_size = existing_hot_tier. used_size ;
122
122
123
123
if stream_hot_tier_size < existing_hot_tier_used_size {
124
124
return Err ( HotTierError :: ObjectStorageError ( ObjectStorageError :: Custom ( format ! (
@@ -188,11 +188,13 @@ impl HotTierManager {
188
188
used_size : stream_hot_tier
189
189
. used_size
190
190
. as_ref ( )
191
- . map ( |size| human_size_to_bytes ( size) . unwrap ( ) ) ,
191
+ . map ( |size| human_size_to_bytes ( size) . unwrap ( ) )
192
+ . unwrap_or_default ( ) ,
192
193
available_size : stream_hot_tier
193
194
. available_size
194
195
. as_ref ( )
195
- . map ( |size| human_size_to_bytes ( size) . unwrap ( ) ) ,
196
+ . map ( |size| human_size_to_bytes ( size) . unwrap ( ) )
197
+ . unwrap_or_default ( ) ,
196
198
oldest_date_time_entry : stream_hot_tier. oldest_date_time_entry ,
197
199
} ;
198
200
@@ -276,7 +278,7 @@ impl HotTierManager {
276
278
/// delete the files from the hot tier directory if the available date range is outside the hot tier range
277
279
async fn process_stream ( & self , stream : String ) -> Result < ( ) , HotTierError > {
278
280
let stream_hot_tier = self . get_hot_tier ( & stream) . await ?;
279
- let mut parquet_file_size = * stream_hot_tier. used_size . as_ref ( ) . unwrap ( ) ;
281
+ let mut parquet_file_size = stream_hot_tier. used_size ;
280
282
281
283
let object_store = CONFIG . storage ( ) . get_object_store ( ) ;
282
284
let mut s3_manifest_file_list = object_store. list_manifest_files ( & stream) . await ?;
@@ -368,7 +370,7 @@ impl HotTierManager {
368
370
let mut file_processed = false ;
369
371
let mut stream_hot_tier = self . get_hot_tier ( stream) . await ?;
370
372
if !self . is_disk_available ( parquet_file. file_size ) . await ?
371
- || stream_hot_tier. available_size . as_ref ( ) . unwrap ( ) <= & parquet_file. file_size
373
+ || stream_hot_tier. available_size <= parquet_file. file_size
372
374
{
373
375
if !self
374
376
. cleanup_hot_tier_old_data (
@@ -381,7 +383,7 @@ impl HotTierManager {
381
383
{
382
384
return Ok ( file_processed) ;
383
385
}
384
- * parquet_file_size = * stream_hot_tier. used_size . as_ref ( ) . unwrap ( ) ;
386
+ * parquet_file_size = stream_hot_tier. used_size ;
385
387
}
386
388
let parquet_file_path = RelativePathBuf :: from ( parquet_file. file_path . clone ( ) ) ;
387
389
fs:: create_dir_all ( parquet_path. parent ( ) . unwrap ( ) ) . await ?;
@@ -393,10 +395,9 @@ impl HotTierManager {
393
395
. await ?;
394
396
file. write_all ( & parquet_data) . await ?;
395
397
* parquet_file_size += parquet_file. file_size ;
396
- stream_hot_tier. used_size = Some ( * parquet_file_size) ;
398
+ stream_hot_tier. used_size = * parquet_file_size;
397
399
398
- stream_hot_tier. available_size =
399
- Some ( stream_hot_tier. available_size . as_ref ( ) . unwrap ( ) - parquet_file. file_size ) ;
400
+ stream_hot_tier. available_size -= parquet_file. file_size ;
400
401
self . put_hot_tier ( stream, & mut stream_hot_tier) . await ?;
401
402
file_processed = true ;
402
403
let mut hot_tier_manifest = self
@@ -606,14 +607,12 @@ impl HotTierManager {
606
607
fs:: remove_dir_all ( path_to_delete. parent ( ) . unwrap ( ) ) . await ?;
607
608
delete_empty_directory_hot_tier ( path_to_delete. parent ( ) . unwrap ( ) ) . await ?;
608
609
609
- stream_hot_tier. used_size =
610
- Some ( stream_hot_tier. used_size . as_ref ( ) . unwrap ( ) - file_size) ;
611
- stream_hot_tier. available_size =
612
- Some ( stream_hot_tier. available_size . as_ref ( ) . unwrap ( ) + file_size) ;
610
+ stream_hot_tier. used_size -= file_size;
611
+ stream_hot_tier. available_size += file_size;
613
612
self . put_hot_tier ( stream, stream_hot_tier) . await ?;
614
613
delete_successful = true ;
615
614
616
- if stream_hot_tier. available_size . as_ref ( ) . unwrap ( ) <= & parquet_file_size {
615
+ if stream_hot_tier. available_size <= parquet_file_size {
617
616
continue ' loop_files;
618
617
} else {
619
618
break ' loop_dates;
@@ -709,8 +708,8 @@ impl HotTierManager {
709
708
let mut stream_hot_tier = StreamHotTier {
710
709
version : CURRENT_HOT_TIER_VERSION . to_string ( ) ,
711
710
size : INTERNAL_STREAM_HOT_TIER_SIZE_BYTES ,
712
- used_size : Some ( 0 ) ,
713
- available_size : Some ( INTERNAL_STREAM_HOT_TIER_SIZE_BYTES ) ,
711
+ used_size : 0 ,
712
+ available_size : INTERNAL_STREAM_HOT_TIER_SIZE_BYTES ,
714
713
oldest_date_time_entry : None ,
715
714
} ;
716
715
self . put_hot_tier ( INTERNAL_STREAM_NAME , & mut stream_hot_tier)
0 commit comments