@@ -41,7 +41,7 @@ use once_cell::sync::OnceCell;
41
41
use parquet:: errors:: ParquetError ;
42
42
use relative_path:: RelativePathBuf ;
43
43
use std:: time:: Duration ;
44
- use sysinfo:: { Disks , System } ;
44
+ use sysinfo:: Disks ;
45
45
use tokio:: fs:: { self , DirEntry } ;
46
46
use tokio:: io:: AsyncWriteExt ;
47
47
use tokio_stream:: wrappers:: ReadDirStream ;
@@ -128,28 +128,31 @@ impl HotTierManager {
128
128
}
129
129
}
130
130
131
- let ( total_disk_space, available_disk_space, used_disk_space) = get_disk_usage ( ) ;
132
-
133
- if let ( Some ( total_disk_space) , _, Some ( used_disk_space) ) =
134
- ( total_disk_space, available_disk_space, used_disk_space)
135
- {
136
- let ( total_hot_tier_size, total_hot_tier_used_size) =
137
- self . get_hot_tiers_size ( stream) . await ?;
138
- let disk_threshold =
139
- ( CONFIG . parseable . max_disk_usage * total_disk_space as f64 ) / 100.0 ;
140
- let max_allowed_hot_tier_size = disk_threshold
141
- - total_hot_tier_size as f64
142
- - ( used_disk_space as f64
143
- - total_hot_tier_used_size as f64
144
- - existing_hot_tier_used_size as f64 ) ;
145
-
146
- if stream_hot_tier_size as f64 > max_allowed_hot_tier_size {
147
- error ! ( "disk_threshold: {}, used_disk_space: {}, total_hot_tier_used_size: {}, existing_hot_tier_used_size: {}, total_hot_tier_size: {}" ,
148
- bytes_to_human_size( disk_threshold as u64 ) , bytes_to_human_size( used_disk_space) , bytes_to_human_size( total_hot_tier_used_size) , bytes_to_human_size( existing_hot_tier_used_size) , bytes_to_human_size( total_hot_tier_size) ) ;
149
- return Err ( HotTierError :: ObjectStorageError ( ObjectStorageError :: Custom ( format ! (
150
- "{} is the total usable disk space for hot tier, cannot set a bigger value." , bytes_to_human_size( max_allowed_hot_tier_size as u64 )
151
- ) ) ) ) ;
152
- }
131
+ let DiskUtil {
132
+ total_space,
133
+ used_space,
134
+ ..
135
+ } = get_disk_usage ( ) . expect ( "Codepath should only be hit if hottier is enabled" ) ;
136
+
137
+ let ( total_hot_tier_size, total_hot_tier_used_size) =
138
+ self . get_hot_tiers_size ( stream) . await ?;
139
+ let disk_threshold = ( CONFIG . parseable . max_disk_usage * total_space as f64 ) / 100.0 ;
140
+ let max_allowed_hot_tier_size = disk_threshold
141
+ - total_hot_tier_size as f64
142
+ - ( used_space as f64
143
+ - total_hot_tier_used_size as f64
144
+ - existing_hot_tier_used_size as f64 ) ;
145
+
146
+ if stream_hot_tier_size as f64 > max_allowed_hot_tier_size {
147
+ error ! ( "disk_threshold: {}, used_disk_space: {}, total_hot_tier_used_size: {}, existing_hot_tier_used_size: {}, total_hot_tier_size: {}" ,
148
+ bytes_to_human_size( disk_threshold as u64 ) , bytes_to_human_size( used_space) , bytes_to_human_size( total_hot_tier_used_size) , bytes_to_human_size( existing_hot_tier_used_size) , bytes_to_human_size( total_hot_tier_size) ) ;
149
+
150
+ return Err ( HotTierError :: ObjectStorageError (
151
+ ObjectStorageError :: Custom ( format ! (
152
+ "{} is the total usable disk space for hot tier, cannot set a bigger value." ,
153
+ bytes_to_human_size( max_allowed_hot_tier_size as u64 )
154
+ ) ) ,
155
+ ) ) ;
153
156
}
154
157
155
158
Ok ( existing_hot_tier_used_size)
@@ -618,16 +621,17 @@ impl HotTierManager {
618
621
///check if the disk is available to download the parquet file
619
622
/// check if the disk usage is above the threshold
620
623
pub async fn is_disk_available ( & self , size_to_download : u64 ) -> Result < bool , HotTierError > {
621
- let ( total_disk_space, available_disk_space, used_disk_space) = get_disk_usage ( ) ;
622
-
623
- if let ( Some ( total_disk_space) , Some ( available_disk_space) , Some ( used_disk_space) ) =
624
- ( total_disk_space, available_disk_space, used_disk_space)
624
+ if let Some ( DiskUtil {
625
+ total_space,
626
+ available_space,
627
+ used_space,
628
+ } ) = get_disk_usage ( )
625
629
{
626
- if available_disk_space < size_to_download {
630
+ if available_space < size_to_download {
627
631
return Ok ( false ) ;
628
632
}
629
633
630
- if ( ( used_disk_space + size_to_download) as f64 * 100.0 / total_disk_space as f64 )
634
+ if ( ( used_space + size_to_download) as f64 * 100.0 / total_space as f64 )
631
635
> CONFIG . parseable . max_disk_usage
632
636
{
633
637
return Ok ( false ) ;
@@ -716,30 +720,39 @@ pub fn hot_tier_file_path(
716
720
object_store:: path:: Path :: from_absolute_path ( path)
717
721
}
718
722
719
- ///get the disk usage for the hot tier storage path
720
- pub fn get_disk_usage ( ) -> ( Option < u64 > , Option < u64 > , Option < u64 > ) {
721
- let mut sys = System :: new_all ( ) ;
722
- sys . refresh_all ( ) ;
723
- let path = CONFIG . parseable . hot_tier_storage_path . as_ref ( ) . unwrap ( ) ;
723
+ struct DiskUtil {
724
+ total_space : u64 ,
725
+ available_space : u64 ,
726
+ used_space : u64 ,
727
+ }
724
728
729
+ /// Get the disk usage for the hot tier storage path. If we have a three disk paritions
730
+ /// mounted as follows:
731
+ /// 1. /
732
+ /// 2. /home/parseable
733
+ /// 3. /home/example/ignore
734
+ ///
735
+ /// And parseable is running with `P_HOT_TIER_DIR` pointing to a directory in
736
+ /// `/home/parseable`, we should return the usage stats of the disk mounted there.
737
+ fn get_disk_usage ( ) -> Option < DiskUtil > {
738
+ let path = CONFIG . parseable . hot_tier_storage_path . as_ref ( ) ?;
725
739
let mut disks = Disks :: new_with_refreshed_list ( ) ;
740
+ // Order the disk partitions by decreasing length of mount path
726
741
disks. sort_by_key ( |disk| disk. mount_point ( ) . to_str ( ) . unwrap ( ) . len ( ) ) ;
727
742
disks. reverse ( ) ;
728
743
729
744
for disk in disks. iter ( ) {
730
- if path. starts_with ( disk. mount_point ( ) . to_str ( ) . unwrap ( ) ) {
731
- let total_disk_space = disk. total_space ( ) ;
732
- let available_disk_space = disk. available_space ( ) ;
733
- let used_disk_space = total_disk_space - available_disk_space;
734
- return (
735
- Some ( total_disk_space) ,
736
- Some ( available_disk_space) ,
737
- Some ( used_disk_space) ,
738
- ) ;
745
+ // Returns disk utilisation of first matching mount point
746
+ if path. starts_with ( disk. mount_point ( ) ) {
747
+ return Some ( DiskUtil {
748
+ total_space : disk. total_space ( ) ,
749
+ available_space : disk. available_space ( ) ,
750
+ used_space : disk. total_space ( ) - disk. available_space ( ) ,
751
+ } ) ;
739
752
}
740
753
}
741
754
742
- ( None , None , None )
755
+ None
743
756
}
744
757
745
758
async fn delete_empty_directory_hot_tier ( path : & Path ) -> io:: Result < ( ) > {
0 commit comments