3
3
4
4
package com .azure .messaging .servicebus ;
5
5
6
+ import com .azure .core .util .logging .ClientLogger ;
6
7
import org .junit .jupiter .api .Assertions ;
7
8
import org .junit .jupiter .api .Test ;
8
9
import org .junit .jupiter .api .parallel .Execution ;
19
20
import java .util .Deque ;
20
21
import java .util .List ;
21
22
import java .util .concurrent .ConcurrentLinkedDeque ;
23
+ import java .util .concurrent .atomic .AtomicBoolean ;
22
24
import java .util .concurrent .atomic .AtomicReference ;
23
25
import java .util .function .Consumer ;
24
26
import java .util .function .Supplier ;
33
35
@ Execution (ExecutionMode .SAME_THREAD )
34
36
@ Isolated
35
37
public final class WindowedSubscriberFluxWindowIsolatedTest {
38
+ private final ClientLogger logger = new ClientLogger (WindowedSubscriberFluxWindowIsolatedTest .class );
39
+
36
40
@ Test
37
41
@ Execution (ExecutionMode .SAME_THREAD )
38
42
public void shouldCloseEmptyWindowOnTimeout () {
@@ -44,13 +48,14 @@ public void shouldCloseEmptyWindowOnTimeout() {
44
48
upstream .subscribe (subscriber );
45
49
46
50
final AtomicReference <EnqueueResult <Integer >> rRef = new AtomicReference <>();
47
- final Supplier <Publisher <Integer >> scenario = () -> {
48
- final EnqueueResult <Integer > r = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
49
- rRef .set (r );
50
- return r .getWindowFlux ();
51
- };
52
-
53
51
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier ()) {
52
+ final Supplier <Publisher <Integer >> scenario = () -> {
53
+ verifier .logIfClosedUnexpectedly (logger );
54
+ final EnqueueResult <Integer > r = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
55
+ rRef .set (r );
56
+ return r .getWindowFlux ();
57
+ };
58
+
54
59
verifier .create (scenario )
55
60
// Forward time to timeout empty windowTimeout.
56
61
.thenAwait (windowTimeout .plusSeconds (10 ))
@@ -74,13 +79,14 @@ public void shouldCloseStreamingWindowOnTimeout() {
74
79
upstream .subscribe (subscriber );
75
80
76
81
final AtomicReference <EnqueueResult <Integer >> rRef = new AtomicReference <>();
77
- final Supplier <Publisher <Integer >> scenario = () -> {
78
- final EnqueueResult <Integer > r = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
79
- rRef .set (r );
80
- return r .getWindowFlux ();
81
- };
82
-
83
82
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier ()) {
83
+ final Supplier <Publisher <Integer >> scenario = () -> {
84
+ verifier .logIfClosedUnexpectedly (logger );
85
+ final EnqueueResult <Integer > r = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
86
+ rRef .set (r );
87
+ return r .getWindowFlux ();
88
+ };
89
+
84
90
verifier .create (scenario )
85
91
.then (() -> upstream .next (1 ))
86
92
.then (() -> upstream .next (2 ))
@@ -108,17 +114,18 @@ public void shouldContinueToNextWindowWhenEmptyWindowTimeout() {
108
114
109
115
final AtomicReference <EnqueueResult <Integer >> r0Ref = new AtomicReference <>();
110
116
final AtomicReference <EnqueueResult <Integer >> r1Ref = new AtomicReference <>();
111
- final Supplier <Publisher <Integer >> scenario = () -> {
112
- final EnqueueResult <Integer > r0 = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
113
- final EnqueueResult <Integer > r1 = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
114
- r0Ref .set (r0 );
115
- r1Ref .set (r1 );
116
- final Flux <Integer > window0Flux = r0 .getWindowFlux ();
117
- final Flux <Integer > window1Flux = r1 .getWindowFlux ();
118
- return window0Flux .concatWith (window1Flux );
119
- };
120
-
121
117
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier ()) {
118
+ final Supplier <Publisher <Integer >> scenario = () -> {
119
+ verifier .logIfClosedUnexpectedly (logger );
120
+ final EnqueueResult <Integer > r0 = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
121
+ final EnqueueResult <Integer > r1 = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
122
+ r0Ref .set (r0 );
123
+ r1Ref .set (r1 );
124
+ final Flux <Integer > window0Flux = r0 .getWindowFlux ();
125
+ final Flux <Integer > window1Flux = r1 .getWindowFlux ();
126
+ return window0Flux .concatWith (window1Flux );
127
+ };
128
+
122
129
verifier .create (scenario )
123
130
// Forward time to timeout empty window0Flux,
124
131
.thenAwait (windowTimeout .plusSeconds (10 ))
@@ -154,17 +161,18 @@ public void shouldContinueToNextWindowWhenStreamingWindowTimeout() {
154
161
155
162
final AtomicReference <EnqueueResult <Integer >> r0Ref = new AtomicReference <>();
156
163
final AtomicReference <EnqueueResult <Integer >> r1Ref = new AtomicReference <>();
157
- final Supplier <Publisher <Integer >> scenario = () -> {
158
- final EnqueueResult <Integer > r0 = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
159
- final EnqueueResult <Integer > r1 = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
160
- r0Ref .set (r0 );
161
- r1Ref .set (r1 );
162
- final Flux <Integer > window0Flux = r0 .getWindowFlux ();
163
- final Flux <Integer > window1Flux = r1 .getWindowFlux ();
164
- return window0Flux .concatWith (window1Flux );
165
- };
166
-
167
164
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier ()) {
165
+ final Supplier <Publisher <Integer >> scenario = () -> {
166
+ verifier .logIfClosedUnexpectedly (logger );
167
+ final EnqueueResult <Integer > r0 = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
168
+ final EnqueueResult <Integer > r1 = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
169
+ r0Ref .set (r0 );
170
+ r1Ref .set (r1 );
171
+ final Flux <Integer > window0Flux = r0 .getWindowFlux ();
172
+ final Flux <Integer > window1Flux = r1 .getWindowFlux ();
173
+ return window0Flux .concatWith (window1Flux );
174
+ };
175
+
168
176
verifier .create (scenario )
169
177
.then (() -> upstream .next (1 ))
170
178
.then (() -> upstream .next (2 ))
@@ -186,9 +194,19 @@ public void shouldContinueToNextWindowWhenStreamingWindowTimeout() {
186
194
Assertions .assertFalse (work0 .isCanceled ());
187
195
188
196
final WindowWork <Integer > work1 = r1Ref .get ().getInnerWork ();
189
- Assertions .assertNotEquals (windowSize , work1 .getPending ());
190
197
Assertions .assertTrue (work1 .hasTimedOut ());
191
198
Assertions .assertFalse (work1 .isCanceled ());
199
+ final boolean hasWindow1ReceivedNothing = work1 .getPending () == windowSize ;
200
+ if (hasWindow1ReceivedNothing ) {
201
+ // The combination of VirtualTimeScheduler and WindowedSubscriber.drain() sometimes delays arrival of timeout
202
+ // signaling for window0, resulting window0 to timeout only after the emission of 3 (and 4). This result in
203
+ // window0 to receive 1, 2, 3 and 4, and window1 to receive nothing. Here asserting that, when/if this happens
204
+ // application still gets emitted events via window0.
205
+ //
206
+ final boolean hasWindow0ReceivedAll = work0 .getPending () == windowSize - 4 ; // (demanded - received)
207
+ Assertions .assertTrue (hasWindow0ReceivedAll ,
208
+ String .format ("window0 pending: %d, window1 pending: %d" , work0 .getPending (), work1 .getPending ()));
209
+ }
192
210
}
193
211
194
212
@ Test
@@ -204,19 +222,20 @@ public void shouldContinueToNextWindowWhenStreamingWindowCancels() {
204
222
205
223
final AtomicReference <EnqueueResult <Integer >> r0Ref = new AtomicReference <>();
206
224
final AtomicReference <EnqueueResult <Integer >> r1Ref = new AtomicReference <>();
207
- final Supplier <Publisher <Integer >> scenario = () -> {
208
- final EnqueueResult <Integer > r0 = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
209
- final EnqueueResult <Integer > r1 = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
210
- r0Ref .set (r0 );
211
- r1Ref .set (r1 );
212
- final Flux <Integer > window0Flux = r0 .getWindowFlux ();
213
- final Flux <Integer > window1Flux = r1 .getWindowFlux ();
214
- return window0Flux
215
- .take (cancelAfter )
216
- .concatWith (window1Flux );
217
- };
218
-
219
225
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier ()) {
226
+ final Supplier <Publisher <Integer >> scenario = () -> {
227
+ verifier .logIfClosedUnexpectedly (logger );
228
+ final EnqueueResult <Integer > r0 = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
229
+ final EnqueueResult <Integer > r1 = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
230
+ r0Ref .set (r0 );
231
+ r1Ref .set (r1 );
232
+ final Flux <Integer > window0Flux = r0 .getWindowFlux ();
233
+ final Flux <Integer > window1Flux = r1 .getWindowFlux ();
234
+ return window0Flux
235
+ .take (cancelAfter )
236
+ .concatWith (window1Flux );
237
+ };
238
+
220
239
verifier .create (scenario )
221
240
.then (() -> upstream .next (1 ))
222
241
.then (() -> upstream .next (2 ))
@@ -248,13 +267,14 @@ public void shouldRequestWindowDemand() {
248
267
upstream .subscribe (subscriber );
249
268
250
269
final AtomicReference <EnqueueResult <Integer >> rRef = new AtomicReference <>();
251
- final Supplier <Publisher <Integer >> scenario = () -> {
252
- final EnqueueResult <Integer > r = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
253
- rRef .set (r );
254
- return r .getWindowFlux ();
255
- };
256
-
257
270
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier ()) {
271
+ final Supplier <Publisher <Integer >> scenario = () -> {
272
+ verifier .logIfClosedUnexpectedly (logger );
273
+ final EnqueueResult <Integer > r = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
274
+ rRef .set (r );
275
+ return r .getWindowFlux ();
276
+ };
277
+
258
278
verifier .create (scenario )
259
279
.thenAwait (windowTimeout .plusSeconds (10 ))
260
280
.verifyComplete ();
@@ -279,18 +299,18 @@ public void shouldAccountPendingRequestWhenServingNextWindowDemand() {
279
299
280
300
final AtomicReference <EnqueueResult <Integer >> r0Ref = new AtomicReference <>();
281
301
final AtomicReference <EnqueueResult <Integer >> r1Ref = new AtomicReference <>();
282
- final Supplier <Publisher <Integer >> scenario = () -> {
283
- final EnqueueResult <Integer > r0 = subscriber .enqueueRequestImpl (window0Size , windowTimeout );
284
- final EnqueueResult <Integer > r1 = subscriber .enqueueRequestImpl (window1Size , windowTimeout );
285
- r0Ref .set (r0 );
286
- r1Ref .set (r1 );
287
- final Flux <Integer > window0Flux = r0 .getWindowFlux ();
288
- final Flux <Integer > window1Flux = r1 .getWindowFlux ();
289
- return window0Flux .concatWith (window1Flux );
290
- };
291
-
292
-
293
302
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier ()) {
303
+ final Supplier <Publisher <Integer >> scenario = () -> {
304
+ verifier .logIfClosedUnexpectedly (logger );
305
+ final EnqueueResult <Integer > r0 = subscriber .enqueueRequestImpl (window0Size , windowTimeout );
306
+ final EnqueueResult <Integer > r1 = subscriber .enqueueRequestImpl (window1Size , windowTimeout );
307
+ r0Ref .set (r0 );
308
+ r1Ref .set (r1 );
309
+ final Flux <Integer > window0Flux = r0 .getWindowFlux ();
310
+ final Flux <Integer > window1Flux = r1 .getWindowFlux ();
311
+ return window0Flux .concatWith (window1Flux );
312
+ };
313
+
294
314
verifier .create (scenario )
295
315
// timeout window0Flux without receiving (so pending request become 'windowSize0'), and pick next work.
296
316
.thenAwait (windowTimeout .plusSeconds (10 ))
@@ -322,17 +342,19 @@ public void shouldPickEnqueuedWindowRequestsOnSubscriptionReady() {
322
342
323
343
final AtomicReference <EnqueueResult <Integer >> r0Ref = new AtomicReference <>();
324
344
final AtomicReference <EnqueueResult <Integer >> r1Ref = new AtomicReference <>();
325
- final Supplier <Publisher <Integer >> scenario = () -> {
326
- final EnqueueResult <Integer > r0 = subscriber .enqueueRequestImpl (window0Size , windowTimeout );
327
- final EnqueueResult <Integer > r1 = subscriber .enqueueRequestImpl (window1Size , windowTimeout );
328
- r0Ref .set (r0 );
329
- r1Ref .set (r1 );
330
- final Flux <Integer > window0Flux = r0 .getWindowFlux ();
331
- final Flux <Integer > window1Flux = r1 .getWindowFlux ();
332
- return window0Flux .concatWith (window1Flux );
333
- };
334
345
335
346
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier ()) {
347
+ final Supplier <Publisher <Integer >> scenario = () -> {
348
+ verifier .logIfClosedUnexpectedly (logger );
349
+ final EnqueueResult <Integer > r0 = subscriber .enqueueRequestImpl (window0Size , windowTimeout );
350
+ final EnqueueResult <Integer > r1 = subscriber .enqueueRequestImpl (window1Size , windowTimeout );
351
+ r0Ref .set (r0 );
352
+ r1Ref .set (r1 );
353
+ final Flux <Integer > window0Flux = r0 .getWindowFlux ();
354
+ final Flux <Integer > window1Flux = r1 .getWindowFlux ();
355
+ return window0Flux .concatWith (window1Flux );
356
+ };
357
+
336
358
verifier .create (scenario )
337
359
// subscribe after enqueuing requests in 'scenario' (mimicking late arrival of subscription).
338
360
.then (() -> upstream .subscribe (subscriber ))
@@ -366,14 +388,13 @@ public void shouldInvokeReleaserWhenNoWindowToService() {
366
388
final WindowedSubscriber <Integer > subscriber = createSubscriber (options .setReleaser (releaser ));
367
389
upstream .subscribe (subscriber );
368
390
369
- final AtomicReference <EnqueueResult <Integer >> rRef = new AtomicReference <>();
370
- final Supplier <Publisher <Integer >> scenario = () -> {
371
- final EnqueueResult <Integer > r = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
372
- rRef .set (r );
373
- return r .getWindowFlux ();
374
- };
375
-
376
391
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier ()) {
392
+ final Supplier <Publisher <Integer >> scenario = () -> {
393
+ verifier .logIfClosedUnexpectedly (logger );
394
+ final EnqueueResult <Integer > r = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
395
+ return r .getWindowFlux ();
396
+ };
397
+
377
398
verifier .create (scenario )
378
399
// forward time to timeout windowFlux without receiving.
379
400
.thenAwait (windowTimeout .plusSeconds (10 ))
@@ -400,14 +421,13 @@ public void shouldStopInvokingReleaserOnUpstreamTermination() {
400
421
final WindowedSubscriber <Integer > subscriber = createSubscriber (options .setReleaser (releaser ));
401
422
upstream .subscribe (subscriber );
402
423
403
- final AtomicReference <EnqueueResult <Integer >> rRef = new AtomicReference <>();
404
- final Supplier <Publisher <Integer >> scenario = () -> {
405
- final EnqueueResult <Integer > r = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
406
- rRef .set (r );
407
- return r .getWindowFlux ();
408
- };
409
-
410
424
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier ()) {
425
+ final Supplier <Publisher <Integer >> scenario = () -> {
426
+ verifier .logIfClosedUnexpectedly (logger );
427
+ final EnqueueResult <Integer > r = subscriber .enqueueRequestImpl (windowSize , windowTimeout );
428
+ return r .getWindowFlux ();
429
+ };
430
+
411
431
verifier .create (scenario )
412
432
// forward time to timeout windowFlux without receiving.
413
433
.thenAwait (windowTimeout .plusSeconds (10 ))
@@ -424,10 +444,11 @@ public void shouldStopInvokingReleaserOnUpstreamTermination() {
424
444
Assertions .assertEquals (Arrays .asList (1 , 2 ), released );
425
445
}
426
446
427
- private static final class VirtualTimeStepVerifier implements AutoCloseable {
447
+ private static final class VirtualTimeStepVerifier extends AtomicBoolean implements AutoCloseable {
428
448
private final VirtualTimeScheduler scheduler ;
429
449
430
450
VirtualTimeStepVerifier () {
451
+ super (false );
431
452
scheduler = VirtualTimeScheduler .create ();
432
453
}
433
454
@@ -437,8 +458,21 @@ <T> StepVerifier.Step<T> create(Supplier<Publisher<T>> scenarioSupplier) {
437
458
438
459
@ Override
439
460
public void close () {
461
+ super .set (true );
440
462
scheduler .dispose ();
441
463
}
464
+
465
+ void logIfClosedUnexpectedly (ClientLogger logger ) {
466
+ final boolean wasAutoClosed = get ();
467
+ final boolean isSchedulerDisposed = scheduler .isDisposed ();
468
+ if (wasAutoClosed || isSchedulerDisposed ) {
469
+ if (!wasAutoClosed ) {
470
+ logger .atError ().log ("VirtualTimeScheduler unavailable (unexpected close from outside of the test)." );
471
+ } else {
472
+ logger .atError ().log ("VirtualTimeScheduler unavailable (unexpected close by the test)." );
473
+ }
474
+ }
475
+ }
442
476
}
443
477
444
478
private static class Releaser <T > implements Consumer <T > {
0 commit comments