Skip to content

Commit 9e587d8

Browse files
committed
feat: use quantities instead of strings
1 parent af39e6b commit 9e587d8

File tree

2 files changed

+58
-27
lines changed

2 files changed

+58
-27
lines changed

rust/operator-binary/src/crd/fault_tolerant_execution.rs

Lines changed: 49 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use stackable_operator::{
1515
commons::tls_verification::{CaCert, TlsServerVerification, TlsVerification},
1616
crd::s3,
1717
k8s_openapi::api::core::v1::{Volume, VolumeMount},
18+
k8s_openapi::apimachinery::pkg::api::resource::Quantity,
1819
schemars::{self, JsonSchema},
1920
time::Duration,
2021
};
@@ -56,7 +57,7 @@ pub struct QueryRetryConfig {
5657

5758
/// Data size of the coordinator's in-memory buffer used to store output of query stages.
5859
#[serde(skip_serializing_if = "Option::is_none")]
59-
pub exchange_deduplication_buffer_size: Option<String>,
60+
pub exchange_deduplication_buffer_size: Option<Quantity>,
6061

6162
/// Exchange manager configuration for spooling intermediate data during fault tolerant execution.
6263
/// Optional for Query retry policy, recommended for large result sets.
@@ -85,7 +86,7 @@ pub struct TaskRetryConfig {
8586

8687
/// Data size of the coordinator's in-memory buffer used to store output of query stages.
8788
#[serde(skip_serializing_if = "Option::is_none")]
88-
pub exchange_deduplication_buffer_size: Option<String>,
89+
pub exchange_deduplication_buffer_size: Option<Quantity>,
8990

9091
/// Exchange manager configuration for spooling intermediate data during fault tolerant execution.
9192
/// Required for Task retry policy.
@@ -111,7 +112,7 @@ pub struct ExchangeManagerConfig {
111112

112113
/// Max data size of files written by exchange sinks.
113114
#[serde(skip_serializing_if = "Option::is_none")]
114-
pub sink_max_file_size: Option<String>,
115+
pub sink_max_file_size: Option<Quantity>,
115116

116117
/// Number of concurrent readers to read from spooling storage. The larger the number of
117118
/// concurrent readers, the larger the read parallelism and memory usage.
@@ -140,7 +141,7 @@ pub enum ExchangeManagerBackend {
140141
Local(LocalExchangeConfig),
141142
}
142143

143-
#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
144+
#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)]
144145
#[serde(rename_all = "camelCase")]
145146
pub struct S3ExchangeConfig {
146147
/// S3 bucket URIs for spooling data (e.g., s3://bucket1,s3://bucket2).
@@ -164,10 +165,10 @@ pub struct S3ExchangeConfig {
164165

165166
/// Part data size for S3 multi-part upload.
166167
#[serde(skip_serializing_if = "Option::is_none")]
167-
pub upload_part_size: Option<String>,
168+
pub upload_part_size: Option<Quantity>,
168169
}
169170

170-
#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
171+
#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)]
171172
#[serde(rename_all = "camelCase")]
172173
pub struct HdfsExchangeConfig {
173174
/// HDFS URIs for spooling data.
@@ -178,7 +179,7 @@ pub struct HdfsExchangeConfig {
178179

179180
/// Block data size for HDFS storage.
180181
#[serde(skip_serializing_if = "Option::is_none")]
181-
pub block_size: Option<String>,
182+
pub block_size: Option<Quantity>,
182183

183184
/// Skip directory scheme validation to support Hadoop-compatible file systems.
184185
#[serde(skip_serializing_if = "Option::is_none")]
@@ -201,6 +202,12 @@ pub enum Error {
201202

202203
#[snafu(display("trino does not support disabling the TLS verification of S3 servers"))]
203204
S3TlsNoVerificationNotSupported,
205+
206+
#[snafu(display("failed to convert data size for [{field}] to bytes"))]
207+
QuantityConversion {
208+
source: stackable_operator::memory::Error,
209+
field: &'static str,
210+
},
204211
}
205212

206213
/// Fault tolerant execution configuration with external resources resolved
@@ -237,6 +244,21 @@ impl ResolvedFaultTolerantExecutionConfig {
237244
}
238245
}
239246

247+
/// Helper function to insert optional Quantity values after converting to Trino bytes string
248+
fn insert_quantity_if_present(
249+
properties: &mut BTreeMap<String, String>,
250+
key: &'static str,
251+
quantity: Option<&Quantity>,
252+
) -> Result<(), Error> {
253+
if let Some(q) = quantity {
254+
use snafu::ResultExt;
255+
let v = crate::crd::quantity_to_trino_bytes(q)
256+
.context(QuantityConversionSnafu { field: key })?;
257+
properties.insert(key.to_string(), v);
258+
}
259+
Ok(())
260+
}
261+
240262
/// Create a resolved fault tolerant execution configuration from the cluster config
241263
pub async fn from_config(
242264
config: &FaultTolerantExecutionConfig,
@@ -275,11 +297,11 @@ impl ResolvedFaultTolerantExecutionConfig {
275297
"retry-delay-scale-factor",
276298
query_config.retry_delay_scale_factor.as_ref(),
277299
);
278-
Self::insert_if_present(
300+
Self::insert_quantity_if_present(
279301
&mut config_properties,
280302
"exchange.deduplication-buffer-size",
281303
query_config.exchange_deduplication_buffer_size.as_ref(),
282-
);
304+
)?;
283305

284306
("QUERY", query_config.exchange_manager.as_ref())
285307
}
@@ -311,11 +333,11 @@ impl ResolvedFaultTolerantExecutionConfig {
311333
"retry-delay-scale-factor",
312334
task_config.retry_delay_scale_factor.as_ref(),
313335
);
314-
Self::insert_if_present(
336+
Self::insert_quantity_if_present(
315337
&mut config_properties,
316338
"exchange.deduplication-buffer-size",
317339
task_config.exchange_deduplication_buffer_size.as_ref(),
318-
);
340+
)?;
319341

320342
("TASK", Some(&task_config.exchange_manager))
321343
}
@@ -340,11 +362,11 @@ impl ResolvedFaultTolerantExecutionConfig {
340362
"exchange.sink-buffers-per-partition",
341363
exchange_config.sink_buffers_per_partition,
342364
);
343-
Self::insert_if_present(
365+
Self::insert_quantity_if_present(
344366
&mut exchange_manager_properties,
345367
"exchange.sink-max-file-size",
346368
exchange_config.sink_max_file_size.as_ref(),
347-
);
369+
)?;
348370
Self::insert_if_present(
349371
&mut exchange_manager_properties,
350372
"exchange.source-concurrent-readers",
@@ -378,11 +400,11 @@ impl ResolvedFaultTolerantExecutionConfig {
378400
"exchange.s3.max-error-retries",
379401
s3_config.max_error_retries,
380402
);
381-
Self::insert_if_present(
403+
Self::insert_quantity_if_present(
382404
&mut exchange_manager_properties,
383405
"exchange.s3.upload.part-size",
384406
s3_config.upload_part_size.as_ref(),
385-
);
407+
)?;
386408
}
387409
ExchangeManagerBackend::Hdfs(hdfs_config) => {
388410
exchange_manager_properties
@@ -392,11 +414,11 @@ impl ResolvedFaultTolerantExecutionConfig {
392414
hdfs_config.base_directories.join(","),
393415
);
394416

395-
Self::insert_if_present(
417+
Self::insert_quantity_if_present(
396418
&mut exchange_manager_properties,
397419
"exchange.hdfs.block-size",
398420
hdfs_config.block_size.as_ref(),
399-
);
421+
)?;
400422
Self::insert_if_present(
401423
&mut exchange_manager_properties,
402424
"exchange.hdfs.skip-directory-scheme-validation",
@@ -562,7 +584,7 @@ mod tests {
562584
retry_initial_delay: Some(Duration::from_secs(15)),
563585
retry_max_delay: Some(Duration::from_secs(90)),
564586
retry_delay_scale_factor: Some(3.0),
565-
exchange_deduplication_buffer_size: Some("64MB".to_string()),
587+
exchange_deduplication_buffer_size: Some(Quantity("64Mi".to_string())),
566588
exchange_manager: None,
567589
});
568590

@@ -595,7 +617,7 @@ mod tests {
595617
fte_config
596618
.config_properties
597619
.get("exchange.deduplication-buffer-size"),
598-
Some(&"64MB".to_string())
620+
Some(&"67108864B".to_string())
599621
);
600622
}
601623

@@ -606,12 +628,12 @@ mod tests {
606628
retry_initial_delay: Some(Duration::from_secs(10)),
607629
retry_max_delay: Some(Duration::from_secs(60)),
608630
retry_delay_scale_factor: Some(2.0),
609-
exchange_deduplication_buffer_size: Some("100MB".to_string()),
631+
exchange_deduplication_buffer_size: Some(Quantity("100Mi".to_string())),
610632
exchange_manager: Some(ExchangeManagerConfig {
611633
encryption_enabled: Some(true),
612634
sink_buffer_pool_min_size: Some(10),
613635
sink_buffers_per_partition: Some(2),
614-
sink_max_file_size: Some("1GB".to_string()),
636+
sink_max_file_size: Some(Quantity("1Gi".to_string())),
615637
source_concurrent_readers: Some(4),
616638
backend: ExchangeManagerBackend::Local(LocalExchangeConfig {
617639
base_directories: vec!["/tmp/exchange".to_string()],
@@ -662,7 +684,7 @@ mod tests {
662684
fte_config
663685
.config_properties
664686
.get("exchange.deduplication-buffer-size"),
665-
Some(&"100MB".to_string())
687+
Some(&"104857600B".to_string())
666688
);
667689
assert_eq!(
668690
fte_config
@@ -684,7 +706,7 @@ mod tests {
684706
encryption_enabled: None,
685707
sink_buffer_pool_min_size: Some(20),
686708
sink_buffers_per_partition: Some(4),
687-
sink_max_file_size: Some("2GB".to_string()),
709+
sink_max_file_size: Some(Quantity("2Gi".to_string())),
688710
source_concurrent_readers: Some(8),
689711
backend: ExchangeManagerBackend::S3(S3ExchangeConfig {
690712
base_directories: vec!["s3://my-bucket/exchange".to_string()],
@@ -694,7 +716,7 @@ mod tests {
694716
iam_role: Some("arn:aws:iam::123456789012:role/TrinoRole".to_string()),
695717
external_id: Some("external-id-123".to_string()),
696718
max_error_retries: Some(5),
697-
upload_part_size: Some("10MB".to_string()),
719+
upload_part_size: Some(Quantity("10Mi".to_string())),
698720
}),
699721
config_overrides: std::collections::HashMap::new(),
700722
},
@@ -751,7 +773,7 @@ mod tests {
751773
fte_config
752774
.exchange_manager_properties
753775
.get("exchange.s3.upload.part-size"),
754-
Some(&"10MB".to_string())
776+
Some(&"10485760B".to_string())
755777
);
756778
assert_eq!(
757779
fte_config
@@ -769,7 +791,7 @@ mod tests {
769791
fte_config
770792
.exchange_manager_properties
771793
.get("exchange.sink-max-file-size"),
772-
Some(&"2GB".to_string())
794+
Some(&"2147483648B".to_string())
773795
);
774796
assert_eq!(
775797
fte_config
@@ -808,7 +830,7 @@ mod tests {
808830
iam_role: None,
809831
external_id: None,
810832
max_error_retries: None,
811-
upload_part_size: Some("original-value".to_string()),
833+
upload_part_size: Some(Quantity("10Mi".to_string())),
812834
}),
813835
config_overrides,
814836
},

rust/operator-binary/src/crd/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,15 @@ pub const WORKER_SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_secs(30);
137137
/// Safety puffer to guarantee the graceful shutdown works every time.
138138
pub const WORKER_GRACEFUL_SHUTDOWN_SAFETY_OVERHEAD: Duration = Duration::from_secs(10);
139139

140+
/// Convert a Kubernetes `Quantity` to a Trino property string in bytes, e.g. `"65536B"`.
141+
pub(crate) fn quantity_to_trino_bytes(
142+
q: &Quantity,
143+
) -> Result<String, stackable_operator::memory::Error> {
144+
let in_mebi = MemoryQuantity::try_from(q)?.scale_to(BinaryMultiple::Mebi);
145+
let bytes = (in_mebi.value * 1024.0 * 1024.0).round() as u64;
146+
Ok(format!("{bytes}B"))
147+
}
148+
140149
#[derive(Snafu, Debug)]
141150
pub enum Error {
142151
#[snafu(display("object has no namespace associated"))]

0 commit comments

Comments
 (0)