Skip to content

Commit 6e85ffe

Browse files
feat: add create stream API
1 parent 2ef8b40 commit 6e85ffe

File tree

1 file changed

+29
-0
lines changed
  • watermelon/src/client/jetstream

1 file changed

+29
-0
lines changed

watermelon/src/client/jetstream/mod.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,35 @@ impl JetstreamClient {
9595
}
9696
}
9797

98+
/// Create a new stream
99+
///
100+
/// # Errors
101+
///
102+
/// It returns an error if the stream name produces an invalid subject or if an error occurs
103+
/// while creating the stream.
104+
pub async fn create_stream(&self, config: &StreamConfig) -> Result<Stream, JetstreamError2> {
105+
let subject = format!("{}.STREAM.CREATE.{}", self.prefix, config.name)
106+
.try_into()
107+
.map_err(JetstreamError2::Subject)?;
108+
109+
let payload = serde_json::to_vec(config).map_err(JetstreamError2::Json)?;
110+
let resp = self
111+
.client
112+
.request(subject)
113+
.response_timeout(self.request_timeout)
114+
.payload(payload.into())
115+
.await
116+
.map_err(JetstreamError2::ClientClosed)?;
117+
let resp = resp.await.map_err(JetstreamError2::ResponseError)?;
118+
119+
let json = serde_json::from_slice::<Response<Stream>>(&resp.base.payload)
120+
.map_err(JetstreamError2::Json)?;
121+
match json {
122+
Response::Response(stream) => Ok(stream),
123+
Response::Error { error } => Err(JetstreamError2::Status(error)),
124+
}
125+
}
126+
98127
/// List streams present within this client's Jetstream context
99128
pub fn streams(&self) -> Streams {
100129
Streams::new(self.clone())

0 commit comments

Comments
 (0)