Skip to content

Commit 8457209

Browse files
committed
feat: nats context added
1 parent ed65810 commit 8457209

File tree

4 files changed

+61
-41
lines changed

4 files changed

+61
-41
lines changed

crates/transport-nats/src/lib.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,6 +1012,21 @@ impl AsyncWrite for ParamWriter {
10121012
}
10131013
}
10141014

1015+
#[derive(Debug)]
1016+
pub struct NatsContext {
1017+
pub headers: Option<HeaderMap>,
1018+
pub subject: Subject,
1019+
}
1020+
1021+
impl Default for NatsContext {
1022+
fn default() -> Self {
1023+
Self {
1024+
headers: None,
1025+
subject: Subject::from(""),
1026+
}
1027+
}
1028+
}
1029+
10151030
impl wrpc_transport::Invoke for Client {
10161031
type Context = Option<HeaderMap>;
10171032
type Outgoing = ParamWriter;
@@ -1140,11 +1155,12 @@ async fn handle_message(
11401155
reply: tx,
11411156
payload,
11421157
headers,
1158+
subject,
11431159
..
11441160
}: async_nats::Message,
11451161
paths: &[Box<[Option<usize>]>],
11461162
tasks: Arc<JoinSet<()>>,
1147-
) -> anyhow::Result<(Option<HeaderMap>, SubjectWriter, Reader)> {
1163+
) -> anyhow::Result<(NatsContext, SubjectWriter, Reader)> {
11481164
let tx = tx.context("peer did not specify a reply subject")?;
11491165

11501166
let mut cmds = Vec::with_capacity(paths.len().saturating_add(1));
@@ -1180,7 +1196,7 @@ async fn handle_message(
11801196
.await
11811197
.context("failed to publish handshake accept")?;
11821198
Ok((
1183-
headers,
1199+
NatsContext { headers, subject },
11841200
SubjectWriter::new(
11851201
nats.clone(),
11861202
Subject::from(result_subject(&tx)),
@@ -1201,7 +1217,7 @@ async fn handle_message(
12011217
}
12021218

12031219
impl wrpc_transport::Serve for Client {
1204-
type Context = Option<HeaderMap>;
1220+
type Context = NatsContext;
12051221
type Outgoing = SubjectWriter;
12061222
type Incoming = Reader;
12071223

examples/rust/hello-nats-server/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ struct Args {
3232
#[derive(Clone, Copy)]
3333
struct Server;
3434

35-
impl bindings::exports::wrpc_examples::hello::handler::Handler<Option<async_nats::HeaderMap>>
35+
impl bindings::exports::wrpc_examples::hello::handler::Handler<wrpc_transport_nats::NatsContext>
3636
for Server
3737
{
38-
async fn hello(&self, _: Option<async_nats::HeaderMap>) -> anyhow::Result<String> {
38+
async fn hello(&self, _: wrpc_transport_nats::NatsContext) -> anyhow::Result<String> {
3939
Ok("hello from Rust".to_string())
4040
}
4141
}

examples/rust/streams-nats-server/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@ struct Args {
3535
#[derive(Clone, Copy)]
3636
struct Server;
3737

38-
impl bindings::exports::wrpc_examples::streams::handler::Handler<Option<async_nats::HeaderMap>>
38+
impl bindings::exports::wrpc_examples::streams::handler::Handler<wrpc_transport_nats::NatsContext>
3939
for Server
4040
{
4141
async fn echo(
4242
&self,
43-
_cx: Option<async_nats::HeaderMap>,
43+
_cx: wrpc_transport_nats::NatsContext,
4444
Req { numbers, bytes }: Req,
4545
) -> anyhow::Result<(
4646
Pin<Box<dyn Stream<Item = Vec<u64>> + Send>>,

tests/rust.rs

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ use wrpc_transport::frame::{AcceptExt as _, Oneshot};
2222
use wrpc_transport::{Accept, InvokeExt as _, ResourceBorrow, ResourceOwn, ServeExt as _};
2323

2424
#[instrument(skip_all, ret)]
25-
async fn assert_bindgen_async<C, I, S>(clt: Arc<I>, srv: Arc<S>) -> anyhow::Result<()>
25+
async fn assert_bindgen_async<IC, SC, I, S>(clt: Arc<I>, srv: Arc<S>) -> anyhow::Result<()>
2626
where
27-
C: Send + Sync + Default,
28-
I: wrpc::Invoke<Context = C> + 'static,
29-
S: wrpc::Serve<Context = C> + Send + 'static,
27+
IC: Send + Sync + Default,
28+
SC: Send + Sync + Default,
29+
I: wrpc::Invoke<Context = IC> + 'static,
30+
S: wrpc::Serve<Context = SC> + Send + 'static,
3031
{
3132
let span = Span::current();
3233
let (shutdown_tx, shutdown_rx) = oneshot::channel();
@@ -190,11 +191,12 @@ where
190191
}
191192

192193
#[instrument(skip_all, ret)]
193-
async fn assert_bindgen_sync<C, I, S>(clt: Arc<I>, srv: Arc<S>) -> anyhow::Result<()>
194+
async fn assert_bindgen_sync<IC, SC, I, S>(clt: Arc<I>, srv: Arc<S>) -> anyhow::Result<()>
194195
where
195-
C: Send + Sync + Default,
196-
I: wrpc::Invoke<Context = C> + 'static,
197-
S: wrpc::Serve<Context = C> + Send + 'static,
196+
IC: Send + Sync + Default,
197+
SC: Send + Sync + Default,
198+
I: wrpc::Invoke<Context = IC> + 'static,
199+
S: wrpc::Serve<Context = SC> + Send + 'static,
198200
{
199201
let span = Span::current();
200202
let (shutdown_tx, shutdown_rx) = oneshot::channel();
@@ -494,33 +496,34 @@ where
494496
// TODO: Remove the need for this
495497
sleep(Duration::from_secs(1)).await;
496498

497-
impl<C, T> exports::bar::Handler<C> for Component<T>
499+
impl<IC, SC, T> exports::bar::Handler<SC> for Component<T>
498500
where
499-
C: Send + Sync + Default,
500-
T: wrpc::Invoke<Context = C>,
501+
IC: Send + Sync + Default,
502+
SC: Send + Sync + Default,
503+
T: wrpc::Invoke<Context = IC>,
501504
{
502-
async fn bar(&self, _cx: C) -> anyhow::Result<String> {
505+
async fn bar(&self, _cx: SC) -> anyhow::Result<String> {
503506
use shared::Abc;
504507

505508
info!("calling `wrpc-test:integration/test.foo.f`");
506-
foo::foo(self.0.as_ref(), C::default(), "foo")
509+
foo::foo(self.0.as_ref(), IC::default(), "foo")
507510
.await
508511
.context("failed to call `wrpc-test:integration/test.foo.foo`")?;
509512

510513
info!("calling `wrpc-test:integration/test.f`");
511-
let v = f(self.0.as_ref(), C::default(), "foo")
514+
let v = f(self.0.as_ref(), IC::default(), "foo")
512515
.await
513516
.context("failed to call `wrpc-test:integration/test.f`")?;
514517
assert_eq!(v, 42);
515518

516519
info!("calling `wrpc-test:integration/shared.fallible`");
517-
let v = shared::fallible(self.0.as_ref(), C::default())
520+
let v = shared::fallible(self.0.as_ref(), IC::default())
518521
.await
519522
.context("failed to call `wrpc-test:integration/shared.fallible`")?;
520523
assert_eq!(v, Ok(true));
521524

522525
info!("calling `wrpc-test:integration/shared.numbers`");
523-
let v = shared::numbers(self.0.as_ref(), C::default())
526+
let v = shared::numbers(self.0.as_ref(), IC::default())
524527
.await
525528
.context("failed to call `wrpc-test:integration/shared.numbers`")?;
526529
assert_eq!(
@@ -541,14 +544,14 @@ where
541544

542545
info!("calling `wrpc-test:integration/shared.with-flags`");
543546
let v =
544-
shared::with_flags(self.0.as_ref(), C::default())
547+
shared::with_flags(self.0.as_ref(), IC::default())
545548
.await
546549
.context("failed to call `wrpc-test:integration/shared.with-flags`")?;
547550
assert_eq!(v, Abc::A | Abc::C);
548551

549552
let counter = Counter::new(
550553
self.0.as_ref(),
551-
C::default(),
554+
IC::default(),
552555
0,
553556
)
554557
.await
@@ -557,13 +560,13 @@ where
557560
)?;
558561
let counter_borrow = counter.as_borrow();
559562

560-
Counter::increment_by(self.0.as_ref(), C::default(), &counter_borrow, 1)
563+
Counter::increment_by(self.0.as_ref(), IC::default(), &counter_borrow, 1)
561564
.await
562565
.context("failed to call `wrpc-test:integration/shared.[method]counter-increment-by`")?;
563566

564567
let count = Counter::get_count(
565568
self.0.as_ref(),
566-
C::default(),
569+
IC::default(),
567570
&counter_borrow,
568571
)
569572
.await
@@ -572,13 +575,13 @@ where
572575
)?;
573576
assert_eq!(count, 1);
574577

575-
Counter::increment_by(self.0.as_ref(), C::default(), &counter_borrow, 2)
578+
Counter::increment_by(self.0.as_ref(), IC::default(), &counter_borrow, 2)
576579
.await
577580
.context("failed to call `wrpc-test:integration/shared.[method]counter-increment-by`")?;
578581

579582
let count = Counter::get_count(
580583
self.0.as_ref(),
581-
C::default(),
584+
IC::default(),
582585
&counter_borrow,
583586
)
584587
.await
@@ -587,14 +590,14 @@ where
587590
)?;
588591
assert_eq!(count, 3);
589592

590-
let second_counter = Counter::clone_counter(self.0.as_ref(), C::default(), &counter_borrow)
593+
let second_counter = Counter::clone_counter(self.0.as_ref(), IC::default(), &counter_borrow)
591594
.await
592595
.context("failed to call `wrpc-test:integration/shared.[method]counter-clone-counter`")?;
593596

594597
let second_counter_borrow = second_counter.as_borrow();
595598
let sum = Counter::sum(
596599
self.0.as_ref(),
597-
C::default(),
600+
IC::default(),
598601
&counter_borrow,
599602
&second_counter_borrow,
600603
)
@@ -651,7 +654,7 @@ where
651654
// TODO: Remove the need for this
652655
sleep(Duration::from_secs(2)).await;
653656

654-
let v = bar::bar(clt.as_ref(), C::default())
657+
let v = bar::bar(clt.as_ref(), IC::default())
655658
.await
656659
.context("failed to call `wrpc-test:integration/test.bar.bar`")?;
657660
assert_eq!(v, "bar");
@@ -663,11 +666,12 @@ where
663666
}
664667

665668
#[instrument(skip_all, ret)]
666-
async fn assert_dynamic<C, I, S>(clt: Arc<I>, srv: Arc<S>) -> anyhow::Result<()>
669+
async fn assert_dynamic<IC, SC, I, S>(clt: Arc<I>, srv: Arc<S>) -> anyhow::Result<()>
667670
where
668-
C: Send + Sync + Default + 'static,
669-
I: wrpc::Invoke<Context = C>,
670-
S: wrpc::Serve<Context = C>,
671+
IC: Send + Sync + Default + 'static,
672+
SC: Send + Sync + Default + 'static,
673+
I: wrpc::Invoke<Context = IC>,
674+
S: wrpc::Serve<Context = SC>,
671675
{
672676
use core::pin::pin;
673677

@@ -725,7 +729,7 @@ where
725729
async {
726730
info!("invoking `test.reset`");
727731
clt.invoke_values_blocking::<_, _, (String,)>(
728-
C::default(),
732+
IC::default(),
729733
"test",
730734
"reset",
731735
("arg",),
@@ -735,7 +739,7 @@ where
735739
.expect_err("`test.reset` should have failed");
736740
info!("invoking `test.reset`");
737741
clt.invoke_values_blocking::<_, _, (String,)>(
738-
C::default(),
742+
IC::default(),
739743
"test",
740744
"reset",
741745
("arg",),
@@ -745,7 +749,7 @@ where
745749
.expect_err("`test.reset` should have failed");
746750
info!("invoking `test.reset`");
747751
clt.invoke_values_blocking::<_, _, (String,)>(
748-
C::default(),
752+
IC::default(),
749753
"test",
750754
"reset",
751755
("arg",),
@@ -824,7 +828,7 @@ where
824828
info!("invoking `test.sync`");
825829
let returns = clt
826830
.invoke_values_blocking(
827-
C::default(),
831+
IC::default(),
828832
"test",
829833
"sync",
830834
(
@@ -966,7 +970,7 @@ where
966970
info!("invoking `test.async`");
967971
let (returns, io) = clt
968972
.invoke_values(
969-
C::default(),
973+
IC::default(),
970974
"test",
971975
"async",
972976
(a, b, c, d, e),

0 commit comments

Comments
 (0)