Skip to content

Commit 417d359

Browse files
committed
Updated scaladoc and fixed a laziness bug
1 parent 4aa9ae7 commit 417d359

File tree

1 file changed

+24
-7
lines changed

1 file changed

+24
-7
lines changed

kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,17 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] {
130130
parTraverseN_(n)(tma)(identity)
131131

132132
/**
133-
* Like `Parallel.parTraverse`, but limits the degree of parallelism. Note that the semantics
134-
* of this operation aim to maximise fairness: when a spot to execute becomes available, every
135-
* task has a chance to claim it, and not only the next `n` tasks in `ta`
133+
* Like `Parallel.parTraverse`, but limits the degree of parallelism. The semantics of this
134+
* function are ordered based on the `Traverse`. The first ''n'' actions will be started
135+
* first, with subsequent actions starting in order as each one completes. Actions which are
136+
* reached earlier in `traverse` order will be started slightly sooner than later actions, in
137+
* a non-blocking fashion. Any errors or self-cancelation will immediately abort the sequence.
138+
* If multiple actios produce errors simultaneously, one of them will be nondeterministically
139+
* selected for production. If all actions succeed, their results are returned in the same
140+
* order as their corresponding inputs, regardless of the order in which they executed.
141+
*
142+
* The `f` function is run as part of running the action: in parallel and subject to the
143+
* limit.
136144
*/
137145
def parTraverseN[T[_]: Traverse, A, B](n: Int)(ta: T[A])(f: A => F[B]): F[T[B]] = {
138146
require(n >= 1, s"Concurrency limit should be at least 1, was: $n")
@@ -164,6 +172,7 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] {
164172
case None =>
165173
F.uncancelable { poll =>
166174
F.deferred[Outcome[F, E, B]] flatMap { result =>
175+
// laziness is significant here, since it pushes the `f` into the fiber
167176
val action = poll(sem.acquire) >> f(a)
168177
.guaranteeCase { oc =>
169178
val completion = oc match {
@@ -221,9 +230,16 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] {
221230
}
222231

223232
/**
224-
* Like `Parallel.parTraverse_`, but limits the degree of parallelism. Note that the semantics
225-
* of this operation aim to maximise fairness: when a spot to execute becomes available, every
226-
* task has a chance to claim it, and not only the next `n` tasks in `ta`
233+
* Like `Parallel.parTraverse_`, but limits the degree of parallelism. The semantics of this
234+
* function are ordered based on the `Foldable`. The first ''n'' actions will be started
235+
* first, with subsequent actions starting in order as each one completes. Actions which are
236+
* reached earlier in `foldLeftM` order will be started slightly sooner than later actions, in
237+
* a non-blocking fashion. Any errors or self-cancelation will immediately abort the sequence.
238+
* If multiple actios produce errors simultaneously, one of them will be nondeterministically
239+
* selected for production.
240+
*
241+
* The `f` function is run as part of running the action: in parallel and subject to the
242+
* limit.
227243
*/
228244
def parTraverseN_[T[_]: Foldable, A, B](n: Int)(ta: T[A])(f: A => F[B]): F[Unit] = {
229245
require(n >= 1, s"Concurrency limit should be at least 1, was: $n")
@@ -262,7 +278,8 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] {
262278

263279
val suppressed = wrapped.void.voidError.guarantee(sem.release)
264280

265-
poll(sem.acquire) *> suppressed.start flatMap { fiber =>
281+
// the laziness is significant here since it pushes the f into the fiber
282+
poll(sem.acquire) >> suppressed.start flatMap { fiber =>
266283
// supervision is handled very differently here: we never remove from the set
267284
supervision.update(fiber :: _)
268285
}

0 commit comments

Comments
 (0)