Skip to content

Commit 3208f2d

Browse files
feat: delete-on-empty (#163)
Co-authored-by: shikhar <shikhar@s2.dev>
1 parent 30bcfcf commit 3208f2d

File tree

6 files changed

+94
-21
lines changed

6 files changed

+94
-21
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ jobs:
3838
toolchain: stable
3939
components: rustfmt, clippy
4040
- name: install protoc
41-
uses: arduino/setup-protoc@v3
41+
uses: arduino/setup-protoc@v3
42+
with:
43+
repo-token: ${{ secrets.GITHUB_TOKEN }}
4244
- uses: Swatinem/rust-cache@v2
4345
- name: Run cargo tests
4446
run: cargo test
@@ -53,7 +55,9 @@ jobs:
5355
toolchain: stable
5456
components: rustfmt, clippy
5557
- name: install protoc
56-
uses: arduino/setup-protoc@v3
58+
uses: arduino/setup-protoc@v3
59+
with:
60+
repo-token: ${{ secrets.GITHUB_TOKEN }}
5761
- uses: Swatinem/rust-cache@v2
5862
- name: Install cargo-sort
5963
uses: baptiste0928/cargo-install@v3

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ miette = { version = "7.6.0", features = ["fancy"] }
3030
rand = "0.9.1"
3131
serde = { version = "1.0.219", features = ["derive"] }
3232
serde_json = "1.0.140"
33-
streamstore = { version = "0.18.0", features = ["connector"] }
33+
streamstore = { version = "0.19.1", features = ["connector"] }
3434
thiserror = "2.0.12"
3535
tokio = { version = "1.45.1", features = ["full"] }
3636
tokio-stream = { version = "0.1.17", features = ["io-util"] }

src/account.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ impl AccountService {
9595
stream_config = stream_config.with_timestamping(timestamping);
9696
}
9797

98+
if let Some(delete_on_empty) = configured_stream_config.delete_on_empty {
99+
stream_config = stream_config.with_delete_on_empty(delete_on_empty);
100+
}
101+
98102
let basin_config = BasinConfig {
99103
default_stream_config: Some(stream_config),
100104
create_stream_on_append,

src/main.rs

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ use tokio_stream::{
3838
use tracing::trace;
3939
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt};
4040
use types::{
41-
AccessTokenInfo, BasinConfig, Operation, PermittedOperationGroups, ResourceSet,
42-
RetentionPolicy, S2BasinAndMaybeStreamUri, S2BasinAndStreamUri, S2BasinUri, StorageClass,
43-
StreamConfig, TimestampingConfig, TimestampingMode,
41+
AccessTokenInfo, BasinConfig, DeleteOnEmptyConfig, Operation, PermittedOperationGroups,
42+
ResourceSet, RetentionPolicy, S2BasinAndMaybeStreamUri, S2BasinAndStreamUri, S2BasinUri,
43+
StorageClass, StreamConfig, TimestampingConfig, TimestampingMode,
4444
};
4545

4646
mod account;
@@ -161,6 +161,14 @@ enum Commands {
161161
/// Name of the basin to reconfigure.
162162
basin: S2BasinUri,
163163

164+
/// Create stream on append with basin defaults if it doesn't exist.
165+
#[arg(long)]
166+
create_stream_on_append: Option<bool>,
167+
168+
/// Create stream on read with basin defaults if it doesn't exist.
169+
#[arg(long)]
170+
create_stream_on_read: Option<bool>,
171+
164172
/// Storage class for the default stream config.
165173
#[arg(long)]
166174
storage_class: Option<StorageClass>,
@@ -177,13 +185,9 @@ enum Commands {
177185
#[arg(long)]
178186
timestamping_uncapped: Option<bool>,
179187

180-
/// Create stream on append with basin defaults if it doesn't exist.
181-
#[arg(long)]
182-
create_stream_on_append: Option<bool>,
183-
184-
/// Create stream on read with basin defaults if it doesn't exist.
188+
/// Delete-on-empty minimum age threshold for the default stream config.
185189
#[arg(long)]
186-
create_stream_on_read: Option<bool>,
190+
delete_on_empty_min_age: Option<humantime::Duration>,
187191
},
188192

189193
/// Issue an access token.
@@ -322,6 +326,10 @@ enum Commands {
322326
/// Uncapped timestamps for the stream.
323327
#[arg(long)]
324328
timestamping_uncapped: Option<bool>,
329+
330+
/// Delete-on-empty minimum age threshold for the stream.
331+
#[arg(long)]
332+
delete_on_empty_min_age: Option<humantime::Duration>,
325333
},
326334

327335
/// Get the next sequence number that will be assigned by a stream.
@@ -645,12 +653,14 @@ fn build_basin_reconfig(
645653
timestamping_uncapped: Option<&bool>,
646654
create_stream_on_append: Option<&bool>,
647655
create_stream_on_read: Option<&bool>,
656+
delete_on_empty_min_age: Option<&humantime::Duration>,
648657
) -> (StreamConfig, Vec<String>) {
649658
let mut mask = Vec::new();
650659
let has_stream_args = storage_class.is_some()
651660
|| retention_policy.is_some()
652661
|| timestamping_mode.is_some()
653-
|| timestamping_uncapped.is_some();
662+
|| timestamping_uncapped.is_some()
663+
|| delete_on_empty_min_age.is_some();
654664

655665
let default_stream_config = if has_stream_args {
656666
let timestamping = if timestamping_mode.is_some() || timestamping_uncapped.is_some() {
@@ -662,15 +672,26 @@ fn build_basin_reconfig(
662672
None
663673
};
664674

675+
let delete_on_empty = delete_on_empty_min_age.map(|d| DeleteOnEmptyConfig {
676+
delete_on_empty_min_age: (*d).into(),
677+
});
678+
665679
StreamConfig {
666680
storage_class: storage_class.cloned(),
667681
retention_policy: retention_policy.cloned(),
668682
timestamping,
683+
delete_on_empty,
669684
}
670685
} else {
671686
Default::default()
672687
};
673688

689+
if create_stream_on_append.is_some() {
690+
mask.push("create_stream_on_append".to_owned());
691+
}
692+
if create_stream_on_read.is_some() {
693+
mask.push("create_stream_on_read".to_owned());
694+
}
674695
if storage_class.is_some() {
675696
mask.push("default_stream_config.storage_class".to_owned());
676697
}
@@ -683,11 +704,8 @@ fn build_basin_reconfig(
683704
if timestamping_uncapped.is_some() {
684705
mask.push("default_stream_config.timestamping.uncapped".to_owned());
685706
}
686-
if create_stream_on_append.is_some() {
687-
mask.push("create_stream_on_append".to_owned());
688-
}
689-
if create_stream_on_read.is_some() {
690-
mask.push("create_stream_on_read".to_owned());
707+
if delete_on_empty_min_age.is_some() {
708+
mask.push("default_stream_config.delete_on_empty.min_age_secs".to_owned());
691709
}
692710

693711
(default_stream_config, mask)
@@ -698,6 +716,7 @@ fn build_stream_reconfig(
698716
retention_policy: Option<&RetentionPolicy>,
699717
timestamping_mode: Option<&TimestampingMode>,
700718
timestamping_uncapped: Option<&bool>,
719+
delete_on_empty_min_age: Option<&humantime::Duration>,
701720
) -> (StreamConfig, Vec<String>) {
702721
let mut mask = Vec::new();
703722

@@ -710,10 +729,15 @@ fn build_stream_reconfig(
710729
None
711730
};
712731

732+
let delete_on_empty = delete_on_empty_min_age.map(|d| DeleteOnEmptyConfig {
733+
delete_on_empty_min_age: (*d).into(),
734+
});
735+
713736
let stream_config = StreamConfig {
714737
storage_class: storage_class.cloned(),
715738
retention_policy: retention_policy.cloned(),
716739
timestamping,
740+
delete_on_empty,
717741
};
718742

719743
if storage_class.is_some() {
@@ -728,6 +752,9 @@ fn build_stream_reconfig(
728752
if timestamping_uncapped.is_some() {
729753
mask.push("timestamping.uncapped".to_string());
730754
}
755+
if delete_on_empty_min_age.is_some() {
756+
mask.push("delete_on_empty.min_age_secs".to_string());
757+
}
731758

732759
(stream_config, mask)
733760
}
@@ -987,6 +1014,7 @@ async fn run() -> Result<(), S2CliError> {
9871014
timestamping_uncapped,
9881015
create_stream_on_append,
9891016
create_stream_on_read,
1017+
delete_on_empty_min_age,
9901018
} => {
9911019
let cfg = config::load_config(&config_path)?;
9921020
let client_config = client_config(cfg.access_token)?;
@@ -999,6 +1027,7 @@ async fn run() -> Result<(), S2CliError> {
9991027
timestamping_uncapped.as_ref(),
10001028
create_stream_on_append.as_ref(),
10011029
create_stream_on_read.as_ref(),
1030+
delete_on_empty_min_age.as_ref(),
10021031
);
10031032

10041033
let basin_config = BasinConfig {
@@ -1070,6 +1099,7 @@ async fn run() -> Result<(), S2CliError> {
10701099
retention_policy,
10711100
timestamping_mode,
10721101
timestamping_uncapped,
1102+
delete_on_empty_min_age,
10731103
} => {
10741104
let S2BasinAndStreamUri { basin, stream } = uri.uri;
10751105
let cfg = config::load_config(&config_path)?;
@@ -1081,6 +1111,7 @@ async fn run() -> Result<(), S2CliError> {
10811111
retention_policy.as_ref(),
10821112
timestamping_mode.as_ref(),
10831113
timestamping_uncapped.as_ref(),
1114+
delete_on_empty_min_age.as_ref(),
10841115
);
10851116

10861117
let config: StreamConfig = BasinService::new(basin_client)

src/types.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Types for Basin configuration that directly map to s2::types.
22
3-
use clap::{Parser, ValueEnum};
3+
use clap::{Args, Parser, ValueEnum};
44
use s2::types::BasinName;
55
use serde::Serialize;
66
use std::{str::FromStr, time::Duration};
@@ -136,6 +136,9 @@ pub struct StreamConfig {
136136
#[clap(flatten)]
137137
/// Timestamping configuration.
138138
pub timestamping: Option<TimestampingConfig>,
139+
#[clap(flatten)]
140+
/// Delete-on-empty configuration.
141+
pub delete_on_empty: Option<DeleteOnEmptyConfig>,
139142
}
140143

141144
#[derive(ValueEnum, Debug, Clone, Serialize)]
@@ -178,6 +181,29 @@ impl From<&str> for RetentionPolicy {
178181
}
179182
}
180183
}
184+
#[derive(Args, Clone, Debug, Serialize)]
185+
pub struct DeleteOnEmptyConfig {
186+
#[arg(long, value_parser = humantime::parse_duration, required = false)]
187+
/// Minimum age before an empty stream can be deleted.
188+
/// Example: 1d, 1w, 1y
189+
pub delete_on_empty_min_age: Duration,
190+
}
191+
192+
impl From<DeleteOnEmptyConfig> for s2::types::DeleteOnEmptyConfig {
193+
fn from(value: DeleteOnEmptyConfig) -> Self {
194+
s2::types::DeleteOnEmptyConfig {
195+
min_age: value.delete_on_empty_min_age,
196+
}
197+
}
198+
}
199+
200+
impl From<s2::types::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
201+
fn from(value: s2::types::DeleteOnEmptyConfig) -> Self {
202+
Self {
203+
delete_on_empty_min_age: value.min_age,
204+
}
205+
}
206+
}
181207

182208
impl From<BasinConfig> for s2::types::BasinConfig {
183209
fn from(config: BasinConfig) -> Self {
@@ -204,6 +230,10 @@ impl From<StreamConfig> for s2::types::StreamConfig {
204230

205231
let timestamping_config = config.timestamping.map(s2::types::TimestampingConfig::from);
206232

233+
let delete_on_empty = config
234+
.delete_on_empty
235+
.map(s2::types::DeleteOnEmptyConfig::from);
236+
207237
let mut stream_config = s2::types::StreamConfig::new();
208238
if let Some(storage_class) = storage_class {
209239
stream_config = stream_config.with_storage_class(storage_class);
@@ -214,6 +244,9 @@ impl From<StreamConfig> for s2::types::StreamConfig {
214244
if let Some(timestamping) = timestamping_config {
215245
stream_config = stream_config.with_timestamping(timestamping);
216246
}
247+
if let Some(delete_on_empty) = delete_on_empty {
248+
stream_config = stream_config.with_delete_on_empty(delete_on_empty);
249+
}
217250
stream_config
218251
}
219252
}
@@ -309,6 +342,7 @@ impl From<s2::types::StreamConfig> for StreamConfig {
309342
storage_class: config.storage_class.map(Into::into),
310343
retention_policy: config.retention_policy.map(Into::into),
311344
timestamping: config.timestamping.map(Into::into),
345+
delete_on_empty: config.delete_on_empty.map(Into::into),
312346
}
313347
}
314348
}

0 commit comments

Comments
 (0)