-
Notifications
You must be signed in to change notification settings - Fork 138
feat: add demand pausing operator for Multi #2007
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
3b0528a to
66cdf5a
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #2007 +/- ##
============================================
- Coverage 89.11% 89.06% -0.05%
- Complexity 3068 3101 +33
============================================
Files 409 412 +3
Lines 13099 13256 +157
Branches 1652 1679 +27
============================================
+ Hits 11673 11807 +134
- Misses 808 818 +10
- Partials 618 631 +13
🚀 New features to boost your workflow:
|
66cdf5a to
d12f34b
Compare
jponge
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an excellent contribution @ozangunalp 👏
See my comments, plus:
- the
MultiDemandPausingTestrecorded in the Git tree has some minor reformat being applied at compilation time - I don't expect this operator to be any Reactive Streams compliant, so we should document this both in the guide and in the Javadocs.
documentation/src/test/java/guides/operators/PausingDemandTest.java
Outdated
Show resolved
Hide resolved
documentation/src/test/java/guides/operators/PausingDemandTest.java
Outdated
Show resolved
Hide resolved
implementation/src/main/java/io/smallrye/mutiny/subscription/PausableMulti.java
Outdated
Show resolved
Hide resolved
implementation/src/main/java/io/smallrye/mutiny/subscription/PausableMulti.java
Outdated
Show resolved
Hide resolved
implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingTest.java
Show resolved
Hide resolved
3d15550 to
6da1b4c
Compare
|
@jponge I am not sure about the Reactive Streams compliance of this operator. Sure, it can drop some items if it is intentional, but it doesn't emit more than what was requested from downstream. |
|
That's interesting. I suppose that it won't pass the TCK when paused, but it should pass the TCK when it is not. We should check this then 😇 |
I pushed a TCK test with pause/resume at every received item. |
6da1b4c to
267e83e
Compare
jponge
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's finish the TCK discussion, then the rest of the PR is excellent!
| public Flow.Publisher<Long> createFlowPublisher(long elements) { | ||
| DemandPauser pauser = new DemandPauser(); | ||
| return upstream(elements).pauseDemand().using(pauser) | ||
| .invoke(pauser::pause) | ||
| .invoke(pauser::resume); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is different from passing the TCK to the upstream, since we pause then resume on the same call stack.
I think we should have this, or just have the operator as a pass-through, but if we have a TCK test where we pause for longer periods of time then I anticipate the TCK will complain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes this only verifies that a simple pause/resume doesn't break the TCK.
Another test is needed with an async consumption and an operator with prefetch.
I think if the operator is eventually resumed inside the test it should pass the TCK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jponge I pushed a small update to the operator and the TCK test.
It now passes the TCK without a pass-through stream as long as the stream is not left in a paused state.
Specifically, it now handles the completion case better.
I also added a note to the javadoc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, let's re-run the CI :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That NPE is quite something : https://github.com/smallrye/smallrye-mutiny/actions/runs/19634508391/job/56221872235
267e83e to
13ff959
Compare
13ff959 to
e5ff7cf
Compare
jponge
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All good!
Let's just not merge now, I might have a fix or two to stage in a patch release over the next 2 weeks.
|
I'm doing another patch release, then this will go to |
Introduces a new
pauseDemand()operator that allows pausing and resuming request propagation in reactive streams.First used in Reactive Messaging pausable channels, this implementation also handles buffering already requested items