@@ -783,6 +783,22 @@ pub struct FsStorageConfig {
783783 default_value = "_data"
784784 ) ]
785785 pub data_path : String ,
786+
787+ /// Percentage of disk space to reserve (0.0-100.0) - for spill usage
788+ #[ clap( skip) ]
789+ #[ serde( default = "FsStorageConfig::default_reserved_space_percentage" ) ]
790+ pub reserved_space_percentage : Option < OrderedFloat < f64 > > ,
791+
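792+ /// Maximum bytes allowed for spill (unset = no limit: the spill conversion falls back to u64::MAX; a literal 0 is forwarded as-is - confirm it is treated as unlimited downstream) - for spill usage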
793+ #[ clap( skip) ]
794+ #[ serde( default ) ]
795+ pub max_bytes : Option < u64 > ,
796+ }
797+
798+ impl FsStorageConfig {
799+ fn default_reserved_space_percentage ( ) -> Option < OrderedFloat < f64 > > {
800+ None // Use None as default, will use system default (30.0) if not specified
801+ }
786802}
787803
788804impl Default for FsStorageConfig {
@@ -795,6 +811,8 @@ impl From<InnerStorageFsConfig> for FsStorageConfig {
795811 fn from ( inner : InnerStorageFsConfig ) -> Self {
796812 Self {
797813 data_path : inner. root ,
814+ reserved_space_percentage : None ,
815+ max_bytes : None ,
798816 }
799817 }
800818}
@@ -3533,6 +3551,17 @@ impl Default for DiskCacheConfig {
35333551#[ derive( Clone , Debug , PartialEq , Eq , Serialize , Deserialize , Args ) ]
35343552#[ serde( default ) ]
35353553pub struct SpillConfig {
3554+ /// Unified spill storage configuration
3555+ /// Auto-detects type based on storage.type field:
3556+ /// - "fs": Local filesystem spill
3557+ /// - "s3": S3 remote spill
3558+ /// - "azblob": Azure blob remote spill
3559+ /// - etc.
3560+ /// If not configured, uses main data storage with _spill prefix (default)
3561+ #[ clap( skip) ]
3562+ pub storage : Option < StorageConfig > ,
3563+
3564+ // Legacy fields for backward compatibility
35363565 /// Path of spill to local disk. disable if it's empty.
35373566 #[ clap( long, value_name = "VALUE" , default_value = "" ) ]
35383567 pub spill_local_disk_path : String ,
@@ -3544,15 +3573,47 @@ pub struct SpillConfig {
35443573 #[ clap( long, value_name = "VALUE" , default_value = "18446744073709551615" ) ]
35453574 /// Allow space in bytes to spill to local disk.
35463575 pub spill_local_disk_max_bytes : u64 ,
3576+ }
35473577
3548- // TODO: We need to fix StorageConfig so that it supports command line injections.
3549- #[ clap( skip) ]
3550- pub storage : Option < StorageConfig > ,
3578+ impl SpillConfig {
3579+ /// Get the spill type based on configuration
3580+ /// Auto-detects from storage configuration
3581+ pub fn get_spill_type ( & self ) -> String {
3582+ // Check new unified storage configuration first (higher priority)
3583+ if let Some ( ref storage) = self . storage {
3584+ match storage. typ . as_str ( ) {
3585+ "fs" => "fs" . to_string ( ) ,
3586+ "s3" => "s3" . to_string ( ) ,
3587+ "azblob" => "azblob" . to_string ( ) ,
3588+ "gcs" => "gcs" . to_string ( ) ,
3589+ "oss" => "oss" . to_string ( ) ,
3590+ "obs" => "obs" . to_string ( ) ,
3591+ "cos" => "cos" . to_string ( ) ,
3592+ "webhdfs" => "webhdfs" . to_string ( ) ,
3593+ _ => "remote" . to_string ( ) , // fallback for other storage types
3594+ }
3595+ } else if !self . spill_local_disk_path . is_empty ( ) {
3596+ // Fall back to legacy configuration
3597+ "fs" . to_string ( ) // legacy local disk maps to fs
3598+ } else {
3599+ "default" . to_string ( )
3600+ }
3601+ }
3602+
3603+ /// Check if this is using legacy local disk configuration
3604+ pub fn is_legacy_local_disk ( & self ) -> bool {
3605+ !self . spill_local_disk_path . is_empty ( )
3606+ }
35513607}
35523608
35533609impl Default for SpillConfig {
35543610 fn default ( ) -> Self {
3555- inner:: SpillConfig :: default ( ) . into ( )
3611+ Self {
3612+ storage : None ,
3613+ spill_local_disk_path : String :: new ( ) ,
3614+ spill_local_disk_reserved_space_percentage : OrderedFloat ( 30.0 ) ,
3615+ spill_local_disk_max_bytes : u64:: MAX ,
3616+ }
35563617 }
35573618}
35583619
@@ -3727,28 +3788,96 @@ mod cache_config_converters {
37273788 spill : SpillConfig ,
37283789 cache : & DiskCacheConfig ,
37293790 ) -> Result < inner:: SpillConfig > {
3730- // Trick for cloud, perhaps we should introduce a new configuration for the local writeable root.
3731- let local_writeable_root = if cache. path != DiskCacheConfig :: default ( ) . path
3732- && spill. spill_local_disk_path . is_empty ( )
3733- {
3734- Some ( cache. path . clone ( ) )
3735- } else {
3736- None
3737- } ;
3738-
3739- let storage_params = spill
3740- . storage
3741- . map ( |storage| {
3742- let storage: InnerStorageConfig = storage. try_into ( ) ?;
3743- Ok :: < _ , ErrorCode > ( storage. params )
3744- } )
3745- . transpose ( ) ?;
3791+ // Determine configuration based on auto-detected spill type
3792+ let spill_type = spill. get_spill_type ( ) ;
3793+ let ( local_writeable_root, path, reserved_disk_ratio, global_bytes_limit, storage_params) =
3794+ match spill_type. as_str ( ) {
3795+ "fs" => {
3796+ if let Some ( ref storage) = spill. storage {
3797+ // Use new filesystem storage configuration (higher priority)
3798+ let fs_path = storage. fs . data_path . clone ( ) ;
3799+ let reserved_ratio = storage
3800+ . fs
3801+ . reserved_space_percentage
3802+ . unwrap_or ( OrderedFloat ( 30.0 ) )
3803+ / 100.0 ;
3804+ let max_bytes = storage. fs . max_bytes . unwrap_or ( u64:: MAX ) ;
3805+
3806+ // Validate fs configuration
3807+ if fs_path. is_empty ( ) {
3808+ return Err ( ErrorCode :: InvalidConfig (
3809+ "FS storage configured but data_path is empty. Either specify a path or remove the fs storage configuration to use default behavior."
3810+ ) ) ;
3811+ } else {
3812+ ( None , fs_path, reserved_ratio, max_bytes, None )
3813+ }
3814+ } else if spill. is_legacy_local_disk ( ) {
3815+ // Fall back to legacy local disk configuration
3816+ (
3817+ None ,
3818+ spill. spill_local_disk_path ,
3819+ spill. spill_local_disk_reserved_space_percentage / 100.0 ,
3820+ spill. spill_local_disk_max_bytes ,
3821+ None ,
3822+ )
3823+ } else {
3824+ return Err ( ErrorCode :: InvalidConfig (
3825+ "FS storage configuration not found" ,
3826+ ) ) ;
3827+ }
3828+ }
3829+ "s3" | "azblob" | "gcs" | "oss" | "obs" | "cos" | "webhdfs" | "remote" => {
3830+ // Use remote storage configuration for all remote storage types
3831+ let storage_params = spill
3832+ . storage
3833+ . map ( |storage| {
3834+ let storage: InnerStorageConfig = storage. try_into ( ) ?;
3835+ Ok :: < _ , ErrorCode > ( storage. params )
3836+ } )
3837+ . transpose ( ) ?;
3838+
3839+ (
3840+ None ,
3841+ String :: new ( ) ,
3842+ OrderedFloat ( 0.3 ) ,
3843+ u64:: MAX ,
3844+ storage_params,
3845+ )
3846+ }
3847+ _ => {
3848+ // Default behavior for "default" type and any unrecognized types
3849+ // Default behavior with backward compatibility
3850+ let local_writeable_root = if cache. path != DiskCacheConfig :: default ( ) . path
3851+ && spill. spill_local_disk_path . is_empty ( )
3852+ {
3853+ Some ( cache. path . clone ( ) )
3854+ } else {
3855+ None
3856+ } ;
3857+
3858+ let storage_params = spill
3859+ . storage
3860+ . map ( |storage| {
3861+ let storage: InnerStorageConfig = storage. try_into ( ) ?;
3862+ Ok :: < _ , ErrorCode > ( storage. params )
3863+ } )
3864+ . transpose ( ) ?;
3865+
3866+ (
3867+ local_writeable_root,
3868+ spill. spill_local_disk_path ,
3869+ spill. spill_local_disk_reserved_space_percentage / 100.0 ,
3870+ spill. spill_local_disk_max_bytes ,
3871+ storage_params,
3872+ )
3873+ }
3874+ } ;
37463875
37473876 Ok ( inner:: SpillConfig {
37483877 local_writeable_root,
3749- path : spill . spill_local_disk_path ,
3750- reserved_disk_ratio : spill . spill_local_disk_reserved_space_percentage / 100.0 ,
3751- global_bytes_limit : spill . spill_local_disk_max_bytes ,
3878+ path,
3879+ reserved_disk_ratio,
3880+ global_bytes_limit,
37523881 storage_params,
37533882 } )
37543883 }
@@ -3764,10 +3893,10 @@ mod cache_config_converters {
37643893 } ) ;
37653894
37663895 Self {
3896+ storage,
37673897 spill_local_disk_path : value. path ,
37683898 spill_local_disk_reserved_space_percentage : value. reserved_disk_ratio * 100.0 ,
37693899 spill_local_disk_max_bytes : value. global_bytes_limit ,
3770- storage,
37713900 }
37723901 }
37733902 }
0 commit comments