@@ -167,34 +167,29 @@ impl HotTierManager {
167
167
///get the hot tier metadata file for the stream
168
168
pub async fn get_hot_tier ( & self , stream : & str ) -> Result < StreamHotTier , HotTierError > {
169
169
if !self . check_stream_hot_tier_exists ( stream) {
170
- return Err ( HotTierError :: HotTierValidationError (
171
- HotTierValidationError :: NotFound ( stream. to_owned ( ) ) ,
172
- ) ) ;
170
+ return Err ( HotTierValidationError :: NotFound ( stream. to_owned ( ) ) . into ( ) ) ;
173
171
}
174
- let path = hot_tier_file_path ( self . hot_tier_path , stream) ?;
172
+ let path = self . hot_tier_file_path ( stream) ?;
175
173
let bytes = self
176
174
. filesystem
177
175
. get ( & path)
178
176
. and_then ( |resp| resp. bytes ( ) )
179
177
. await ?;
180
178
181
179
let mut stream_hot_tier: StreamHotTier = serde_json:: from_slice ( & bytes) ?;
182
- let oldest_date_time_entry = self . get_oldest_date_time_entry ( stream) . await ?;
183
- stream_hot_tier. oldest_date_time_entry = oldest_date_time_entry;
180
+ stream_hot_tier. oldest_date_time_entry = self . get_oldest_date_time_entry ( stream) . await ?;
184
181
185
182
Ok ( stream_hot_tier)
186
183
}
187
184
188
185
pub async fn delete_hot_tier ( & self , stream : & str ) -> Result < ( ) , HotTierError > {
189
- if self . check_stream_hot_tier_exists ( stream) {
190
- let path = self . hot_tier_path . join ( stream) ;
191
- fs:: remove_dir_all ( path) . await ?;
192
- Ok ( ( ) )
193
- } else {
194
- Err ( HotTierError :: HotTierValidationError (
195
- HotTierValidationError :: NotFound ( stream. to_owned ( ) ) ,
196
- ) )
186
+ if !self . check_stream_hot_tier_exists ( stream) {
187
+ return Err ( HotTierValidationError :: NotFound ( stream. to_owned ( ) ) . into ( ) ) ;
197
188
}
189
+ let path = self . hot_tier_path . join ( stream) ;
190
+ fs:: remove_dir_all ( path) . await ?;
191
+
192
+ Ok ( ( ) )
198
193
}
199
194
200
195
///put the hot tier metadata file for the stream
@@ -204,12 +199,26 @@ impl HotTierManager {
204
199
stream : & str ,
205
200
hot_tier : & mut StreamHotTier ,
206
201
) -> Result < ( ) , HotTierError > {
207
- let path = hot_tier_file_path ( self . hot_tier_path , stream) ?;
202
+ let path = self . hot_tier_file_path ( stream) ?;
208
203
let bytes = serde_json:: to_vec ( & hot_tier) ?. into ( ) ;
209
204
self . filesystem . put ( & path, bytes) . await ?;
210
205
Ok ( ( ) )
211
206
}
212
207
208
+ /// get the hot tier file path for the stream
209
+ pub fn hot_tier_file_path (
210
+ & self ,
211
+ stream : & str ,
212
+ ) -> Result < object_store:: path:: Path , HotTierError > {
213
+ let path = self
214
+ . hot_tier_path
215
+ . join ( stream)
216
+ . join ( STREAM_HOT_TIER_FILENAME ) ;
217
+ let path = object_store:: path:: Path :: from_absolute_path ( path) ?;
218
+
219
+ Ok ( path)
220
+ }
221
+
213
222
///schedule the download of the hot tier files from S3 every minute
214
223
pub fn download_from_s3 < ' a > ( & ' a self ) -> Result < ( ) , HotTierError >
215
224
where
@@ -584,7 +593,10 @@ impl HotTierManager {
584
593
fs:: write ( manifest_file. path ( ) , serde_json:: to_vec ( & manifest) ?) . await ?;
585
594
586
595
fs:: remove_dir_all ( path_to_delete. parent ( ) . unwrap ( ) ) . await ?;
587
- delete_empty_directory_hot_tier ( path_to_delete. parent ( ) . unwrap ( ) ) . await ?;
596
+ delete_empty_directory_hot_tier (
597
+ path_to_delete. parent ( ) . unwrap ( ) . to_path_buf ( ) ,
598
+ )
599
+ . await ?;
588
600
589
601
stream_hot_tier. used_size -= file_size;
590
602
stream_hot_tier. available_size += file_size;
@@ -721,48 +733,35 @@ impl HotTierManager {
721
733
}
722
734
}
723
735
724
- /// get the hot tier file path for the stream
725
- pub fn hot_tier_file_path (
726
- root : impl AsRef < std:: path:: Path > ,
727
- stream : & str ,
728
- ) -> Result < object_store:: path:: Path , object_store:: path:: Error > {
729
- let path = root. as_ref ( ) . join ( stream) . join ( STREAM_HOT_TIER_FILENAME ) ;
730
- object_store:: path:: Path :: from_absolute_path ( path)
731
- }
732
-
733
736
struct DiskUtil {
734
737
total_space : u64 ,
735
738
available_space : u64 ,
736
739
used_space : u64 ,
737
740
}
738
741
739
- async fn delete_empty_directory_hot_tier ( path : & Path ) -> io:: Result < ( ) > {
742
+ async fn delete_empty_directory_hot_tier ( path : PathBuf ) -> io:: Result < ( ) > {
740
743
if !path. is_dir ( ) {
741
744
return Ok ( ( ) ) ;
742
745
}
743
- let mut read_dir = fs:: read_dir ( path) . await ?;
744
- let mut subdirs = vec ! [ ] ;
746
+ let mut read_dir = fs:: read_dir ( & path) . await ?;
745
747
748
+ let mut tasks = vec ! [ ] ;
746
749
while let Some ( entry) = read_dir. next_entry ( ) . await ? {
747
750
let entry_path = entry. path ( ) ;
748
751
if entry_path. is_dir ( ) {
749
- subdirs . push ( entry_path) ;
752
+ tasks . push ( delete_empty_directory_hot_tier ( entry_path) ) ;
750
753
}
751
754
}
752
755
753
- let mut tasks = vec ! [ ] ;
754
- for subdir in & subdirs {
755
- tasks. push ( delete_empty_directory_hot_tier ( subdir) ) ;
756
- }
757
756
futures:: stream:: iter ( tasks)
758
757
. buffer_unordered ( 10 )
759
758
. try_collect :: < Vec < _ > > ( )
760
759
. await ?;
761
760
762
761
// Re-check the directory after deleting its subdirectories
763
- let mut read_dir = fs:: read_dir ( path) . await ?;
762
+ let mut read_dir = fs:: read_dir ( & path) . await ?;
764
763
if read_dir. next_entry ( ) . await ?. is_none ( ) {
765
- fs:: remove_dir ( path) . await ?;
764
+ fs:: remove_dir ( & path) . await ?;
766
765
}
767
766
768
767
Ok ( ( ) )
0 commit comments