Skip to content
This repository was archived by the owner on Jan 30, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ jobs:
toolchain: stable
components: rustfmt, clippy
- name: install protoc
uses: arduino/setup-protoc@v3
uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: Swatinem/rust-cache@v2
- name: Run cargo tests
run: cargo test
Expand All @@ -53,7 +55,9 @@ jobs:
toolchain: stable
components: rustfmt, clippy
- name: install protoc
uses: arduino/setup-protoc@v3
uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: Swatinem/rust-cache@v2
- name: Install cargo-sort
uses: baptiste0928/cargo-install@v3
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ miette = { version = "7.6.0", features = ["fancy"] }
rand = "0.9.1"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
streamstore = { version = "0.18.0", features = ["connector"] }
streamstore = { version = "0.19.0", features = ["connector"] }
thiserror = "2.0.12"
tokio = { version = "1.45.1", features = ["full"] }
tokio-stream = { version = "0.1.17", features = ["io-util"] }
Expand Down
14 changes: 13 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use tokio_stream::{
use tracing::trace;
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt};
use types::{
AccessTokenInfo, BasinConfig, Operation, PermittedOperationGroups, ResourceSet,
AccessTokenInfo, BasinConfig, DeleteOnEmpty, Operation, PermittedOperationGroups, ResourceSet,
RetentionPolicy, S2BasinAndMaybeStreamUri, S2BasinAndStreamUri, S2BasinUri, StorageClass,
StreamConfig, TimestampingConfig, TimestampingMode,
};
Expand Down Expand Up @@ -322,6 +322,10 @@ enum Commands {
/// Uncapped timestamps for the stream.
#[arg(long)]
timestamping_uncapped: Option<bool>,

/// Delete-on-empty configuration for the stream.
#[arg(long)]
delete_on_empty: Option<DeleteOnEmpty>,
},

/// Get the next sequence number that will be assigned by a stream.
Expand Down Expand Up @@ -666,6 +670,7 @@ fn build_basin_reconfig(
storage_class: storage_class.cloned(),
retention_policy: retention_policy.cloned(),
timestamping,
delete_on_empty: None,
})
} else {
None
Expand Down Expand Up @@ -698,6 +703,7 @@ fn build_stream_reconfig(
retention_policy: Option<&RetentionPolicy>,
timestamping_mode: Option<&TimestampingMode>,
timestamping_uncapped: Option<&bool>,
delete_on_empty: Option<&DeleteOnEmpty>,
) -> (StreamConfig, Vec<String>) {
let mut mask = Vec::new();

Expand All @@ -714,6 +720,7 @@ fn build_stream_reconfig(
storage_class: storage_class.cloned(),
retention_policy: retention_policy.cloned(),
timestamping,
delete_on_empty: delete_on_empty.cloned(),
};

if storage_class.is_some() {
Expand All @@ -728,6 +735,9 @@ fn build_stream_reconfig(
if timestamping_uncapped.is_some() {
mask.push("timestamping.uncapped".to_string());
}
if delete_on_empty.is_some() {
mask.push("delete_on_empty".to_string());
}

(stream_config, mask)
}
Expand Down Expand Up @@ -1079,6 +1089,7 @@ async fn run() -> Result<(), S2CliError> {
retention_policy,
timestamping_mode,
timestamping_uncapped,
delete_on_empty,
} => {
let S2BasinAndStreamUri { basin, stream } = uri.uri;
let cfg = config::load_config(&config_path)?;
Expand All @@ -1090,6 +1101,7 @@ async fn run() -> Result<(), S2CliError> {
retention_policy.as_ref(),
timestamping_mode.as_ref(),
timestamping_uncapped.as_ref(),
delete_on_empty.as_ref(),
);

let config: StreamConfig = BasinService::new(basin_client)
Expand Down
40 changes: 40 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ pub struct StreamConfig {
#[clap(flatten)]
/// Timestamping configuration.
pub timestamping: Option<TimestampingConfig>,

/// Delete-on-empty configuration.
#[arg(long)]
pub delete_on_empty: Option<DeleteOnEmpty>,
}

#[derive(ValueEnum, Debug, Clone, Serialize)]
Expand Down Expand Up @@ -179,6 +183,36 @@ impl From<&str> for RetentionPolicy {
}
}

#[derive(Clone, Debug, Serialize)]
pub struct DeleteOnEmpty {
pub min_age: Duration,
}

impl FromStr for DeleteOnEmpty {
type Err = humantime::DurationError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let duration = humantime::parse_duration(s)?;
Ok(Self { min_age: duration })
}
}

impl From<DeleteOnEmpty> for s2::types::DeleteOnEmpty {
fn from(value: DeleteOnEmpty) -> Self {
s2::types::DeleteOnEmpty {
min_age: value.min_age,
}
}
}

impl From<s2::types::DeleteOnEmpty> for DeleteOnEmpty {
fn from(value: s2::types::DeleteOnEmpty) -> Self {
Self {
min_age: value.min_age,
}
}
}

impl From<BasinConfig> for s2::types::BasinConfig {
fn from(config: BasinConfig) -> Self {
let BasinConfig {
Expand All @@ -204,6 +238,8 @@ impl From<StreamConfig> for s2::types::StreamConfig {

let timestamping_config = config.timestamping.map(s2::types::TimestampingConfig::from);

let delete_on_empty = config.delete_on_empty.map(s2::types::DeleteOnEmpty::from);

let mut stream_config = s2::types::StreamConfig::new();
if let Some(storage_class) = storage_class {
stream_config = stream_config.with_storage_class(storage_class);
Expand All @@ -214,6 +250,9 @@ impl From<StreamConfig> for s2::types::StreamConfig {
if let Some(timestamping) = timestamping_config {
stream_config = stream_config.with_timestamping(timestamping);
}
if let Some(delete_on_empty) = delete_on_empty {
stream_config = stream_config.with_delete_on_empty(delete_on_empty);
}
stream_config
}
}
Expand Down Expand Up @@ -306,6 +345,7 @@ impl From<s2::types::StreamConfig> for StreamConfig {
storage_class: config.storage_class.map(Into::into),
retention_policy: config.retention_policy.map(Into::into),
timestamping: config.timestamping.map(Into::into),
delete_on_empty: config.delete_on_empty.map(Into::into),
}
}
}
Expand Down
Loading