@@ -15,6 +15,7 @@ use stackable_operator::{
1515 commons:: tls_verification:: { CaCert , TlsServerVerification , TlsVerification } ,
1616 crd:: s3,
1717 k8s_openapi:: api:: core:: v1:: { Volume , VolumeMount } ,
18+ k8s_openapi:: apimachinery:: pkg:: api:: resource:: Quantity ,
1819 schemars:: { self , JsonSchema } ,
1920 time:: Duration ,
2021} ;
@@ -56,7 +57,7 @@ pub struct QueryRetryConfig {
5657
5758 /// Data size of the coordinator's in-memory buffer used to store output of query stages.
5859 #[ serde( skip_serializing_if = "Option::is_none" ) ]
59- pub exchange_deduplication_buffer_size : Option < String > ,
60+ pub exchange_deduplication_buffer_size : Option < Quantity > ,
6061
6162 /// Exchange manager configuration for spooling intermediate data during fault tolerant execution.
6263 /// Optional for Query retry policy, recommended for large result sets.
@@ -85,7 +86,7 @@ pub struct TaskRetryConfig {
8586
8687 /// Data size of the coordinator's in-memory buffer used to store output of query stages.
8788 #[ serde( skip_serializing_if = "Option::is_none" ) ]
88- pub exchange_deduplication_buffer_size : Option < String > ,
89+ pub exchange_deduplication_buffer_size : Option < Quantity > ,
8990
9091 /// Exchange manager configuration for spooling intermediate data during fault tolerant execution.
9192 /// Required for Task retry policy.
@@ -111,7 +112,7 @@ pub struct ExchangeManagerConfig {
111112
112113 /// Max data size of files written by exchange sinks.
113114 #[ serde( skip_serializing_if = "Option::is_none" ) ]
114- pub sink_max_file_size : Option < String > ,
115+ pub sink_max_file_size : Option < Quantity > ,
115116
116117 /// Number of concurrent readers to read from spooling storage. The larger the number of
117118 /// concurrent readers, the larger the read parallelism and memory usage.
@@ -140,7 +141,7 @@ pub enum ExchangeManagerBackend {
140141 Local ( LocalExchangeConfig ) ,
141142}
142143
143- #[ derive( Clone , Debug , Deserialize , Eq , JsonSchema , PartialEq , Serialize ) ]
144+ #[ derive( Clone , Debug , Deserialize , JsonSchema , PartialEq , Serialize ) ]
144145#[ serde( rename_all = "camelCase" ) ]
145146pub struct S3ExchangeConfig {
146147 /// S3 bucket URIs for spooling data (e.g., s3://bucket1,s3://bucket2).
@@ -164,10 +165,10 @@ pub struct S3ExchangeConfig {
164165
165166 /// Part data size for S3 multi-part upload.
166167 #[ serde( skip_serializing_if = "Option::is_none" ) ]
167- pub upload_part_size : Option < String > ,
168+ pub upload_part_size : Option < Quantity > ,
168169}
169170
170- #[ derive( Clone , Debug , Deserialize , Eq , JsonSchema , PartialEq , Serialize ) ]
171+ #[ derive( Clone , Debug , Deserialize , JsonSchema , PartialEq , Serialize ) ]
171172#[ serde( rename_all = "camelCase" ) ]
172173pub struct HdfsExchangeConfig {
173174 /// HDFS URIs for spooling data.
@@ -178,7 +179,7 @@ pub struct HdfsExchangeConfig {
178179
179180 /// Block data size for HDFS storage.
180181 #[ serde( skip_serializing_if = "Option::is_none" ) ]
181- pub block_size : Option < String > ,
182+ pub block_size : Option < Quantity > ,
182183
183184 /// Skip directory scheme validation to support Hadoop-compatible file systems.
184185 #[ serde( skip_serializing_if = "Option::is_none" ) ]
@@ -201,6 +202,12 @@ pub enum Error {
201202
202203 #[ snafu( display( "trino does not support disabling the TLS verification of S3 servers" ) ) ]
203204 S3TlsNoVerificationNotSupported ,
205+
206+ #[ snafu( display( "failed to convert data size for [{field}] to bytes" ) ) ]
207+ QuantityConversion {
208+ source : stackable_operator:: memory:: Error ,
209+ field : & ' static str ,
210+ } ,
204211}
205212
206213/// Fault tolerant execution configuration with external resources resolved
@@ -237,6 +244,21 @@ impl ResolvedFaultTolerantExecutionConfig {
237244 }
238245 }
239246
247+ /// Helper function to insert optional Quantity values after converting to Trino bytes string
248+ fn insert_quantity_if_present (
249+ properties : & mut BTreeMap < String , String > ,
250+ key : & ' static str ,
251+ quantity : Option < & Quantity > ,
252+ ) -> Result < ( ) , Error > {
253+ if let Some ( q) = quantity {
254+ use snafu:: ResultExt ;
255+ let v = crate :: crd:: quantity_to_trino_bytes ( q)
256+ . context ( QuantityConversionSnafu { field : key } ) ?;
257+ properties. insert ( key. to_string ( ) , v) ;
258+ }
259+ Ok ( ( ) )
260+ }
261+
240262 /// Create a resolved fault tolerant execution configuration from the cluster config
241263 pub async fn from_config (
242264 config : & FaultTolerantExecutionConfig ,
@@ -275,11 +297,11 @@ impl ResolvedFaultTolerantExecutionConfig {
275297 "retry-delay-scale-factor" ,
276298 query_config. retry_delay_scale_factor . as_ref ( ) ,
277299 ) ;
278- Self :: insert_if_present (
300+ Self :: insert_quantity_if_present (
279301 & mut config_properties,
280302 "exchange.deduplication-buffer-size" ,
281303 query_config. exchange_deduplication_buffer_size . as_ref ( ) ,
282- ) ;
304+ ) ? ;
283305
284306 ( "QUERY" , query_config. exchange_manager . as_ref ( ) )
285307 }
@@ -311,11 +333,11 @@ impl ResolvedFaultTolerantExecutionConfig {
311333 "retry-delay-scale-factor" ,
312334 task_config. retry_delay_scale_factor . as_ref ( ) ,
313335 ) ;
314- Self :: insert_if_present (
336+ Self :: insert_quantity_if_present (
315337 & mut config_properties,
316338 "exchange.deduplication-buffer-size" ,
317339 task_config. exchange_deduplication_buffer_size . as_ref ( ) ,
318- ) ;
340+ ) ? ;
319341
320342 ( "TASK" , Some ( & task_config. exchange_manager ) )
321343 }
@@ -340,11 +362,11 @@ impl ResolvedFaultTolerantExecutionConfig {
340362 "exchange.sink-buffers-per-partition" ,
341363 exchange_config. sink_buffers_per_partition ,
342364 ) ;
343- Self :: insert_if_present (
365+ Self :: insert_quantity_if_present (
344366 & mut exchange_manager_properties,
345367 "exchange.sink-max-file-size" ,
346368 exchange_config. sink_max_file_size . as_ref ( ) ,
347- ) ;
369+ ) ? ;
348370 Self :: insert_if_present (
349371 & mut exchange_manager_properties,
350372 "exchange.source-concurrent-readers" ,
@@ -378,11 +400,11 @@ impl ResolvedFaultTolerantExecutionConfig {
378400 "exchange.s3.max-error-retries" ,
379401 s3_config. max_error_retries ,
380402 ) ;
381- Self :: insert_if_present (
403+ Self :: insert_quantity_if_present (
382404 & mut exchange_manager_properties,
383405 "exchange.s3.upload.part-size" ,
384406 s3_config. upload_part_size . as_ref ( ) ,
385- ) ;
407+ ) ? ;
386408 }
387409 ExchangeManagerBackend :: Hdfs ( hdfs_config) => {
388410 exchange_manager_properties
@@ -392,11 +414,11 @@ impl ResolvedFaultTolerantExecutionConfig {
392414 hdfs_config. base_directories . join ( "," ) ,
393415 ) ;
394416
395- Self :: insert_if_present (
417+ Self :: insert_quantity_if_present (
396418 & mut exchange_manager_properties,
397419 "exchange.hdfs.block-size" ,
398420 hdfs_config. block_size . as_ref ( ) ,
399- ) ;
421+ ) ? ;
400422 Self :: insert_if_present (
401423 & mut exchange_manager_properties,
402424 "exchange.hdfs.skip-directory-scheme-validation" ,
@@ -562,7 +584,7 @@ mod tests {
562584 retry_initial_delay : Some ( Duration :: from_secs ( 15 ) ) ,
563585 retry_max_delay : Some ( Duration :: from_secs ( 90 ) ) ,
564586 retry_delay_scale_factor : Some ( 3.0 ) ,
565- exchange_deduplication_buffer_size : Some ( "64MB ". to_string ( ) ) ,
587+ exchange_deduplication_buffer_size : Some ( Quantity ( "64Mi ". to_string ( ) ) ) ,
566588 exchange_manager : None ,
567589 } ) ;
568590
@@ -595,7 +617,7 @@ mod tests {
595617 fte_config
596618 . config_properties
597619 . get( "exchange.deduplication-buffer-size" ) ,
598- Some ( & "64MB " . to_string( ) )
620+ Some ( & "67108864B " . to_string( ) )
599621 ) ;
600622 }
601623
@@ -606,12 +628,12 @@ mod tests {
606628 retry_initial_delay : Some ( Duration :: from_secs ( 10 ) ) ,
607629 retry_max_delay : Some ( Duration :: from_secs ( 60 ) ) ,
608630 retry_delay_scale_factor : Some ( 2.0 ) ,
609- exchange_deduplication_buffer_size : Some ( "100MB ". to_string ( ) ) ,
631+ exchange_deduplication_buffer_size : Some ( Quantity ( "100Mi ". to_string ( ) ) ) ,
610632 exchange_manager : Some ( ExchangeManagerConfig {
611633 encryption_enabled : Some ( true ) ,
612634 sink_buffer_pool_min_size : Some ( 10 ) ,
613635 sink_buffers_per_partition : Some ( 2 ) ,
614- sink_max_file_size : Some ( "1GB ". to_string ( ) ) ,
636+ sink_max_file_size : Some ( Quantity ( "1Gi ". to_string ( ) ) ) ,
615637 source_concurrent_readers : Some ( 4 ) ,
616638 backend : ExchangeManagerBackend :: Local ( LocalExchangeConfig {
617639 base_directories : vec ! [ "/tmp/exchange" . to_string( ) ] ,
@@ -662,7 +684,7 @@ mod tests {
662684 fte_config
663685 . config_properties
664686 . get( "exchange.deduplication-buffer-size" ) ,
665- Some ( & "100MB " . to_string( ) )
687+ Some ( & "104857600B " . to_string( ) )
666688 ) ;
667689 assert_eq ! (
668690 fte_config
@@ -684,7 +706,7 @@ mod tests {
684706 encryption_enabled : None ,
685707 sink_buffer_pool_min_size : Some ( 20 ) ,
686708 sink_buffers_per_partition : Some ( 4 ) ,
687- sink_max_file_size : Some ( "2GB ". to_string ( ) ) ,
709+ sink_max_file_size : Some ( Quantity ( "2Gi ". to_string ( ) ) ) ,
688710 source_concurrent_readers : Some ( 8 ) ,
689711 backend : ExchangeManagerBackend :: S3 ( S3ExchangeConfig {
690712 base_directories : vec ! [ "s3://my-bucket/exchange" . to_string( ) ] ,
@@ -694,7 +716,7 @@ mod tests {
694716 iam_role : Some ( "arn:aws:iam::123456789012:role/TrinoRole" . to_string ( ) ) ,
695717 external_id : Some ( "external-id-123" . to_string ( ) ) ,
696718 max_error_retries : Some ( 5 ) ,
697- upload_part_size : Some ( "10MB ". to_string ( ) ) ,
719+ upload_part_size : Some ( Quantity ( "10Mi ". to_string ( ) ) ) ,
698720 } ) ,
699721 config_overrides : std:: collections:: HashMap :: new ( ) ,
700722 } ,
@@ -751,7 +773,7 @@ mod tests {
751773 fte_config
752774 . exchange_manager_properties
753775 . get( "exchange.s3.upload.part-size" ) ,
754- Some ( & "10MB " . to_string( ) )
776+ Some ( & "10485760B " . to_string( ) )
755777 ) ;
756778 assert_eq ! (
757779 fte_config
@@ -769,7 +791,7 @@ mod tests {
769791 fte_config
770792 . exchange_manager_properties
771793 . get( "exchange.sink-max-file-size" ) ,
772- Some ( & "2GB " . to_string( ) )
794+ Some ( & "2147483648B " . to_string( ) )
773795 ) ;
774796 assert_eq ! (
775797 fte_config
@@ -808,7 +830,7 @@ mod tests {
808830 iam_role : None ,
809831 external_id : None ,
810832 max_error_retries : None ,
811- upload_part_size : Some ( "original-value ". to_string ( ) ) ,
833+ upload_part_size : Some ( Quantity ( "10Mi ". to_string ( ) ) ) ,
812834 } ) ,
813835 config_overrides,
814836 } ,
0 commit comments