Skip to content

Commit fb77da2

Browse files
authored
feature: support Infinite retention (#170)
1 parent 1c25747 commit fb77da2

File tree

4 files changed

+25
-9
lines changed

4 files changed

+25
-9
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ miette = { version = "7.6.0", features = ["fancy"] }
3030
rand = "0.9.1"
3131
serde = { version = "1.0.219", features = ["derive"] }
3232
serde_json = "1.0.140"
33-
streamstore = { version = "0.19.1", features = ["connector"] }
33+
streamstore = { version = "0.20.0", features = ["connector"] }
3434
thiserror = "2.0.12"
3535
tokio = { version = "1.45.1", features = ["full"] }
3636
tokio-stream = { version = "0.1.17", features = ["io-util"] }

src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ enum Commands {
173173
#[arg(long)]
174174
storage_class: Option<StorageClass>,
175175

176-
/// Retention policy for the default stream config (e.g., "1d", "1w", "1y").
176+
/// Retention policy for the default stream config (e.g., "1d", "1w", "1y", "infinite").
177177
#[arg(long)]
178178
retention_policy: Option<RetentionPolicy>,
179179

src/types.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,16 +171,30 @@ pub struct TimestampingConfig {
171171
pub enum RetentionPolicy {
172172
#[allow(dead_code)]
173173
Age(Duration),
174+
Infinite(()),
174175
}
175176

176-
impl From<&str> for RetentionPolicy {
177-
fn from(s: &str) -> Self {
178-
match humantime::parse_duration(s) {
179-
Ok(d) => RetentionPolicy::Age(d),
180-
Err(_) => RetentionPolicy::Age(Duration::from_secs(0)),
177+
impl TryFrom<&str> for RetentionPolicy {
178+
type Error = &'static str;
179+
180+
fn try_from(value: &str) -> Result<Self, Self::Error> {
181+
if value == "infinite" {
182+
return Ok(RetentionPolicy::Infinite(()));
183+
} else if let Ok(d) = humantime::parse_duration(value) {
184+
return Ok(RetentionPolicy::Age(d));
181185
}
186+
Err("invalid retention policy: expected a duration, or 'infinite'")
182187
}
183188
}
189+
190+
impl FromStr for RetentionPolicy {
191+
type Err = &'static str;
192+
193+
fn from_str(s: &str) -> Result<Self, Self::Err> {
194+
RetentionPolicy::try_from(s)
195+
}
196+
}
197+
184198
#[derive(Args, Clone, Debug, Serialize)]
185199
pub struct DeleteOnEmptyConfig {
186200
#[arg(long, value_parser = humantime::parse_duration, required = false)]
@@ -311,6 +325,7 @@ impl From<RetentionPolicy> for s2::types::RetentionPolicy {
311325
fn from(policy: RetentionPolicy) -> Self {
312326
match policy {
313327
RetentionPolicy::Age(d) => s2::types::RetentionPolicy::Age(d),
328+
RetentionPolicy::Infinite(()) => s2::types::RetentionPolicy::Infinite(()),
314329
}
315330
}
316331
}
@@ -319,6 +334,7 @@ impl From<s2::types::RetentionPolicy> for RetentionPolicy {
319334
fn from(policy: s2::types::RetentionPolicy) -> Self {
320335
match policy {
321336
s2::types::RetentionPolicy::Age(d) => RetentionPolicy::Age(d),
337+
s2::types::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite(()),
322338
}
323339
}
324340
}

0 commit comments

Comments
 (0)