1010import static io .opentelemetry .sdk .testing .assertj .OpenTelemetryAssertions .equalTo ;
1111import static io .opentelemetry .semconv .incubating .MessagingIncubatingAttributes .MESSAGING_DESTINATION_TEMPORARY ;
1212import static java .util .Arrays .asList ;
13+ import static org .assertj .core .api .Assertions .assertThat ;
1314
1415import io .nats .client .Dispatcher ;
16+ import io .nats .client .Message ;
1517import io .nats .client .Subscription ;
1618import io .nats .client .impl .Headers ;
1719import io .nats .client .impl .NatsMessage ;
2022import java .time .Duration ;
2123import java .util .ArrayList ;
2224import java .util .List ;
25+ import java .util .Objects ;
2326import java .util .concurrent .CancellationException ;
27+ import java .util .concurrent .CompletableFuture ;
2428import java .util .function .Consumer ;
2529import org .junit .jupiter .api .AfterEach ;
2630import org .junit .jupiter .api .BeforeEach ;
@@ -46,12 +50,14 @@ void afterEach() throws InterruptedException {
4650 @ Test
4751 void testRequestTimeout () throws InterruptedException {
4852 // when
49- testing ()
50- .runWithSpan (
51- "parent" , () -> connection .request ("sub" , new byte [] {0 }, Duration .ofSeconds (1 )));
53+ Message message =
54+ testing ()
55+ .runWithSpan (
56+ "parent" , () -> connection .request ("sub" , new byte [] {0 }, Duration .ofSeconds (1 )));
5257
5358 // then
5459 // assertTimeoutPublishSpan();
60+ assertThat (message ).isNull ();
5561 assertTraceparentHeader (subscription );
5662 }
5763
@@ -64,12 +70,14 @@ void testRequestBody() throws InterruptedException {
6470 .subscribe ("sub" );
6571
6672 // when
67- testing ()
68- .runWithSpan (
69- "parent" , () -> connection .request ("sub" , new byte [] {0 }, Duration .ofSeconds (1 )));
73+ Message message =
74+ testing ()
75+ .runWithSpan (
76+ "parent" , () -> connection .request ("sub" , new byte [] {0 }, Duration .ofSeconds (1 )));
7077 connection .closeDispatcher (dispatcher );
7178
7279 // then
80+ assertThat (message ).isNotNull ();
7381 assertPublishReceiveSpansSameTrace ();
7482 assertTraceparentHeader (subscription );
7583 }
@@ -83,13 +91,17 @@ void testRequestHeadersBody() throws InterruptedException {
8391 .subscribe ("sub" );
8492
8593 // when
86- testing ()
87- .runWithSpan (
88- "parent" ,
89- () -> connection .request ("sub" , new Headers (), new byte [] {0 }, Duration .ofSeconds (1 )));
94+ Message message =
95+ testing ()
96+ .runWithSpan (
97+ "parent" ,
98+ () ->
99+ connection .request (
100+ "sub" , new Headers (), new byte [] {0 }, Duration .ofSeconds (1 )));
90101 connection .closeDispatcher (dispatcher );
91102
92103 // then
104+ assertThat (message ).isNotNull ();
93105 assertPublishReceiveSpansSameTrace ();
94106 assertTraceparentHeader (subscription );
95107 }
@@ -104,10 +116,12 @@ void testRequestMessage() throws InterruptedException {
104116 NatsMessage message = NatsMessage .builder ().subject ("sub" ).data ("x" ).build ();
105117
106118 // when
107- testing ().runWithSpan ("parent" , () -> connection .request (message , Duration .ofSeconds (1 )));
119+ Message response =
120+ testing ().runWithSpan ("parent" , () -> connection .request (message , Duration .ofSeconds (1 )));
108121 connection .closeDispatcher (dispatcher );
109122
110123 // then
124+ assertThat (response ).isNotNull ();
111125 assertPublishReceiveSpansSameTrace ();
112126 assertTraceparentHeader (subscription );
113127 }
@@ -123,10 +137,12 @@ void testRequestMessageHeaders() throws InterruptedException {
123137 NatsMessage .builder ().subject ("sub" ).headers (new Headers ()).data ("x" ).build ();
124138
125139 // when
126- testing ().runWithSpan ("parent" , () -> connection .request (message , Duration .ofSeconds (1 )));
140+ Message response =
141+ testing ().runWithSpan ("parent" , () -> connection .request (message , Duration .ofSeconds (1 )));
127142 connection .closeDispatcher (dispatcher );
128143
129144 // then
145+ assertThat (response ).isNotNull ();
130146 assertPublishReceiveSpansSameTrace ();
131147 assertTraceparentHeader (subscription );
132148 }
@@ -140,13 +156,15 @@ void testRequestFutureBody() throws InterruptedException {
140156 .subscribe ("sub" );
141157
142158 // when
143- testing ()
144- .runWithSpan ("parent" , () -> connection .request ("sub" , new byte [] {0 }))
145- .whenComplete ((m , e ) -> connection .closeDispatcher (dispatcher ));
159+ CompletableFuture <Message > message =
160+ testing ()
161+ .runWithSpan ("parent" , () -> connection .request ("sub" , new byte [] {0 }))
162+ .whenComplete ((m , e ) -> connection .closeDispatcher (dispatcher ));
146163
147164 // then
148165 assertPublishReceiveSpansSameTrace ();
149166 assertTraceparentHeader (subscription );
167+ assertThat (message ).isCompletedWithValueMatching (Objects ::nonNull );
150168 }
151169
152170 @ Test
@@ -158,13 +176,15 @@ void testRequestFutureHeadersBody() throws InterruptedException {
158176 .subscribe ("sub" );
159177
160178 // when
161- testing ()
162- .runWithSpan ("parent" , () -> connection .request ("sub" , new Headers (), new byte [] {0 }))
163- .whenComplete ((m , e ) -> connection .closeDispatcher (dispatcher ));
179+ CompletableFuture <Message > message =
180+ testing ()
181+ .runWithSpan ("parent" , () -> connection .request ("sub" , new Headers (), new byte [] {0 }))
182+ .whenComplete ((m , e ) -> connection .closeDispatcher (dispatcher ));
164183
165184 // then
166185 assertPublishReceiveSpansSameTrace ();
167186 assertTraceparentHeader (subscription );
187+ assertThat (message ).isCompletedWithValueMatching (Objects ::nonNull );
168188 }
169189
170190 @ Test
@@ -177,13 +197,15 @@ void testRequestFutureMessage() throws InterruptedException {
177197 NatsMessage message = NatsMessage .builder ().subject ("sub" ).data ("x" ).build ();
178198
179199 // when
180- testing ()
181- .runWithSpan ("parent" , () -> connection .request (message ))
182- .whenComplete ((m , e ) -> connection .closeDispatcher (dispatcher ));
200+ CompletableFuture <Message > response =
201+ testing ()
202+ .runWithSpan ("parent" , () -> connection .request (message ))
203+ .whenComplete ((m , e ) -> connection .closeDispatcher (dispatcher ));
183204
184205 // then
185206 assertPublishReceiveSpansSameTrace ();
186207 assertTraceparentHeader (subscription );
208+ assertThat (response ).isCompletedWithValueMatching (Objects ::nonNull );
187209 }
188210
189211 @ Test
@@ -197,41 +219,47 @@ void testRequestFutureMessageHeaders() throws InterruptedException {
197219 NatsMessage .builder ().subject ("sub" ).headers (new Headers ()).data ("x" ).build ();
198220
199221 // when
200- testing ()
201- .runWithSpan ("parent" , () -> connection .request (message ))
202- .whenComplete ((m , e ) -> connection .closeDispatcher (dispatcher ));
222+ CompletableFuture <Message > response =
223+ testing ()
224+ .runWithSpan ("parent" , () -> connection .request (message ))
225+ .whenComplete ((m , e ) -> connection .closeDispatcher (dispatcher ));
203226
204227 // then
205228 assertPublishReceiveSpansSameTrace ();
206229 assertTraceparentHeader (subscription );
230+ assertThat (response ).isCompletedWithValueMatching (Objects ::nonNull );
207231 }
208232
209233 @ Test
210234 void testRequestTimeoutFutureBody () throws InterruptedException {
211235 // when
212- testing ()
213- .runWithSpan (
214- "parent" ,
215- () -> connection .requestWithTimeout ("sub" , new byte [] {0 }, Duration .ofSeconds (1 )));
236+ CompletableFuture <Message > message =
237+ testing ()
238+ .runWithSpan (
239+ "parent" ,
240+ () -> connection .requestWithTimeout ("sub" , new byte [] {0 }, Duration .ofSeconds (1 )));
216241
217242 // then
218243 assertCancellationPublishSpan ();
219244 assertTraceparentHeader (subscription );
245+ assertThat (message ).isCompletedExceptionally ();
220246 }
221247
222248 @ Test
223249 void testRequestTimeoutFutureHeadersBody () throws InterruptedException {
224250 // when
225- testing ()
226- .runWithSpan (
227- "parent" ,
228- () ->
229- connection .requestWithTimeout (
230- "sub" , new Headers (), new byte [] {0 }, Duration .ofSeconds (1 )));
251+ CompletableFuture <Message > message =
252+ testing ()
253+ .runWithSpan (
254+ "parent" ,
255+ () ->
256+ connection .requestWithTimeout (
257+ "sub" , new Headers (), new byte [] {0 }, Duration .ofSeconds (1 )));
231258
232259 // then
233260 assertCancellationPublishSpan ();
234261 assertTraceparentHeader (subscription );
262+ assertThat (message ).isCompletedExceptionally ();
235263 }
236264
237265 @ Test
@@ -240,12 +268,15 @@ void testRequestTimeoutFutureMessage() throws InterruptedException {
240268 NatsMessage message = NatsMessage .builder ().subject ("sub" ).data ("x" ).build ();
241269
242270 // when
243- testing ()
244- .runWithSpan ("parent" , () -> connection .requestWithTimeout (message , Duration .ofSeconds (1 )));
271+ CompletableFuture <Message > response =
272+ testing ()
273+ .runWithSpan (
274+ "parent" , () -> connection .requestWithTimeout (message , Duration .ofSeconds (1 )));
245275
246276 // then
247277 assertCancellationPublishSpan ();
248278 assertTraceparentHeader (subscription );
279+ assertThat (response ).isCompletedExceptionally ();
249280 }
250281
251282 @ Test
@@ -255,12 +286,15 @@ void testRequestTimeoutFutureMessageHeaders() throws InterruptedException {
255286 NatsMessage .builder ().subject ("sub" ).headers (new Headers ()).data ("x" ).build ();
256287
257288 // when
258- testing ()
259- .runWithSpan ("parent" , () -> connection .requestWithTimeout (message , Duration .ofSeconds (1 )));
289+ CompletableFuture <Message > response =
290+ testing ()
291+ .runWithSpan (
292+ "parent" , () -> connection .requestWithTimeout (message , Duration .ofSeconds (1 )));
260293
261294 // then
262295 assertCancellationPublishSpan ();
263296 assertTraceparentHeader (subscription );
297+ assertThat (response ).isCompletedExceptionally ();
264298 }
265299
266300 private void assertPublishReceiveSpansSameTrace () {
0 commit comments