Issues with onTermination #1842
-
|
I have this example code: Cancellable cancellable = Uni.createFrom().item(10)
.onItem().transform(item ->
{
System.out.println("start computing");
try
{
item *= 10;
Thread.sleep(1000);
}
catch(InterruptedException e)
{
}
System.out.println("end computing");
return item;
})
.toMulti()
.onCancellation().invoke(() -> System.out.println("onCancellation1"))
.onTermination().invoke(() -> System.out.println("onTermination"))
.onCancellation().invoke(() -> System.out.println("onCancellation2"))
.runSubscriptionOn(Infrastructure.getDefaultExecutor())
.onSubscription().invoke(() -> System.out.println("onSubscription"))
.subscribe().with(item -> System.out.println("itemReceived: " + item));
try
{
Thread.sleep(200);
}
catch(InterruptedException e)
{
}
cancellable.cancel();I've expected this output: But what I've got is this: There are two issues with this:
Thanks a lot. |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 9 replies
-
|
The order is correct, each stage in the pipeline is an intermediate operation (and also a The cancellation is eventual, so when an operator / publisher receives a I also think that you are expecting Mutiny to magically create threads, but it does not. When you have a If you want to make more sense of what happens at each stage, you might plug a few |
Beta Was this translation helpful? Give feedback.
-
|
But why is the |
Beta Was this translation helpful? Give feedback.
-
|
Back to my first post: For cleanup purposes, I need a signal which is fired when the stream is terminated completely. Now I know that |
Beta Was this translation helpful? Give feedback.
@alerosmile Back to this: yes if a thread cancels while another thread is busy with processing an item, the 2 events will be concurrent, and the termination callback will be called concurrently. This is by design of reactive streams.
What you could do is have some critical section in both the termination and item processing handler (say, in a
onItem().transform(...)). But ideally whatever logic you have in a termination handler shall assume asynchronicity.