Skip to content
This repository was archived by the owner on Jan 30, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ jobs:
toolchain: stable
components: rustfmt, clippy
- name: install protoc
uses: arduino/setup-protoc@v3
uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: Swatinem/rust-cache@v2
- name: Run cargo tests
run: cargo test
Expand All @@ -53,7 +55,9 @@ jobs:
toolchain: stable
components: rustfmt, clippy
- name: install protoc
uses: arduino/setup-protoc@v3
uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: Swatinem/rust-cache@v2
- name: Install cargo-sort
uses: baptiste0928/cargo-install@v3
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ miette = { version = "7.6.0", features = ["fancy"] }
rand = "0.9.1"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
streamstore = { version = "0.18.0", features = ["connector"] }
streamstore = { version = "0.19.0", features = ["connector"] }
thiserror = "2.0.12"
tokio = { version = "1.45.1", features = ["full"] }
tokio-stream = { version = "0.1.17", features = ["io-util"] }
Expand Down
23 changes: 16 additions & 7 deletions src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use s2::{
client::Client,
types::{
AccessTokenId, AccessTokenInfo, BasinConfig, BasinInfo, BasinName, CreateBasinRequest,
DeleteBasinRequest, ListAccessTokensRequest, ListAccessTokensResponse, ListBasinsRequest,
ListBasinsResponse, Operation, PermittedOperationGroups, ReconfigureBasinRequest,
ResourceSet, StreamConfig,
DeleteBasinRequest, DeleteOnEmpty, ListAccessTokensRequest, ListAccessTokensResponse,
ListBasinsRequest, ListBasinsResponse, Operation, PermittedOperationGroups,
ReconfigureBasinRequest, ResourceSet, StreamConfig,
},
};

