@@ -27,7 +27,10 @@ use crate::{
27
27
catalog:: manifest:: { File , Manifest } ,
28
28
handlers:: http:: cluster:: INTERNAL_STREAM_NAME ,
29
29
metadata:: { error:: stream_info:: MetadataError , STREAM_INFO } ,
30
- option:: { validation:: bytes_to_human_size, CONFIG } ,
30
+ option:: {
31
+ validation:: { bytes_to_human_size, human_size_to_bytes} ,
32
+ CONFIG ,
33
+ } ,
31
34
storage:: { ObjectStorage , ObjectStorageError } ,
32
35
utils:: extract_datetime,
33
36
validator:: error:: HotTierValidationError ,
@@ -40,6 +43,7 @@ use object_store::{local::LocalFileSystem, ObjectStore};
40
43
use once_cell:: sync:: OnceCell ;
41
44
use parquet:: errors:: ParquetError ;
42
45
use relative_path:: RelativePathBuf ;
46
+ use serde:: { Deserialize , Serialize } ;
43
47
use std:: time:: Duration ;
44
48
use sysinfo:: { Disks , System } ;
45
49
use tokio:: fs:: { self , DirEntry } ;
@@ -51,15 +55,16 @@ pub const MIN_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10737418240; // 10 GiB
51
55
const HOT_TIER_SYNC_DURATION : Interval = clokwerk:: Interval :: Minutes ( 1 ) ;
52
56
pub const INTERNAL_STREAM_HOT_TIER_SIZE_BYTES : u64 = 10485760 ; //10 MiB
53
57
pub const CURRENT_HOT_TIER_VERSION : & str = "v2" ;
54
- #[ derive( Debug , serde:: Deserialize , serde:: Serialize ) ]
58
+
59
+ #[ derive( Debug , Deserialize , Serialize , Default ) ]
55
60
pub struct StreamHotTier {
56
- pub version : Option < String > ,
61
+ pub version : String ,
57
62
#[ serde( rename = "size" ) ]
58
- pub size : String ,
63
+ pub size : u64 ,
59
64
#[ serde( skip_serializing_if = "Option::is_none" ) ]
60
- pub used_size : Option < String > ,
65
+ pub used_size : Option < u64 > ,
61
66
#[ serde( skip_serializing_if = "Option::is_none" ) ]
62
- pub available_size : Option < String > ,
67
+ pub available_size : Option < u64 > ,
63
68
#[ serde( skip_serializing_if = "Option::is_none" ) ]
64
69
pub oldest_date_time_entry : Option < String > ,
65
70
}
@@ -97,13 +102,8 @@ impl HotTierManager {
97
102
for stream in STREAM_INFO . list_streams ( ) {
98
103
if self . check_stream_hot_tier_exists ( & stream) && stream != current_stream {
99
104
let stream_hot_tier = self . get_hot_tier ( & stream) . await ?;
100
- total_hot_tier_size += & stream_hot_tier. size . parse :: < u64 > ( ) . unwrap ( ) ;
101
- total_hot_tier_used_size += & stream_hot_tier
102
- . used_size
103
- . clone ( )
104
- . unwrap ( )
105
- . parse :: < u64 > ( )
106
- . unwrap ( ) ;
105
+ total_hot_tier_size += stream_hot_tier. size ;
106
+ total_hot_tier_used_size += stream_hot_tier. used_size . as_ref ( ) . unwrap ( ) ;
107
107
}
108
108
}
109
109
Ok ( ( total_hot_tier_size, total_hot_tier_used_size) )
@@ -116,15 +116,13 @@ impl HotTierManager {
116
116
pub async fn validate_hot_tier_size (
117
117
& self ,
118
118
stream : & str ,
119
- stream_hot_tier_size : & str ,
119
+ stream_hot_tier_size : u64 ,
120
120
) -> Result < u64 , HotTierError > {
121
- let stream_hot_tier_size = stream_hot_tier_size. parse :: < u64 > ( ) . unwrap ( ) ;
122
121
let mut existing_hot_tier_used_size = 0 ;
123
122
if self . check_stream_hot_tier_exists ( stream) {
124
123
//delete existing hot tier if its size is less than the updated hot tier size else return error
125
124
let existing_hot_tier = self . get_hot_tier ( stream) . await ?;
126
- existing_hot_tier_used_size =
127
- existing_hot_tier. used_size . unwrap ( ) . parse :: < u64 > ( ) . unwrap ( ) ;
125
+ existing_hot_tier_used_size = * existing_hot_tier. used_size . as_ref ( ) . unwrap ( ) ;
128
126
129
127
if stream_hot_tier_size < existing_hot_tier_used_size {
130
128
return Err ( HotTierError :: ObjectStorageError ( ObjectStorageError :: Custom ( format ! (
@@ -170,20 +168,42 @@ impl HotTierManager {
170
168
) ) ;
171
169
}
172
170
let path = hot_tier_file_path ( & self . hot_tier_path , stream) ?;
173
- let res = self
171
+ let bytes = self
174
172
. filesystem
175
173
. get ( & path)
176
174
. and_then ( |resp| resp. bytes ( ) )
177
- . await ;
178
- match res {
179
- Ok ( bytes ) => {
180
- let mut stream_hot_tier : StreamHotTier = serde_json :: from_slice ( & bytes ) ? ;
181
- let oldest_date_time_entry = self . get_oldest_date_time_entry ( stream ) . await ? ;
182
- stream_hot_tier . oldest_date_time_entry = oldest_date_time_entry ;
183
- Ok ( stream_hot_tier )
184
- }
185
- Err ( err ) => Err ( err . into ( ) ) ,
175
+ . await ? ;
176
+
177
+ # [ derive ( Debug , Default , Deserialize ) ]
178
+ struct StreamHotTierTemp {
179
+ pub version : Option < String > ,
180
+ pub size : String ,
181
+ pub used_size : Option < String > ,
182
+ pub available_size : Option < String > ,
183
+ pub oldest_date_time_entry : Option < String > ,
186
184
}
185
+
186
+ let stream_hot_tier: StreamHotTierTemp = serde_json:: from_slice ( & bytes) ?;
187
+ let mut stream_hot_tier = StreamHotTier {
188
+ version : stream_hot_tier
189
+ . version
190
+ . unwrap_or_else ( || CURRENT_HOT_TIER_VERSION . to_owned ( ) ) ,
191
+ size : human_size_to_bytes ( & stream_hot_tier. size ) . unwrap ( ) ,
192
+ used_size : stream_hot_tier
193
+ . used_size
194
+ . as_ref ( )
195
+ . map ( |size| human_size_to_bytes ( size) . unwrap ( ) ) ,
196
+ available_size : stream_hot_tier
197
+ . available_size
198
+ . as_ref ( )
199
+ . map ( |size| human_size_to_bytes ( size) . unwrap ( ) ) ,
200
+ oldest_date_time_entry : stream_hot_tier. oldest_date_time_entry ,
201
+ } ;
202
+
203
+ let oldest_date_time_entry = self . get_oldest_date_time_entry ( stream) . await ?;
204
+ stream_hot_tier. oldest_date_time_entry = oldest_date_time_entry;
205
+
206
+ Ok ( stream_hot_tier)
187
207
}
188
208
189
209
pub async fn delete_hot_tier ( & self , stream : & str ) -> Result < ( ) , HotTierError > {
@@ -260,12 +280,7 @@ impl HotTierManager {
260
280
/// delete the files from the hot tier directory if the available date range is outside the hot tier range
261
281
async fn process_stream ( & self , stream : String ) -> Result < ( ) , HotTierError > {
262
282
let stream_hot_tier = self . get_hot_tier ( & stream) . await ?;
263
- let mut parquet_file_size = stream_hot_tier
264
- . used_size
265
- . as_ref ( )
266
- . unwrap ( )
267
- . parse :: < u64 > ( )
268
- . unwrap ( ) ;
283
+ let mut parquet_file_size = * stream_hot_tier. used_size . as_ref ( ) . unwrap ( ) ;
269
284
270
285
let object_store = CONFIG . storage ( ) . get_object_store ( ) ;
271
286
let mut s3_manifest_file_list = object_store. list_manifest_files ( & stream) . await ?;
@@ -357,13 +372,7 @@ impl HotTierManager {
357
372
let mut file_processed = false ;
358
373
let mut stream_hot_tier = self . get_hot_tier ( stream) . await ?;
359
374
if !self . is_disk_available ( parquet_file. file_size ) . await ?
360
- || stream_hot_tier
361
- . available_size
362
- . as_ref ( )
363
- . unwrap ( )
364
- . parse :: < u64 > ( )
365
- . unwrap ( )
366
- <= parquet_file. file_size
375
+ || stream_hot_tier. available_size . as_ref ( ) . unwrap ( ) <= & parquet_file. file_size
367
376
{
368
377
if !self
369
378
. cleanup_hot_tier_old_data (
@@ -376,12 +385,7 @@ impl HotTierManager {
376
385
{
377
386
return Ok ( file_processed) ;
378
387
}
379
- * parquet_file_size = stream_hot_tier
380
- . used_size
381
- . as_ref ( )
382
- . unwrap ( )
383
- . parse :: < u64 > ( )
384
- . unwrap ( ) ;
388
+ * parquet_file_size = * stream_hot_tier. used_size . as_ref ( ) . unwrap ( ) ;
385
389
}
386
390
let parquet_file_path = RelativePathBuf :: from ( parquet_file. file_path . clone ( ) ) ;
387
391
fs:: create_dir_all ( parquet_path. parent ( ) . unwrap ( ) ) . await ?;
@@ -393,18 +397,10 @@ impl HotTierManager {
393
397
. await ?;
394
398
file. write_all ( & parquet_data) . await ?;
395
399
* parquet_file_size += parquet_file. file_size ;
396
- stream_hot_tier. used_size = Some ( parquet_file_size. to_string ( ) ) ;
400
+ stream_hot_tier. used_size = Some ( * parquet_file_size) ;
397
401
398
- stream_hot_tier. available_size = Some (
399
- ( stream_hot_tier
400
- . available_size
401
- . as_ref ( )
402
- . unwrap ( )
403
- . parse :: < u64 > ( )
404
- . unwrap ( )
405
- - parquet_file. file_size )
406
- . to_string ( ) ,
407
- ) ;
402
+ stream_hot_tier. available_size =
403
+ Some ( stream_hot_tier. available_size . as_ref ( ) . unwrap ( ) - parquet_file. file_size ) ;
408
404
self . put_hot_tier ( stream, & mut stream_hot_tier) . await ?;
409
405
file_processed = true ;
410
406
let mut hot_tier_manifest = self
@@ -614,37 +610,14 @@ impl HotTierManager {
614
610
fs:: remove_dir_all ( path_to_delete. parent ( ) . unwrap ( ) ) . await ?;
615
611
delete_empty_directory_hot_tier ( path_to_delete. parent ( ) . unwrap ( ) ) . await ?;
616
612
617
- stream_hot_tier. used_size = Some (
618
- ( stream_hot_tier
619
- . used_size
620
- . as_ref ( )
621
- . unwrap ( )
622
- . parse :: < u64 > ( )
623
- . unwrap ( )
624
- - file_size)
625
- . to_string ( ) ,
626
- ) ;
627
- stream_hot_tier. available_size = Some (
628
- ( stream_hot_tier
629
- . available_size
630
- . as_ref ( )
631
- . unwrap ( )
632
- . parse :: < u64 > ( )
633
- . unwrap ( )
634
- + file_size)
635
- . to_string ( ) ,
636
- ) ;
613
+ stream_hot_tier. used_size =
614
+ Some ( stream_hot_tier. used_size . as_ref ( ) . unwrap ( ) - file_size) ;
615
+ stream_hot_tier. available_size =
616
+ Some ( stream_hot_tier. available_size . as_ref ( ) . unwrap ( ) + file_size) ;
637
617
self . put_hot_tier ( stream, stream_hot_tier) . await ?;
638
618
delete_successful = true ;
639
619
640
- if stream_hot_tier
641
- . available_size
642
- . as_ref ( )
643
- . unwrap ( )
644
- . parse :: < u64 > ( )
645
- . unwrap ( )
646
- <= parquet_file_size
647
- {
620
+ if stream_hot_tier. available_size . as_ref ( ) . unwrap ( ) <= & parquet_file_size {
648
621
continue ' loop_files;
649
622
} else {
650
623
break ' loop_dates;
@@ -738,10 +711,10 @@ impl HotTierManager {
738
711
&& !self . check_stream_hot_tier_exists ( INTERNAL_STREAM_NAME )
739
712
{
740
713
let mut stream_hot_tier = StreamHotTier {
741
- version : Some ( CURRENT_HOT_TIER_VERSION . to_string ( ) ) ,
742
- size : INTERNAL_STREAM_HOT_TIER_SIZE_BYTES . to_string ( ) ,
743
- used_size : Some ( "0" . to_string ( ) ) ,
744
- available_size : Some ( INTERNAL_STREAM_HOT_TIER_SIZE_BYTES . to_string ( ) ) ,
714
+ version : CURRENT_HOT_TIER_VERSION . to_string ( ) ,
715
+ size : INTERNAL_STREAM_HOT_TIER_SIZE_BYTES ,
716
+ used_size : Some ( 0 ) ,
717
+ available_size : Some ( INTERNAL_STREAM_HOT_TIER_SIZE_BYTES ) ,
745
718
oldest_date_time_entry : None ,
746
719
} ;
747
720
self . put_hot_tier ( INTERNAL_STREAM_NAME , & mut stream_hot_tier)
0 commit comments