Skip to content
This repository was archived by the owner on Jan 30, 2026. It is now read-only.

Commit 04789e4

Browse files
cleanup
1 parent 486aa41 commit 04789e4

File tree

3 files changed

+46
-26
lines changed

3 files changed

+46
-26
lines changed

src/account.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ use s2::{
55
client::Client,
66
types::{
77
AccessTokenId, AccessTokenInfo, BasinConfig, BasinInfo, BasinName, CreateBasinRequest,
8-
DeleteBasinRequest, ListAccessTokensRequest, ListAccessTokensResponse, ListBasinsRequest,
9-
ListBasinsResponse, Operation, PermittedOperationGroups, ReconfigureBasinRequest,
10-
ResourceSet, StreamConfig,
8+
DeleteBasinRequest, DeleteOnEmpty, ListAccessTokensRequest, ListAccessTokensResponse,
9+
ListBasinsRequest, ListBasinsResponse, Operation, PermittedOperationGroups,
10+
ReconfigureBasinRequest, ResourceSet, StreamConfig,
1111
},
1212
};
1313

@@ -77,21 +77,30 @@ impl AccountService {
7777
pub async fn create_basin(
7878
&self,
7979
basin: BasinName,
80-
storage_class: Option<crate::types::StorageClass>,
81-
retention_policy: Option<crate::types::RetentionPolicy>,
80+
configured_stream_config: crate::types::StreamConfig,
8281
create_stream_on_append: bool,
8382
create_stream_on_read: bool,
8483
) -> Result<BasinInfo, ServiceError> {
8584
let mut stream_config = StreamConfig::new();
8685

87-
if let Some(storage_class) = storage_class {
86+
if let Some(storage_class) = configured_stream_config.storage_class {
8887
stream_config = stream_config.with_storage_class(storage_class.into());
8988
}
9089

91-
if let Some(retention_policy) = retention_policy {
90+
if let Some(retention_policy) = configured_stream_config.retention_policy {
9291
stream_config = stream_config.with_retention_policy(retention_policy.into());
9392
}
9493

94+
if let Some(timestamping) = configured_stream_config.timestamping {
95+
stream_config = stream_config.with_timestamping(timestamping.into());
96+
}
97+
98+
if let Some(delete_on_empty_min_age) = configured_stream_config.delete_on_empty_min_age {
99+
stream_config = stream_config.with_delete_on_empty(DeleteOnEmpty {
100+
min_age: *delete_on_empty_min_age,
101+
});
102+
}
103+
95104
let basin_config = BasinConfig {
96105
default_stream_config: Some(stream_config),
97106
create_stream_on_append,

src/main.rs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use tokio_stream::{
3838
use tracing::trace;
3939
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt};
4040
use types::{
41-
AccessTokenInfo, BasinConfig, DeleteOnEmpty, Operation, PermittedOperationGroups, ResourceSet,
41+
AccessTokenInfo, BasinConfig, Operation, PermittedOperationGroups, ResourceSet,
4242
RetentionPolicy, S2BasinAndMaybeStreamUri, S2BasinAndStreamUri, S2BasinUri, StorageClass,
4343
StreamConfig, TimestampingConfig, TimestampingMode,
4444
};
@@ -187,7 +187,7 @@ enum Commands {
187187

188188
/// Delete-on-empty configuration for the default stream config.
189189
#[arg(long)]
190-
delete_on_empty_min_age: Option<DeleteOnEmpty>,
190+
delete_on_empty_min_age: Option<humantime::Duration>,
191191
},
192192

193193
/// Issue an access token.
@@ -329,7 +329,7 @@ enum Commands {
329329

330330
/// Delete-on-empty configuration for the stream.
331331
#[arg(long)]
332-
delete_on_empty_min_age: Option<DeleteOnEmpty>,
332+
delete_on_empty_min_age: Option<humantime::Duration>,
333333
},
334334

335335
/// Get the next sequence number that will be assigned by a stream.
@@ -653,7 +653,7 @@ fn build_basin_reconfig(
653653
timestamping_uncapped: Option<&bool>,
654654
create_stream_on_append: Option<&bool>,
655655
create_stream_on_read: Option<&bool>,
656-
delete_on_empty_min_age: Option<&DeleteOnEmpty>,
656+
delete_on_empty_min_age: Option<&humantime::Duration>,
657657
) -> (Option<StreamConfig>, Vec<String>) {
658658
let mut mask = Vec::new();
659659
let has_stream_args = storage_class.is_some()
@@ -712,7 +712,7 @@ fn build_stream_reconfig(
712712
retention_policy: Option<&RetentionPolicy>,
713713
timestamping_mode: Option<&TimestampingMode>,
714714
timestamping_uncapped: Option<&bool>,
715-
delete_on_empty_min_age: Option<&DeleteOnEmpty>,
715+
delete_on_empty_min_age: Option<&humantime::Duration>,
716716
) -> (StreamConfig, Vec<String>) {
717717
let mut mask = Vec::new();
718718

@@ -965,19 +965,11 @@ async fn run() -> Result<(), S2CliError> {
965965
let cfg = config::load_config(&config_path)?;
966966
let client_config = client_config(cfg.access_token)?;
967967
let account_service = AccountService::new(Client::new(client_config));
968-
let (storage_class, retention_policy) = match &config.default_stream_config {
969-
Some(config) => {
970-
let storage_class = config.storage_class.clone();
971-
let retention_policy = config.retention_policy.clone();
972-
(storage_class, retention_policy)
973-
}
974-
None => (None, None),
975-
};
968+
976969
let BasinInfo { state, .. } = account_service
977970
.create_basin(
978971
basin.into(),
979-
storage_class,
980-
retention_policy,
972+
config.default_stream_config.unwrap_or_default(),
981973
config.create_stream_on_append.unwrap_or_default(),
982974
config.create_stream_on_read.unwrap_or_default(),
983975
)

src/types.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ pub struct BasinConfig {
125125
pub create_stream_on_read: Option<bool>,
126126
}
127127

128-
#[derive(Parser, Debug, Clone, Serialize)]
128+
#[derive(Parser, Debug, Default, Clone, Serialize)]
129129
pub struct StreamConfig {
130130
#[arg(long)]
131131
/// Storage class for a stream.
@@ -138,7 +138,11 @@ pub struct StreamConfig {
138138
pub timestamping: Option<TimestampingConfig>,
139139
#[arg(long, help("Example: 1d, 1w, 1y"))]
140140
/// Delete-on-empty configuration.
141-
pub delete_on_empty_min_age: Option<DeleteOnEmpty>,
141+
#[serde(
142+
skip_serializing_if = "Option::is_none",
143+
serialize_with = "serialize_humantime_duration"
144+
)]
145+
pub delete_on_empty_min_age: Option<humantime::Duration>,
142146
}
143147

144148
#[derive(ValueEnum, Debug, Clone, Serialize)]
@@ -238,7 +242,7 @@ impl From<StreamConfig> for s2::types::StreamConfig {
238242

239243
let delete_on_empty = config
240244
.delete_on_empty_min_age
241-
.map(s2::types::DeleteOnEmpty::from);
245+
.map(|age| s2::types::DeleteOnEmpty { min_age: *age });
242246

243247
let mut stream_config = s2::types::StreamConfig::new();
244248
if let Some(storage_class) = storage_class {
@@ -345,7 +349,9 @@ impl From<s2::types::StreamConfig> for StreamConfig {
345349
storage_class: config.storage_class.map(Into::into),
346350
retention_policy: config.retention_policy.map(Into::into),
347351
timestamping: config.timestamping.map(Into::into),
348-
delete_on_empty_min_age: config.delete_on_empty.map(Into::into),
352+
delete_on_empty_min_age: config
353+
.delete_on_empty
354+
.map(|age| humantime::Duration::from(age.min_age)),
349355
}
350356
}
351357
}
@@ -716,6 +722,19 @@ impl FromStr for Operation {
716722
}
717723
}
718724

725+
fn serialize_humantime_duration<S>(
726+
duration: &Option<humantime::Duration>,
727+
serializer: S,
728+
) -> Result<S::Ok, S::Error>
729+
where
730+
S: serde::Serializer,
731+
{
732+
match duration {
733+
Some(d) => serializer.serialize_str(&format!("{d}")),
734+
None => serializer.serialize_none(),
735+
}
736+
}
737+
719738
#[cfg(test)]
720739
mod tests {
721740
use crate::{error::S2UriParseError, types::S2BasinAndStreamUri};

0 commit comments

Comments
 (0)