Expand Down Expand Up @@ -77,21 +77,30 @@ impl AccountService {
pub async fn create_basin(
&self,
basin: BasinName,
storage_class: Option<crate::types::StorageClass>,
retention_policy: Option<crate::types::RetentionPolicy>,
configured_stream_config: crate::types::StreamConfig,
create_stream_on_append: bool,
create_stream_on_read: bool,
) -> Result<BasinInfo, ServiceError> {
let mut stream_config = StreamConfig::new();

if let Some(storage_class) = storage_class {
if let Some(storage_class) = configured_stream_config.storage_class {
stream_config = stream_config.with_storage_class(storage_class.into());
}

if let Some(retention_policy) = retention_policy {
if let Some(retention_policy) = configured_stream_config.retention_policy {
stream_config = stream_config.with_retention_policy(retention_policy.into());
}

if let Some(timestamping) = configured_stream_config.timestamping {
stream_config = stream_config.with_timestamping(timestamping.into());
}

if let Some(delete_on_empty_min_age) = configured_stream_config.delete_on_empty {
stream_config = stream_config.with_delete_on_empty(DeleteOnEmpty {
min_age: delete_on_empty_min_age.delete_on_empty_min_age,
});
}

let basin_config = BasinConfig {
default_stream_config: Some(stream_config),
create_stream_on_append,
Expand Down
47 changes: 33 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ use tokio_stream::{
use tracing::trace;
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt};
use types::{
AccessTokenInfo, BasinConfig, Operation, PermittedOperationGroups, ResourceSet,
RetentionPolicy, S2BasinAndMaybeStreamUri, S2BasinAndStreamUri, S2BasinUri, StorageClass,
StreamConfig, TimestampingConfig, TimestampingMode,
AccessTokenInfo, BasinConfig, DeleteOnEmptyConfig, Operation, PermittedOperationGroups,
ResourceSet, RetentionPolicy, S2BasinAndMaybeStreamUri, S2BasinAndStreamUri, S2BasinUri,
StorageClass, StreamConfig, TimestampingConfig, TimestampingMode,
};

mod account;
Expand Down Expand Up @@ -184,6 +184,10 @@ enum Commands {
/// Create stream on read with basin defaults if it doesn't exist.
#[arg(long)]
create_stream_on_read: Option<bool>,

/// Delete-on-empty configuration for the default stream config.
#[arg(long)]
delete_on_empty_min_age: Option<humantime::Duration>,
},

/// Issue an access token.
Expand Down Expand Up @@ -322,6 +326,10 @@ enum Commands {
/// Uncapped timestamps for the stream.
#[arg(long)]
timestamping_uncapped: Option<bool>,

/// Delete-on-empty configuration for the stream.
#[arg(long)]
delete_on_empty_min_age: Option<humantime::Duration>,
},

/// Get the next sequence number that will be assigned by a stream.
Expand Down Expand Up @@ -645,12 +653,14 @@ fn build_basin_reconfig(
timestamping_uncapped: Option<&bool>,
create_stream_on_append: Option<&bool>,
create_stream_on_read: Option<&bool>,
delete_on_empty_min_age: Option<&humantime::Duration>,
) -> (Option<StreamConfig>, Vec<String>) {
let mut mask = Vec::new();
let has_stream_args = storage_class.is_some()
|| retention_policy.is_some()
|| timestamping_mode.is_some()
|| timestamping_uncapped.is_some();
|| timestamping_uncapped.is_some()
|| delete_on_empty_min_age.is_some();

let default_stream_config = if has_stream_args {
let timestamping = if timestamping_mode.is_some() || timestamping_uncapped.is_some() {
Expand All @@ -666,6 +676,9 @@ fn build_basin_reconfig(
storage_class: storage_class.cloned(),
retention_policy: retention_policy.cloned(),
timestamping,
delete_on_empty: delete_on_empty_min_age.map(|d| DeleteOnEmptyConfig {
delete_on_empty_min_age: (*d).into(),
}),
})
} else {
None
Expand All @@ -683,6 +696,9 @@ fn build_basin_reconfig(
if timestamping_uncapped.is_some() {
mask.push("default_stream_config.timestamping.uncapped".to_owned());
}
if delete_on_empty_min_age.is_some() {
mask.push("default_stream_config.delete_on_empty".to_owned());
}
if create_stream_on_append.is_some() {
mask.push("create_stream_on_append".to_owned());
}
Expand All @@ -698,6 +714,7 @@ fn build_stream_reconfig(
retention_policy: Option<&RetentionPolicy>,
timestamping_mode: Option<&TimestampingMode>,
timestamping_uncapped: Option<&bool>,
delete_on_empty_min_age: Option<&humantime::Duration>,
) -> (StreamConfig, Vec<String>) {
let mut mask = Vec::new();

Expand All @@ -714,6 +731,9 @@ fn build_stream_reconfig(
storage_class: storage_class.cloned(),
retention_policy: retention_policy.cloned(),
timestamping,
delete_on_empty: delete_on_empty_min_age.map(|d| DeleteOnEmptyConfig {
delete_on_empty_min_age: (*d).into(),
}),
};

if storage_class.is_some() {
Expand All @@ -728,6 +748,9 @@ fn build_stream_reconfig(
if timestamping_uncapped.is_some() {
mask.push("timestamping.uncapped".to_string());
}
if delete_on_empty_min_age.is_some() {
mask.push("delete_on_empty".to_string());
}

(stream_config, mask)
}
Expand Down Expand Up @@ -946,19 +969,11 @@ async fn run() -> Result<(), S2CliError> {
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.access_token)?;
let account_service = AccountService::new(Client::new(client_config));
let (storage_class, retention_policy) = match &config.default_stream_config {
Some(config) => {
let storage_class = config.storage_class.clone();
let retention_policy = config.retention_policy.clone();
(storage_class, retention_policy)
}
None => (None, None),
};

let BasinInfo { state, .. } = account_service
.create_basin(
basin.into(),
storage_class,
retention_policy,
config.default_stream_config.unwrap_or_default(),
config.create_stream_on_append.unwrap_or_default(),
config.create_stream_on_read.unwrap_or_default(),
)
Expand Down Expand Up @@ -996,6 +1011,7 @@ async fn run() -> Result<(), S2CliError> {
timestamping_uncapped,
create_stream_on_append,
create_stream_on_read,
delete_on_empty_min_age,
} => {
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.access_token)?;
Expand All @@ -1008,6 +1024,7 @@ async fn run() -> Result<(), S2CliError> {
timestamping_uncapped.as_ref(),
create_stream_on_append.as_ref(),
create_stream_on_read.as_ref(),
delete_on_empty_min_age.as_ref(),
);

let basin_config = BasinConfig {
Expand Down Expand Up @@ -1079,6 +1096,7 @@ async fn run() -> Result<(), S2CliError> {
retention_policy,
timestamping_mode,
timestamping_uncapped,
delete_on_empty_min_age,
} => {
let S2BasinAndStreamUri { basin, stream } = uri.uri;
let cfg = config::load_config(&config_path)?;
Expand All @@ -1090,6 +1108,7 @@ async fn run() -> Result<(), S2CliError> {
retention_policy.as_ref(),
timestamping_mode.as_ref(),
timestamping_uncapped.as_ref(),
delete_on_empty_min_age.as_ref(),
);

let config: StreamConfig = BasinService::new(basin_client)
Expand Down
42 changes: 41 additions & 1 deletion src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub struct BasinConfig {
pub create_stream_on_read: Option<bool>,
}

#[derive(Parser, Debug, Clone, Serialize)]
#[derive(Parser, Debug, Default, Clone, Serialize)]
pub struct StreamConfig {
#[arg(long)]
/// Storage class for a stream.
Expand All @@ -136,6 +136,9 @@ pub struct StreamConfig {
#[clap(flatten)]
/// Timestamping configuration.
pub timestamping: Option<TimestampingConfig>,
#[arg(long, help("Example: 1d, 1w, 1y"))]
/// Delete-on-empty configuration.
pub delete_on_empty: Option<DeleteOnEmptyConfig>,
}

#[derive(ValueEnum, Debug, Clone, Serialize)]
Expand Down Expand Up @@ -178,6 +181,37 @@ impl From<&str> for RetentionPolicy {
}
}
}
#[derive(Clone, Debug, Serialize)]
pub struct DeleteOnEmptyConfig {
pub delete_on_empty_min_age: Duration,
}

impl FromStr for DeleteOnEmptyConfig {
type Err = humantime::DurationError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let duration = humantime::parse_duration(s)?;
Ok(Self {
delete_on_empty_min_age: duration,
})
}
}

impl From<DeleteOnEmptyConfig> for s2::types::DeleteOnEmpty {
fn from(value: DeleteOnEmptyConfig) -> Self {
s2::types::DeleteOnEmpty {
min_age: value.delete_on_empty_min_age,
}
}
}

impl From<s2::types::DeleteOnEmpty> for DeleteOnEmptyConfig {
fn from(value: s2::types::DeleteOnEmpty) -> Self {
Self {
delete_on_empty_min_age: value.min_age,
}
}
}

impl From<BasinConfig> for s2::types::BasinConfig {
fn from(config: BasinConfig) -> Self {
Expand All @@ -204,6 +238,8 @@ impl From<StreamConfig> for s2::types::StreamConfig {

let timestamping_config = config.timestamping.map(s2::types::TimestampingConfig::from);

let delete_on_empty = config.delete_on_empty.map(s2::types::DeleteOnEmpty::from);

let mut stream_config = s2::types::StreamConfig::new();
if let Some(storage_class) = storage_class {
stream_config = stream_config.with_storage_class(storage_class);
Expand All @@ -214,6 +250,9 @@ impl From<StreamConfig> for s2::types::StreamConfig {
if let Some(timestamping) = timestamping_config {
stream_config = stream_config.with_timestamping(timestamping);
}
if let Some(delete_on_empty) = delete_on_empty {
stream_config = stream_config.with_delete_on_empty(delete_on_empty);
}
stream_config
}
}
Expand Down Expand Up @@ -306,6 +345,7 @@ impl From<s2::types::StreamConfig> for StreamConfig {
storage_class: config.storage_class.map(Into::into),
retention_policy: config.retention_policy.map(Into::into),
timestamping: config.timestamping.map(Into::into),
delete_on_empty: config.delete_on_empty.map(Into::into),
}
}
}
Expand Down
Loading