Skip to content

Commit ef550cb

Browse files
refactor!: rename JetstreamError2 to JetstreamError
1 parent 087be17 commit ef550cb

File tree

7 files changed

+53
-53
lines changed

7 files changed

+53
-53
lines changed

watermelon/src/client/jetstream/commands/consumer_batch.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use tokio::time::{Sleep, sleep};
1111
use watermelon_proto::{ServerMessage, StatusCode, error::ServerError};
1212

1313
use crate::{
14-
client::{Consumer, JetstreamClient, JetstreamError2},
14+
client::{Consumer, JetstreamClient, JetstreamError},
1515
subscription::Subscription,
1616
};
1717

@@ -43,15 +43,15 @@ impl ConsumerBatch {
4343
client: JetstreamClient,
4444
expires: Duration,
4545
max_msgs: usize,
46-
) -> impl Future<Output = Result<Self, JetstreamError2>> + use<> {
46+
) -> impl Future<Output = Result<Self, JetstreamError>> + use<> {
4747
let subject = format!(
4848
"{}.CONSUMER.MSG.NEXT.{}.{}",
4949
client.prefix, consumer.stream_name, consumer.config.name
5050
)
5151
.try_into();
5252

5353
async move {
54-
let subject = subject.map_err(JetstreamError2::Subject)?;
54+
let subject = subject.map_err(JetstreamError::Subject)?;
5555
let incoming_subject = client.client.create_inbox_subject();
5656
let payload = serde_json::to_vec(&if expires.is_zero() {
5757
json!({
@@ -65,20 +65,20 @@ impl ConsumerBatch {
6565
"no_wait": true
6666
})
6767
})
68-
.map_err(JetstreamError2::Json)?;
68+
.map_err(JetstreamError::Json)?;
6969

7070
let subscription = client
7171
.client
7272
.subscribe(incoming_subject.clone(), None)
7373
.await
74-
.map_err(JetstreamError2::ClientClosed)?;
74+
.map_err(JetstreamError::ClientClosed)?;
7575
client
7676
.client
7777
.publish(subject)
7878
.reply_subject(Some(incoming_subject.clone()))
7979
.payload(payload.into())
8080
.await
81-
.map_err(JetstreamError2::ClientClosed)?;
81+
.map_err(JetstreamError::ClientClosed)?;
8282

8383
let timeout = sleep(expires.saturating_add(client.request_timeout));
8484
Ok(Self {

watermelon/src/client/jetstream/commands/consumer_list.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use serde_json::json;
1212
use watermelon_proto::Subject;
1313

1414
use crate::{
15-
client::{self, JetstreamClient, jetstream::JetstreamError2},
15+
client::{self, JetstreamClient, jetstream::JetstreamError},
1616
util::BoxFuture,
1717
};
1818

@@ -24,7 +24,7 @@ pub struct Consumers {
2424
client: JetstreamClient,
2525
offset: u32,
2626
partial_subject: Subject,
27-
fetch: Option<BoxFuture<'static, Result<ConsumersResponse, JetstreamError2>>>,
27+
fetch: Option<BoxFuture<'static, Result<ConsumersResponse, JetstreamError>>>,
2828
buffer: VecDeque<client::Consumer>,
2929
exhausted: bool,
3030
}
@@ -52,7 +52,7 @@ impl Consumers {
5252
}
5353

5454
impl Stream for Consumers {
55-
type Item = Result<client::Consumer, JetstreamError2>;
55+
type Item = Result<client::Consumer, JetstreamError>;
5656

5757
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
5858
let this = self.get_mut();
@@ -83,10 +83,10 @@ impl Stream for Consumers {
8383
.into(),
8484
)
8585
.await
86-
.map_err(JetstreamError2::ClientClosed)?;
87-
let response = response_fut.await.map_err(JetstreamError2::ResponseError)?;
88-
let payload = serde_json::from_slice(&response.base.payload)
89-
.map_err(JetstreamError2::Json)?;
86+
.map_err(JetstreamError::ClientClosed)?;
87+
let response = response_fut.await.map_err(JetstreamError::ResponseError)?;
88+
let payload =
89+
serde_json::from_slice(&response.base.payload).map_err(JetstreamError::Json)?;
9090
Ok(payload)
9191
})
9292
});

watermelon/src/client/jetstream/commands/consumer_stream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use pin_project_lite::pin_project;
1010
use watermelon_proto::ServerMessage;
1111

1212
use crate::{
13-
client::{Consumer, JetstreamClient, JetstreamError2},
13+
client::{Consumer, JetstreamClient, JetstreamError},
1414
util::BoxFuture,
1515
};
1616

@@ -36,7 +36,7 @@ pin_project! {
3636
#[project = ConsumerStreamStatusProj]
3737
enum ConsumerStreamStatus {
3838
Polling {
39-
future: BoxFuture<'static, Result<ConsumerBatch, JetstreamError2>>,
39+
future: BoxFuture<'static, Result<ConsumerBatch, JetstreamError>>,
4040
},
4141
RunningBatch {
4242
#[pin]
@@ -51,7 +51,7 @@ pub enum ConsumerStreamError {
5151
#[error("consumer batch error")]
5252
BatchError(#[source] ConsumerBatchError),
5353
#[error("jetstream error")]
54-
Jetstream(#[source] JetstreamError2),
54+
Jetstream(#[source] JetstreamError),
5555
}
5656

5757
impl ConsumerStream {

watermelon/src/client/jetstream/commands/stream_list.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use serde_json::json;
1111
use watermelon_proto::Subject;
1212

1313
use crate::{
14-
client::{self, JetstreamClient, jetstream::JetstreamError2},
14+
client::{self, JetstreamClient, jetstream::JetstreamError},
1515
util::BoxFuture,
1616
};
1717

@@ -22,7 +22,7 @@ use crate::{
2222
pub struct Streams {
2323
client: JetstreamClient,
2424
offset: u32,
25-
fetch: Option<BoxFuture<'static, Result<StreamsResponse, JetstreamError2>>>,
25+
fetch: Option<BoxFuture<'static, Result<StreamsResponse, JetstreamError>>>,
2626
buffer: VecDeque<client::Stream>,
2727
exhausted: bool,
2828
}
@@ -46,7 +46,7 @@ impl Streams {
4646
}
4747

4848
impl Stream for Streams {
49-
type Item = Result<client::Stream, JetstreamError2>;
49+
type Item = Result<client::Stream, JetstreamError>;
5050

5151
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
5252
let this = self.get_mut();
@@ -76,10 +76,10 @@ impl Stream for Streams {
7676
.into(),
7777
)
7878
.await
79-
.map_err(JetstreamError2::ClientClosed)?;
80-
let response = response_fut.await.map_err(JetstreamError2::ResponseError)?;
81-
let payload = serde_json::from_slice(&response.base.payload)
82-
.map_err(JetstreamError2::Json)?;
79+
.map_err(JetstreamError::ClientClosed)?;
80+
let response = response_fut.await.map_err(JetstreamError::ResponseError)?;
81+
let payload =
82+
serde_json::from_slice(&response.base.payload).map_err(JetstreamError::Json)?;
8383
Ok(payload)
8484
})
8585
});

watermelon/src/client/jetstream/mod.rs

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ pub struct JetstreamErrorCode(u16);
5353

5454
/// An error encountered while making a Jetstream request
5555
#[derive(Debug, thiserror::Error)]
56-
pub enum JetstreamError2 {
56+
pub enum JetstreamError {
5757
#[error("invalid subject")]
5858
Subject(#[source] SubjectValidateError),
5959
#[error("client closed")]
@@ -102,24 +102,24 @@ impl JetstreamClient {
102102
///
103103
/// It returns an error if the stream name produces an invalid subject or if an error occurs
104104
/// while creating the stream.
105-
pub async fn create_stream(&self, config: &StreamConfig) -> Result<Stream, JetstreamError2> {
105+
pub async fn create_stream(&self, config: &StreamConfig) -> Result<Stream, JetstreamError> {
106106
let subject = format!("{}.STREAM.CREATE.{}", self.prefix, config.name)
107107
.try_into()
108-
.map_err(JetstreamError2::Subject)?;
108+
.map_err(JetstreamError::Subject)?;
109109

110-
let payload = serde_json::to_vec(config).map_err(JetstreamError2::Json)?;
110+
let payload = serde_json::to_vec(config).map_err(JetstreamError::Json)?;
111111
let resp = self
112112
.make_request(subject)
113113
.payload(payload.into())
114114
.await
115-
.map_err(JetstreamError2::ClientClosed)?;
116-
let resp = resp.await.map_err(JetstreamError2::ResponseError)?;
115+
.map_err(JetstreamError::ClientClosed)?;
116+
let resp = resp.await.map_err(JetstreamError::ResponseError)?;
117117

118118
let json = serde_json::from_slice::<Response<Stream>>(&resp.base.payload)
119-
.map_err(JetstreamError2::Json)?;
119+
.map_err(JetstreamError::Json)?;
120120
match json {
121121
Response::Response(stream) => Ok(stream),
122-
Response::Error { error } => Err(JetstreamError2::Api(error)),
122+
Response::Error { error } => Err(JetstreamError::Api(error)),
123123
}
124124
}
125125

@@ -134,25 +134,25 @@ impl JetstreamClient {
134134
///
135135
/// It returns an error if the given `name` produces an invalid subject or if an error occurs
136136
/// while creating the stream.
137-
pub async fn stream(&self, name: impl Display) -> Result<Option<Stream>, JetstreamError2> {
137+
pub async fn stream(&self, name: impl Display) -> Result<Option<Stream>, JetstreamError> {
138138
let subject = format!("{}.STREAM.INFO.{}", self.prefix, name)
139139
.try_into()
140-
.map_err(JetstreamError2::Subject)?;
140+
.map_err(JetstreamError::Subject)?;
141141
let resp = self
142142
.make_request(subject)
143143
.payload(Bytes::new())
144144
.await
145-
.map_err(JetstreamError2::ClientClosed)?;
146-
let resp = resp.await.map_err(JetstreamError2::ResponseError)?;
145+
.map_err(JetstreamError::ClientClosed)?;
146+
let resp = resp.await.map_err(JetstreamError::ResponseError)?;
147147

148148
let json = serde_json::from_slice::<Response<Stream>>(&resp.base.payload)
149-
.map_err(JetstreamError2::Json)?;
149+
.map_err(JetstreamError::Json)?;
150150
match json {
151151
Response::Response(stream) => Ok(Some(stream)),
152152
Response::Error { error } if error.code == JetstreamErrorCode::STREAM_NOT_FOUND => {
153153
Ok(None)
154154
}
155-
Response::Error { error } => Err(JetstreamError2::Api(error)),
155+
Response::Error { error } => Err(JetstreamError::Api(error)),
156156
}
157157
}
158158

@@ -166,7 +166,7 @@ impl JetstreamClient {
166166
&self,
167167
stream_name: &str,
168168
config: &ConsumerConfig,
169-
) -> Result<Consumer, JetstreamError2> {
169+
) -> Result<Consumer, JetstreamError> {
170170
let mut subject = format!(
171171
"{}.CONSUMER.CREATE.{}.{}",
172172
self.prefix, stream_name, config.name
@@ -176,27 +176,27 @@ impl JetstreamClient {
176176
subject.push_str(filter_subject);
177177
}
178178

179-
let subject = subject.try_into().map_err(JetstreamError2::Subject)?;
179+
let subject = subject.try_into().map_err(JetstreamError::Subject)?;
180180

181181
let payload = serde_json::to_vec(&json!({
182182
"stream_name": stream_name,
183183
"config": config,
184184
"action": "create",
185185
"pedantic": true,
186186
}))
187-
.map_err(JetstreamError2::Json)?;
187+
.map_err(JetstreamError::Json)?;
188188
let resp = self
189189
.make_request(subject)
190190
.payload(payload.into())
191191
.await
192-
.map_err(JetstreamError2::ClientClosed)?;
193-
let resp = resp.await.map_err(JetstreamError2::ResponseError)?;
192+
.map_err(JetstreamError::ClientClosed)?;
193+
let resp = resp.await.map_err(JetstreamError::ResponseError)?;
194194

195195
let json = serde_json::from_slice::<Response<Consumer>>(&resp.base.payload)
196-
.map_err(JetstreamError2::Json)?;
196+
.map_err(JetstreamError::Json)?;
197197
match json {
198198
Response::Response(consumer) => Ok(consumer),
199-
Response::Error { error } => Err(JetstreamError2::Api(error)),
199+
Response::Error { error } => Err(JetstreamError::Api(error)),
200200
}
201201
}
202202

@@ -215,28 +215,28 @@ impl JetstreamClient {
215215
&self,
216216
stream_name: impl Display,
217217
consumer_name: impl Display,
218-
) -> Result<Option<Consumer>, JetstreamError2> {
218+
) -> Result<Option<Consumer>, JetstreamError> {
219219
let subject = format!(
220220
"{}.CONSUMER.INFO.{}.{}",
221221
self.prefix, stream_name, consumer_name
222222
)
223223
.try_into()
224-
.map_err(JetstreamError2::Subject)?;
224+
.map_err(JetstreamError::Subject)?;
225225
let resp = self
226226
.make_request(subject)
227227
.payload(Bytes::new())
228228
.await
229-
.map_err(JetstreamError2::ClientClosed)?;
230-
let resp = resp.await.map_err(JetstreamError2::ResponseError)?;
229+
.map_err(JetstreamError::ClientClosed)?;
230+
let resp = resp.await.map_err(JetstreamError::ResponseError)?;
231231

232232
let json = serde_json::from_slice::<Response<Consumer>>(&resp.base.payload)
233-
.map_err(JetstreamError2::Json)?;
233+
.map_err(JetstreamError::Json)?;
234234
match json {
235235
Response::Response(stream) => Ok(Some(stream)),
236236
Response::Error { error } if error.code == JetstreamErrorCode::CONSUMER_NOT_FOUND => {
237237
Ok(None)
238238
}
239-
Response::Error { error } => Err(JetstreamError2::Api(error)),
239+
Response::Error { error } => Err(JetstreamError::Api(error)),
240240
}
241241
}
242242

@@ -250,7 +250,7 @@ impl JetstreamClient {
250250
consumer: &Consumer,
251251
expires: Duration,
252252
max_msgs: usize,
253-
) -> Result<ConsumerBatch, JetstreamError2> {
253+
) -> Result<ConsumerBatch, JetstreamError> {
254254
ConsumerBatch::new(consumer, self.clone(), expires, max_msgs).await
255255
}
256256

watermelon/src/client/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub use self::commands::{
2828
pub use self::jetstream::{
2929
AckPolicy, Compression, Consumer, ConsumerBatch, ConsumerConfig, ConsumerDurability,
3030
ConsumerSpecificConfig, ConsumerStorage, ConsumerStream, ConsumerStreamError, Consumers,
31-
DeliverPolicy, DiscardPolicy, JetstreamApiError, JetstreamClient, JetstreamError2,
31+
DeliverPolicy, DiscardPolicy, JetstreamApiError, JetstreamClient, JetstreamError,
3232
JetstreamErrorCode, ReplayPolicy, RetentionPolicy, Storage, Stream, StreamConfig, StreamState,
3333
Streams,
3434
};

watermelon/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ pub mod jetstream {
6363
//! NATS Jetstream specific errors
6464
6565
pub use crate::client::{
66-
ConsumerStreamError, JetstreamApiError, JetstreamError2, JetstreamErrorCode,
66+
ConsumerStreamError, JetstreamApiError, JetstreamError, JetstreamErrorCode,
6767
};
6868
}
6969
}

0 commit comments

Comments
 (0)