Skip to content

Conversation

@AdamChlupacek
Copy link
Contributor

Stream merge does not block until scope is closed, but only until the chunk is read from the output channel.

Merge was waiting for the resulting chunk to be fully processed, this introduced a scope (resource) to the stream, which then if leased for further parallel processing would cause the merge to be waiting forever (until the parallel processing finished).

Here I am changing that we are only guarding the production of values from merge. Instead of guarding the whole processing of the chunk, we are only guarding the chunk up to the point where it is read from the output of the merge.

This also fixes #3598.

This is now aligned with documentation of merge where Stream(this, that).parJoinUnbounded == this merge that

@AdamChlupacek
Copy link
Contributor Author

The failing tests:

  • fs2.StreamMergeSuite.merge not emit ahead
    I think that this test is actually testing the wrong behaviour. It is explicitly stated in documentation of merge that there can be up to 2 chunks pulled (1 from each side), while the downstream data is being consumed.

This test assumes lock step, but we cannot assure this in the first 2 chunks.

If we extend this test we can assure that we do not pul more than 1 chunk any longer. Allowing the first 2 values to be the same and then having the values increment would prove this case.

  • fs2.TimedPullsSuite.After the first uncons, timeouts start immediately
    timed on stream is actually run using merge, in this test we could not actually ever produce a second element ever, since the first chunk received via uncons which acquired the resource in merge and hence could never actually produce a value again. This is the same issue as in Some streams merged with merge stop emitting when downstream does certain actions #3598.

I will apply fixes to these test if we agree that this change to merge is desired.

@mpilquist
Copy link
Member

Looks good to me, good find!

@AdamChlupacek
Copy link
Contributor Author

Thank you for the approval, I have fixed the tests.

However in the test: fs2.StreamMergeSuite.merge not emit ahead cannot be fixed as is. As the expected behaviour of merge i that we will read 1 chunk ahead. As such I changed the test that we will not read more than 1 chunk ahead. This test was introduced as part of fix to #1987, not sure if some people rely on this to happen. Especially since its in conflict with the documentation of merge

Not sure about the failed action, seems unrelated.

@mpilquist
Copy link
Member

Hm, I wonder if we should provide an alternative merge that keeps current behavior then. Not sure what to call it. Something like mergeAndAwaitDownstream

@AdamChlupacek
Copy link
Contributor Author

I agree about inclusion of the mergeAndAwaitDownstream, it can be helpful in some cases. Let me add it.

@mpilquist mpilquist merged commit 179dc20 into typelevel:main Sep 24, 2025
16 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Some streams merged with merge stop emitting when downstream does certain actions

2 participants