Skip to content

Commit 3107329

Browse files
authored
refactor(trogon-nats): use per-operation traits with ToSubject (#6)
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 049b087 commit 3107329

File tree

3 files changed

+53
-29
lines changed

3 files changed

+53
-29
lines changed

rsworkspace/crates/trogon-nats/src/client.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use async_nats::subject::ToSubject;
12
use async_nats::{Client as NatsAsyncClient, HeaderMap, Message, Subscriber};
23
use bytes::Bytes;
34
use std::error::Error;
@@ -6,18 +7,18 @@ use std::future::Future;
67
pub trait SubscribeClient: Send + Sync + Clone + 'static {
78
type SubscribeError: Error + Send + Sync;
89

9-
fn subscribe(
10+
fn subscribe<S: ToSubject + Send>(
1011
&self,
11-
subject: String,
12+
subject: S,
1213
) -> impl Future<Output = Result<Subscriber, Self::SubscribeError>> + Send;
1314
}
1415

1516
pub trait RequestClient: Send + Sync + Clone + 'static {
1617
type RequestError: Error + Send + Sync;
1718

18-
fn request_with_headers(
19+
fn request_with_headers<S: ToSubject + Send>(
1920
&self,
20-
subject: String,
21+
subject: S,
2122
headers: HeaderMap,
2223
payload: Bytes,
2324
) -> impl Future<Output = Result<Message, Self::RequestError>> + Send;
@@ -26,9 +27,9 @@ pub trait RequestClient: Send + Sync + Clone + 'static {
2627
pub trait PublishClient: Send + Sync + Clone + 'static {
2728
type PublishError: Error + Send + Sync;
2829

29-
fn publish_with_headers(
30+
fn publish_with_headers<S: ToSubject + Send>(
3031
&self,
31-
subject: String,
32+
subject: S,
3233
headers: HeaderMap,
3334
payload: Bytes,
3435
) -> impl Future<Output = Result<(), Self::PublishError>> + Send;
@@ -43,17 +44,20 @@ pub trait FlushClient: Send + Sync + Clone + 'static {
4344
impl SubscribeClient for NatsAsyncClient {
4445
type SubscribeError = async_nats::client::SubscribeError;
4546

46-
async fn subscribe(&self, subject: String) -> Result<Subscriber, Self::SubscribeError> {
47+
async fn subscribe<S: ToSubject + Send>(
48+
&self,
49+
subject: S,
50+
) -> Result<Subscriber, Self::SubscribeError> {
4751
self.subscribe(subject).await
4852
}
4953
}
5054

5155
impl RequestClient for NatsAsyncClient {
5256
type RequestError = async_nats::client::RequestError;
5357

54-
async fn request_with_headers(
58+
async fn request_with_headers<S: ToSubject + Send>(
5559
&self,
56-
subject: String,
60+
subject: S,
5761
headers: HeaderMap,
5862
payload: Bytes,
5963
) -> Result<Message, Self::RequestError> {
@@ -64,9 +68,9 @@ impl RequestClient for NatsAsyncClient {
6468
impl PublishClient for NatsAsyncClient {
6569
type PublishError = async_nats::client::PublishError;
6670

67-
async fn publish_with_headers(
71+
async fn publish_with_headers<S: ToSubject + Send>(
6872
&self,
69-
subject: String,
73+
subject: S,
7074
headers: HeaderMap,
7175
payload: Bytes,
7276
) -> Result<(), Self::PublishError> {

rsworkspace/crates/trogon-nats/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub mod messaging;
4343
#[cfg(feature = "test-support")]
4444
pub mod mocks;
4545

46+
pub use async_nats::subject::ToSubject;
4647
pub use auth::{NatsAuth, NatsConfig};
4748
pub use client::{FlushClient, PublishClient, RequestClient, SubscribeClient};
4849
pub use connect::{ConnectError, connect};

rsworkspace/crates/trogon-nats/src/mocks.rs

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::client::{FlushClient, PublishClient, RequestClient, SubscribeClient};
2+
use async_nats::subject::ToSubject;
23
use std::sync::{Arc, Mutex};
34

45
#[derive(Debug, Clone)]
@@ -118,18 +119,24 @@ impl Default for AdvancedMockNatsClient {
118119
impl SubscribeClient for MockNatsClient {
119120
type SubscribeError = MockError;
120121

121-
async fn subscribe(&self, subject: String) -> Result<async_nats::Subscriber, MockError> {
122-
self.subscribed_subjects.lock().unwrap().push(subject);
122+
async fn subscribe<S: ToSubject + Send>(
123+
&self,
124+
subject: S,
125+
) -> Result<async_nats::Subscriber, MockError> {
126+
self.subscribed_subjects
127+
.lock()
128+
.unwrap()
129+
.push(subject.to_subject().to_string());
123130
Err(MockError("mock: subscribe not implemented".to_string()))
124131
}
125132
}
126133

127134
impl RequestClient for MockNatsClient {
128135
type RequestError = MockError;
129136

130-
async fn request_with_headers(
137+
async fn request_with_headers<S: ToSubject + Send>(
131138
&self,
132-
_subject: String,
139+
_subject: S,
133140
_headers: async_nats::HeaderMap,
134141
_payload: bytes::Bytes,
135142
) -> Result<async_nats::Message, MockError> {
@@ -140,16 +147,16 @@ impl RequestClient for MockNatsClient {
140147
impl PublishClient for MockNatsClient {
141148
type PublishError = MockError;
142149

143-
async fn publish_with_headers(
150+
async fn publish_with_headers<S: ToSubject + Send>(
144151
&self,
145-
subject: String,
152+
subject: S,
146153
_headers: async_nats::HeaderMap,
147154
payload: bytes::Bytes,
148155
) -> Result<(), MockError> {
149-
self.published
150-
.lock()
151-
.unwrap()
152-
.push(PublishedMessage { subject, payload });
156+
self.published.lock().unwrap().push(PublishedMessage {
157+
subject: subject.to_subject().to_string(),
158+
payload,
159+
});
153160
Ok(())
154161
}
155162
}
@@ -165,20 +172,24 @@ impl FlushClient for MockNatsClient {
165172
impl SubscribeClient for AdvancedMockNatsClient {
166173
type SubscribeError = MockError;
167174

168-
async fn subscribe(&self, subject: String) -> Result<async_nats::Subscriber, MockError> {
175+
async fn subscribe<S: ToSubject + Send>(
176+
&self,
177+
subject: S,
178+
) -> Result<async_nats::Subscriber, MockError> {
169179
self.base.subscribe(subject).await
170180
}
171181
}
172182

173183
impl RequestClient for AdvancedMockNatsClient {
174184
type RequestError = MockError;
175185

176-
async fn request_with_headers(
186+
async fn request_with_headers<S: ToSubject + Send>(
177187
&self,
178-
subject: String,
188+
subject: S,
179189
_headers: async_nats::HeaderMap,
180190
_payload: bytes::Bytes,
181191
) -> Result<async_nats::Message, MockError> {
192+
let subject = subject.to_subject().to_string();
182193
let should_fail = *self.should_fail_request.lock().unwrap();
183194
if should_fail {
184195
*self.should_fail_request.lock().unwrap() = false;
@@ -207,9 +218,9 @@ impl RequestClient for AdvancedMockNatsClient {
207218
impl PublishClient for AdvancedMockNatsClient {
208219
type PublishError = MockError;
209220

210-
async fn publish_with_headers(
221+
async fn publish_with_headers<S: ToSubject + Send>(
211222
&self,
212-
subject: String,
223+
subject: S,
213224
headers: async_nats::HeaderMap,
214225
payload: bytes::Bytes,
215226
) -> Result<(), MockError> {
@@ -242,15 +253,19 @@ mod tests {
242253
async fn mock_client_tracks_publish() {
243254
let mock = MockNatsClient::new();
244255
let _ = mock
245-
.publish_with_headers("foo".into(), async_nats::HeaderMap::new(), "bar".into())
256+
.publish_with_headers(
257+
"foo",
258+
async_nats::HeaderMap::new(),
259+
bytes::Bytes::from("bar"),
260+
)
246261
.await;
247262
assert_eq!(mock.published_messages(), vec!["foo"]);
248263
}
249264

250265
#[tokio::test]
251266
async fn mock_client_tracks_subscribe() {
252267
let mock = MockNatsClient::new();
253-
let _ = mock.subscribe("test.sub".into()).await;
268+
let _ = mock.subscribe("test.sub").await;
254269
assert_eq!(mock.subscribed_to(), vec!["test.sub"]);
255270
}
256271

@@ -273,7 +288,11 @@ mod tests {
273288
async fn advanced_mock_request_no_response_configured() {
274289
let mock = AdvancedMockNatsClient::new();
275290
let result = mock
276-
.request_with_headers("missing".into(), async_nats::HeaderMap::new(), "x".into())
291+
.request_with_headers(
292+
"missing",
293+
async_nats::HeaderMap::new(),
294+
bytes::Bytes::from("x"),
295+
)
277296
.await;
278297
assert!(result.is_err());
279298
assert!(result.unwrap_err().0.contains("no response configured"));

0 commit comments

Comments
 (0)