-
|
Hello, I am writing test cases for the Multi stream and trying to get the AssertSubscriber working. Here are my test code: @Test
void multiTest() {
Multi<String> multi = Multi.createFrom().generator(() -> 0, (i, em) -> {
if (i > 5) {
em.complete();
} else {
em.emit(List.of("i=" + i, "i=" + i));
}
return i+1;
}).onItem().<String>disjoint().onItem().transform(i -> i);
AssertSubscriber<String> subscriber = multi.subscribe().withSubscriber(AssertSubscriber.create(10));
subscriber.assertCompleted().assertItems("i=0","i=0","i=1","i=1","i=2","i=2","i=3","i=3");
}but it never reach completion and fails: Wrote another case without the AssertSubscriber, it works as expected: @Test
void multiTestWithoutAssertComplete() {
Multi<String> multi = Multi.createFrom().generator(() -> 0, (i, em) -> {
if (i > 5) {
em.complete();
} else {
em.emit(List.of("i=" + i, "i=" + i));
}
return i+1;
}).onItem().disjoint();
CompletableFuture<Void> syncFlag = new CompletableFuture<>();
List<String> results = new ArrayList<>();
multi.subscribe().with(results::add, () -> syncFlag.complete(null));
syncFlag.join();
assertThat(results, hasItems("i=0","i=0","i=1","i=1","i=2","i=2","i=3","i=3"));
}Am I writing it wrong? Or does it not work with generator() ? |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
|
You make a request of (up to) Remember that demand applies to the last operator in the pipeline first, so requesting |
Beta Was this translation helpful? Give feedback.
You make a request of (up to)
10items, but you generate pairs in lists, thendisjointpicks those items to "flatten" the list, so at the generator level you haven't reachedi=5yet. If you change the initial request to20, you will receive a completion.Remember that demand applies to the last operator in the pipeline first, so requesting
10in your example applies to thedisjointoperator, not to the generator.