Skip to content

Commit 05649d5

Browse files
refactor: slightly reduce repetitiveness of making jetstream requests
1 parent d303b8e commit 05649d5

File tree

1 file changed

+11
-12
lines changed
  • watermelon/src/client/jetstream

1 file changed

+11
-12
lines changed

watermelon/src/client/jetstream/mod.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub use self::resources::{
1414
ConsumerStorage, DeliverPolicy, DiscardPolicy, ReplayPolicy, RetentionPolicy, Storage, Stream,
1515
StreamConfig, StreamState,
1616
};
17+
use crate::client::ClientRequest;
1718
use crate::core::Client;
1819

1920
use super::{ClientClosedError, ResponseError};
@@ -108,9 +109,7 @@ impl JetstreamClient {
108109

109110
let payload = serde_json::to_vec(config).map_err(JetstreamError2::Json)?;
110111
let resp = self
111-
.client
112-
.request(subject)
113-
.response_timeout(self.request_timeout)
112+
.make_request(subject)
114113
.payload(payload.into())
115114
.await
116115
.map_err(JetstreamError2::ClientClosed)?;
@@ -140,9 +139,7 @@ impl JetstreamClient {
140139
.try_into()
141140
.map_err(JetstreamError2::Subject)?;
142141
let resp = self
143-
.client
144-
.request(subject)
145-
.response_timeout(self.request_timeout)
142+
.make_request(subject)
146143
.payload(Bytes::new())
147144
.await
148145
.map_err(JetstreamError2::ClientClosed)?;
@@ -189,9 +186,7 @@ impl JetstreamClient {
189186
}))
190187
.map_err(JetstreamError2::Json)?;
191188
let resp = self
192-
.client
193-
.request(subject)
194-
.response_timeout(self.request_timeout)
189+
.make_request(subject)
195190
.payload(payload.into())
196191
.await
197192
.map_err(JetstreamError2::ClientClosed)?;
@@ -228,9 +223,7 @@ impl JetstreamClient {
228223
.try_into()
229224
.map_err(JetstreamError2::Subject)?;
230225
let resp = self
231-
.client
232-
.request(subject)
233-
.response_timeout(self.request_timeout)
226+
.make_request(subject)
234227
.payload(Bytes::new())
235228
.await
236229
.map_err(JetstreamError2::ClientClosed)?;
@@ -275,6 +268,12 @@ impl JetstreamClient {
275268
Subject::from_dangerous_value(format!("{}.{}", self.prefix, endpoint).into())
276269
}
277270

271+
fn make_request(&self, subject: Subject) -> ClientRequest<'_> {
272+
self.client
273+
.request(subject)
274+
.response_timeout(self.request_timeout)
275+
}
276+
278277
/// Get a reference to the inner NATS Core client
279278
#[must_use]
280279
pub fn client(&self) -> &Client {

0 commit comments

Comments
 (0)