-
Couldn't load subscription status.
- Fork 621
Fixing interruption behaviour #3183
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
b720a5b
b03e1f8
4955b51
b785e49
8182c8e
fdc2d16
d0f0a3a
ff688be
c73ab66
b64eb5a
8d57590
9ae18d7
89889fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -23,7 +23,7 @@ package fs2 | |||||||||
|
|
||||||||||
| import cats.effect.kernel.Deferred | ||||||||||
| import cats.effect.kernel.Ref | ||||||||||
| import cats.effect.std.{Semaphore, Queue} | ||||||||||
| import cats.effect.std.{Queue, Semaphore} | ||||||||||
| import cats.effect.testkit.TestControl | ||||||||||
| import cats.effect.{IO, SyncIO} | ||||||||||
| import cats.syntax.all._ | ||||||||||
|
|
@@ -34,6 +34,7 @@ import org.scalacheck.Prop.forAll | |||||||||
|
|
||||||||||
| import scala.concurrent.duration._ | ||||||||||
| import scala.concurrent.TimeoutException | ||||||||||
| import scala.util.control.NoStackTrace | ||||||||||
|
|
||||||||||
| class StreamCombinatorsSuite extends Fs2Suite { | ||||||||||
| override def munitIOTimeout = 1.minute | ||||||||||
|
|
@@ -834,6 +835,77 @@ class StreamCombinatorsSuite extends Fs2Suite { | |||||||||
| ) | ||||||||||
| .assertEquals(0.millis) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| test("upstream failures are propagated downstream") { | ||||||||||
| TestControl.executeEmbed { | ||||||||||
| case object SevenNotAllowed extends NoStackTrace | ||||||||||
|
|
||||||||||
| val source = Stream | ||||||||||
| .iterate(0)(_ + 1) | ||||||||||
| .covary[IO] | ||||||||||
| .evalTap(n => IO.raiseError(SevenNotAllowed).whenA(n == 7)) | ||||||||||
|
|
||||||||||
| val downstream = source.groupWithin(100, 2.seconds) | ||||||||||
|
|
||||||||||
| downstream.intercept[SevenNotAllowed.type] | ||||||||||
|
||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| test( | ||||||||||
| "upstream interruption causes immediate downstream termination with all elements being emitted" | ||||||||||
| ) { | ||||||||||
|
|
||||||||||
| val sourceTimeout = 5.5.seconds | ||||||||||
| val downstreamTimeout = sourceTimeout + 2.seconds | ||||||||||
|
|
||||||||||
| TestControl | ||||||||||
| .executeEmbed( | ||||||||||
| Ref[IO] | ||||||||||
| .of(0.millis) | ||||||||||
| .flatMap { ref => | ||||||||||
| val source: Stream[IO, Int] = | ||||||||||
| Stream | ||||||||||
| .iterate(0)(_ + 1) | ||||||||||
| .covary[IO] | ||||||||||
| .meteredStartImmediately(1.second) | ||||||||||
| .interruptAfter(sourceTimeout) | ||||||||||
|
|
||||||||||
| // large chunkSize and timeout (no emissions expected in the window | ||||||||||
| // specified, unless source ends, due to interruption or | ||||||||||
| // natural termination (i.e runs out of elements) | ||||||||||
| val downstream: Stream[IO, Chunk[Int]] = | ||||||||||
| source.groupWithin(Int.MaxValue, 1.day) | ||||||||||
|
|
||||||||||
| downstream.compile.lastOrError | ||||||||||
| .map(_.toList) | ||||||||||
| .timeout(downstreamTimeout) | ||||||||||
| .flatTap(_ => IO.monotonic.flatMap(ref.set)) | ||||||||||
| .flatMap(emit => ref.get.map(timeLapsed => (timeLapsed, emit))) | ||||||||||
|
||||||||||
| .timeout(downstreamTimeout) | |
| .flatTap(_ => IO.monotonic.flatMap(ref.set)) | |
| .flatMap(emit => ref.get.map(timeLapsed => (timeLapsed, emit))) | |
| .timed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, dumb question: why is
Int.MaxValuea "magic number" in this context? I would have thought it's effectively maxing out the semaphore, but if it needs+ outputLongto work then I feel like it must have more significance?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Legit question to be fair. Had to think about it again.
Interruption of the upstream fiber (i.e. Outcome.Cancelled) is handled downstream by doing nothing (permits are never released)
So by increasing the supply to
Int.MaxValuewe are just evening out the negative balance (Int.MaxValueis to account for the worst case scenario: at most thechunkSizeparameter will be equal toInt.MaxValue)Now after getting past the "checkpoint" above we are acquiring
outputLongpermits againSo in order to get past this point we need to release an additional
outputLongpermits and that allows the stream to be unblockedEDIT
uhm well actually I've just tested it, it is not handled with
Outcome.Cancelled...There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for that explanation!
So could we just use
chunkSizehere, instead ofInt.MaxValue?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@armanbilge apologies I was wrong, that's not what's happening here. I'm just doing some tests to figure out why we need the additional
outputLongThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw, if these implementations details are no longer relevant after your rewrite in the other PR, then let's not get too hung up on this one :)
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok I think I've figured it out (might be useful for the other implementation actually)
basically the problem is that we need enough supply to cover 2 iterations of the race loop. So if we only increase it by
Int.MaxValuethe following will happenif instead we increase it by
Int.MaxValue + outputLongoutputLongSo since the chunkSize can be as high as
Int.MaxValuethen the minimum supply to unblock the semaphore should beInt.MaxValue + outputLongThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Key word being "can". Wouldn't
chunkSize + outputLongbe sufficient?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah that should work. The test still passes, I'll change it to
outputLong * 2since chunkSize == outputLong