Skip to content

Commit 836db71

Browse files
authored
feat(api): align basin info with stream info (#338)
Align basin lifecycle metadata with stream info while keeping the deprecated basin `state` field as a compatibility-only JSON field. - add `created_at` and `deleted_at` to basin info in the common, API, SDK, and lite layers - derive basin deletion state from `deleted_at` and stop publishing `state` in OpenAPI - update CLI and TUI basin presentation to follow the stream info patterns - tolerate missing basin `created_at` during deserialize by defaulting it to `now_utc()`
1 parent a0c52b6 commit 836db71

File tree

13 files changed

+204
-130
lines changed

13 files changed

+204
-130
lines changed

api/src/v1/basin.rs

Lines changed: 68 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use s2_common::types::{
33
basin::{BasinName, BasinNamePrefix, BasinNameStartAfter},
44
};
55
use serde::{Deserialize, Serialize};
6+
use time::OffsetDateTime;
67

78
use super::config::{BasinConfig, BasinReconfiguration};
89

@@ -41,29 +42,92 @@ pub struct ListBasinsResponse {
4142
}
4243

4344
#[rustfmt::skip]
44-
#[derive(Debug, Clone, Serialize, Deserialize)]
45+
#[derive(Debug, Clone, Serialize)]
4546
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
4647
pub struct BasinInfo {
4748
/// Basin name.
4849
pub name: BasinName,
4950
/// Basin scope.
5051
pub scope: Option<BasinScope>,
51-
/// Basin state.
52+
/// Creation time in RFC 3339 format.
53+
#[serde(with = "time::serde::rfc3339")]
54+
pub created_at: OffsetDateTime,
55+
/// Deletion time in RFC 3339 format, if the basin is being deleted.
56+
#[serde(default, with = "time::serde::rfc3339::option")]
57+
pub deleted_at: Option<OffsetDateTime>,
58+
/// Deprecated basin state inferred from `deleted_at`.
59+
#[cfg_attr(feature = "utoipa", schema(ignore))]
5260
pub state: BasinState,
5361
}
5462

5563
impl From<types::basin::BasinInfo> for BasinInfo {
5664
fn from(value: types::basin::BasinInfo) -> Self {
57-
let types::basin::BasinInfo { name, scope, state } = value;
65+
let types::basin::BasinInfo {
66+
name,
67+
scope,
68+
created_at,
69+
deleted_at,
70+
} = value;
5871

5972
Self {
6073
name,
6174
scope: scope.map(Into::into),
62-
state: state.into(),
75+
created_at,
76+
deleted_at,
77+
state: basin_state_for_deleted_at(deleted_at.as_ref()),
6378
}
6479
}
6580
}
6681

82+
fn basin_state_for_deleted_at(deleted_at: Option<&OffsetDateTime>) -> BasinState {
83+
if deleted_at.is_some() {
84+
BasinState::Deleting
85+
} else {
86+
BasinState::Active
87+
}
88+
}
89+
90+
#[derive(Deserialize)]
91+
struct BasinInfoSerde {
92+
name: BasinName,
93+
scope: Option<BasinScope>,
94+
#[serde(default, with = "time::serde::rfc3339::option")]
95+
created_at: Option<OffsetDateTime>,
96+
#[serde(default, with = "time::serde::rfc3339::option")]
97+
deleted_at: Option<OffsetDateTime>,
98+
state: Option<BasinState>,
99+
}
100+
101+
impl<'de> Deserialize<'de> for BasinInfo {
102+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
103+
where
104+
D: serde::Deserializer<'de>,
105+
{
106+
let BasinInfoSerde {
107+
name,
108+
scope,
109+
created_at,
110+
deleted_at,
111+
state,
112+
} = BasinInfoSerde::deserialize(deserializer)?;
113+
let created_at = created_at.unwrap_or_else(OffsetDateTime::now_utc);
114+
let deleted_at = match (deleted_at, state) {
115+
(Some(deleted_at), _) => Some(deleted_at),
116+
(None, Some(BasinState::Deleting)) => Some(OffsetDateTime::now_utc()),
117+
(None, _) => None,
118+
};
119+
let state = basin_state_for_deleted_at(deleted_at.as_ref());
120+
121+
Ok(Self {
122+
name,
123+
scope,
124+
created_at,
125+
deleted_at,
126+
state,
127+
})
128+
}
129+
}
130+
67131
#[rustfmt::skip]
68132
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
69133
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
@@ -100,15 +164,6 @@ pub enum BasinState {
100164
Deleting,
101165
}
102166

103-
impl From<types::basin::BasinState> for BasinState {
104-
fn from(value: types::basin::BasinState) -> Self {
105-
match value {
106-
types::basin::BasinState::Active => Self::Active,
107-
types::basin::BasinState::Deleting => Self::Deleting,
108-
}
109-
}
110-
}
111-
112167
#[rustfmt::skip]
113168
#[derive(Debug, Clone, Serialize, Deserialize)]
114169
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]

api/src/v1/stream/sse.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{str::FromStr, time::Duration};
1+
use std::str::FromStr;
22

33
use s2_common::{http::ParseableHeader, types};
44
use serde::Serialize;
@@ -126,7 +126,8 @@ pub enum ReadEvent {
126126
},
127127
}
128128

129-
fn elapsed_since_epoch() -> Duration {
129+
#[cfg(feature = "axum")]
130+
fn elapsed_since_epoch() -> std::time::Duration {
130131
std::time::SystemTime::now()
131132
.duration_since(std::time::SystemTime::UNIX_EPOCH)
132133
.expect("healthy clock")

cli/src/main.rs

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ use record_format::{
3232
use s2_sdk::{
3333
S2,
3434
types::{
35-
AppendRetryPolicy, BasinState, CreateStreamInput, DeleteOnEmptyConfig, DeleteStreamInput,
36-
MeteredBytes, Metric, RetentionPolicy, RetryConfig, StreamConfig as SdkStreamConfig,
37-
StreamName, TimestampingConfig, TimestampingMode,
35+
AppendRetryPolicy, CreateStreamInput, DeleteOnEmptyConfig, DeleteStreamInput, MeteredBytes,
36+
Metric, RetentionPolicy, RetryConfig, StreamConfig as SdkStreamConfig, StreamName,
37+
TimestampingConfig, TimestampingMode,
3838
},
3939
};
4040
use strum::VariantNames;
@@ -196,11 +196,10 @@ async fn run() -> Result<(), CliError> {
196196

197197
let (streams, _) = ops::list_streams(&s2, list_streams_args).await?;
198198
for stream_info in streams {
199-
println!(
200-
"s2://{}/{} {}",
201-
basin,
202-
stream_info.name,
203-
stream_info.created_at.to_string().green(),
199+
print_listing_with_created_at(
200+
format!("s2://{}/{}", basin, stream_info.name),
201+
stream_info.created_at.to_string(),
202+
stream_info.deleted_at.is_some(),
204203
);
205204
}
206205
} else {
@@ -224,34 +223,21 @@ async fn run() -> Result<(), CliError> {
224223

225224
let (basins, _) = ops::list_basins(&s2, list_basins_args).await?;
226225
for basin_info in basins {
227-
println!(
228-
"{} {}",
229-
basin_info.name,
230-
format_basin_state(basin_info.state)
231-
);
226+
print_listing_uri(basin_info.name.to_string(), basin_info.deleted_at.is_some());
232227
}
233228
}
234229
}
235230

236231
Command::ListBasins(args) => {
237232
let (basins, _) = ops::list_basins(&s2, args).await?;
238233
for basin_info in basins {
239-
println!(
240-
"{} {}",
241-
basin_info.name,
242-
format_basin_state(basin_info.state)
243-
);
234+
print_listing_uri(basin_info.name.to_string(), basin_info.deleted_at.is_some());
244235
}
245236
}
246237

247238
Command::CreateBasin(args) => {
248-
let info = ops::create_basin(&s2, args).await?;
249-
250-
let message = match info.state {
251-
BasinState::Active => "✓ Basin created".green().bold(),
252-
BasinState::Deleting => "Basin is being deleted".red().bold(),
253-
};
254-
eprintln!("{message}");
239+
let _info = ops::create_basin(&s2, args).await?;
240+
eprintln!("{}", "✓ Basin created".green().bold());
255241
}
256242

257243
Command::DeleteBasin { basin } => {
@@ -311,7 +297,10 @@ async fn run() -> Result<(), CliError> {
311297
let basin_name = args.uri.basin.clone();
312298
let (streams, _) = ops::list_streams(&s2, args).await?;
313299
for stream_info in streams {
314-
println!("s2://{}/{}", basin_name, stream_info.name);
300+
print_listing_uri(
301+
format!("s2://{}/{}", basin_name, stream_info.name),
302+
stream_info.deleted_at.is_some(),
303+
);
315304
}
316305
}
317306

@@ -623,15 +612,40 @@ async fn run() -> Result<(), CliError> {
623612
result.map_err(|err| err.with_token_source(token_source))
624613
}
625614

626-
fn format_basin_state(state: BasinState) -> colored::ColoredString {
627-
match state {
628-
BasinState::Active => "active".green(),
629-
BasinState::Deleting => "deleting".red(),
615+
fn format_position(seq_num: u64, timestamp: u64) -> String {
616+
format!("{seq_num} @ {timestamp}")
617+
}
618+
619+
fn print_listing_uri(uri: String, is_deleting: bool) {
620+
let uri = format_listing_uri(uri, is_deleting);
621+
if is_deleting {
622+
println!("{} {}", uri, deletion_marker());
623+
} else {
624+
println!("{uri}");
630625
}
631626
}
632627

633-
fn format_position(seq_num: u64, timestamp: u64) -> String {
634-
format!("{seq_num} @ {timestamp}")
628+
fn print_listing_with_created_at(uri: String, created_at: String, is_deleting: bool) {
629+
let uri = format_listing_uri(uri, is_deleting);
630+
let created_at = if is_deleting {
631+
created_at.red()
632+
} else {
633+
created_at.green()
634+
};
635+
636+
if is_deleting {
637+
println!("{} {} {}", uri, created_at, deletion_marker());
638+
} else {
639+
println!("{} {}", uri, created_at);
640+
}
641+
}
642+
643+
fn format_listing_uri(uri: String, is_deleting: bool) -> colored::ColoredString {
644+
if is_deleting { uri.red() } else { uri.normal() }
645+
}
646+
647+
fn deletion_marker() -> colored::ColoredString {
648+
"[deleting]".red().bold()
635649
}
636650

637651
async fn write_record(

cli/src/ops.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,9 @@ pub async fn list_basins(
6060
.list_basins(input)
6161
.await
6262
.map_err(|e| CliError::op(OpKind::ListBasins, e))?;
63-
6463
Ok((page.values, page.has_more))
6564
} else {
66-
let mut input = ListAllBasinsInput::new();
65+
let mut input = ListAllBasinsInput::new().with_include_deleted(true);
6766
if let Some(p) = prefix {
6867
input = input.with_prefix(p);
6968
}
@@ -329,10 +328,9 @@ pub async fn list_streams(
329328
.list_streams(input)
330329
.await
331330
.map_err(|e| CliError::op(OpKind::ListStreams, e))?;
332-
333331
Ok((page.values, page.has_more))
334332
} else {
335-
let mut input = ListAllStreamsInput::new();
333+
let mut input = ListAllStreamsInput::new().with_include_deleted(true);
336334
if let Some(p) = prefix {
337335
input = input.with_prefix(p);
338336
}

cli/src/tui/app.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use chrono::{Datelike, NaiveDate};
1212
use crossterm::event::{self, Event as CrosstermEvent, KeyCode, KeyEvent, KeyModifiers};
1313
use ratatui::{Terminal, prelude::Backend};
1414
use s2_sdk::types::{
15-
AccessTokenId, AccessTokenInfo, BasinInfo, BasinMetricSet, BasinName, BasinState, StreamInfo,
15+
AccessTokenId, AccessTokenInfo, BasinInfo, BasinMetricSet, BasinName, StreamInfo,
1616
StreamMetricSet, StreamName, StreamPosition, TimeRange,
1717
};
1818
use tokio::sync::mpsc;
@@ -3739,7 +3739,7 @@ impl App {
37393739
})
37403740
.collect();
37413741
if let Some(basin) = filtered.get(state.selected) {
3742-
if basin.state == BasinState::Deleting {
3742+
if basin.deleted_at.is_some() {
37433743
self.message = Some(StatusMessage {
37443744
text: "Basin is already being deleted".to_string(),
37453745
level: MessageLevel::Info,

cli/src/tui/ui.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2468,11 +2468,11 @@ fn draw_basins(f: &mut Frame, area: Rect, state: &BasinsState) {
24682468
let max_name_len = name_col.saturating_sub(2);
24692469
let display_name = truncate_str(&name, max_name_len, "…");
24702470

2471-
let (state_text, state_bg) = match basin.state {
2472-
s2_sdk::types::BasinState::Active => ("Active", BADGE_ACTIVE),
2473-
s2_sdk::types::BasinState::Deleting => ("Deleting", BADGE_DANGER),
2471+
let (state_text, state_bg) = if basin.deleted_at.is_some() {
2472+
("Deleting", BADGE_DANGER)
2473+
} else {
2474+
("Active", BADGE_ACTIVE)
24742475
};
2475-
24762476
let scope = basin
24772477
.scope
24782478
.as_ref()

common/src/types/basin.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{marker::PhantomData, ops::Deref, str::FromStr};
22

33
use compact_str::{CompactString, ToCompactString};
4+
use time::OffsetDateTime;
45

56
use super::{
67
ValidationError,
@@ -199,12 +200,6 @@ impl crate::http::ParseableHeader for BasinName {
199200

200201
pub type ListBasinsRequest = ListItemsRequest<BasinNamePrefix, BasinNameStartAfter>;
201202

202-
#[derive(Debug, Clone, Copy)]
203-
pub enum BasinState {
204-
Active,
205-
Deleting,
206-
}
207-
208203
#[derive(Debug, strum::Display, Clone, Copy, PartialEq, Eq)]
209204
pub enum BasinScope {
210205
#[strum(serialize = "aws:us-east-1")]
@@ -215,7 +210,8 @@ pub enum BasinScope {
215210
pub struct BasinInfo {
216211
pub name: BasinName,
217212
pub scope: Option<BasinScope>,
218-
pub state: BasinState,
213+
pub created_at: OffsetDateTime,
214+
pub deleted_at: Option<OffsetDateTime>,
219215
}
220216

221217
#[cfg(test)]

lite/src/backend/basins.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use s2_common::{
22
bash::Bash,
33
types::{
4-
basin::{BasinInfo, BasinName, BasinState, ListBasinsRequest},
4+
basin::{BasinInfo, BasinName, ListBasinsRequest},
55
config::{BasinConfig, BasinReconfiguration},
66
resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
77
stream::StreamNameStartAfter,
@@ -58,15 +58,11 @@ impl Backend {
5858
break;
5959
}
6060
let meta = kv::basin_meta::deser_value(kv.value)?;
61-
let state = if meta.deleted_at.is_some() {
62-
BasinState::Deleting
63-
} else {
64-
BasinState::Active
65-
};
6661
basins.push(BasinInfo {
6762
name: basin,
6863
scope: None,
69-
state,
64+
created_at: meta.created_at,
65+
deleted_at: meta.deleted_at,
7066
});
7167
}
7268
Ok(Page::new(basins, has_more))
@@ -106,7 +102,8 @@ impl Backend {
106102
Ok(CreatedOrReconfigured::Created(BasinInfo {
107103
name: basin,
108104
scope: None,
109-
state: BasinState::Active,
105+
created_at: existing_meta.created_at,
106+
deleted_at: None,
110107
}))
111108
} else {
112109
Err(BasinAlreadyExistsError { basin }.into())
@@ -149,7 +146,8 @@ impl Backend {
149146
let info = BasinInfo {
150147
name: basin,
151148
scope: None,
152-
state: BasinState::Active,
149+
created_at,
150+
deleted_at: None,
153151
};
154152

155153
Ok(if is_reconfigure {

0 commit comments

Comments
 (0)