Skip to content

Commit d995800

Browse files
committed
Fix ParallelWithThrottle
`semaphore` in `ParallelWithThrottleCustom` was disposed too early
1 parent b39e0ea commit d995800

File tree

2 files changed

+27
-17
lines changed

2 files changed

+27
-17
lines changed

src/FSharpx.Extras/ComputationExpressions/Monad.fs

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1218,27 +1218,29 @@ module Task =
12181218
|> Seq.map catch
12191219
|> Parallel
12201220

1221-
/// common code for ParallelCatchWithTrottle and ParallelWithTrottle
1222-
let private ParallelWithTrottleCustom transformResult throttle (tasks : seq<unit -> Task<'a>>) : (Task<'b[]>) =
1223-
use semaphore = new SemaphoreSlim(throttle)
1224-
let throttleTask (t:unit->Task<'a>) () : Task<'b> =
1225-
task {
1226-
do! semaphore.WaitAsync() |> ToTaskUnit
1227-
let! result = Catch <| t()
1228-
semaphore.Release() |> ignore
1229-
return transformResult result
1230-
}
1231-
tasks
1232-
|> Seq.map throttleTask
1233-
|> Parallel
1221+
/// common code for ParallelCatchWithThrottle and ParallelWithThrottle
1222+
let private ParallelWithThrottleCustom transformResult throttle (tasks : seq<unit -> Task<'a>>) : (Task<'b[]>) =
1223+
task {
1224+
use semaphore = new SemaphoreSlim(throttle)
1225+
let throttleTask (t:unit->Task<'a>) () : Task<'b> =
1226+
task {
1227+
do! semaphore.WaitAsync() |> ToTaskUnit
1228+
let! result = Catch <| t()
1229+
semaphore.Release() |> ignore
1230+
return transformResult result
1231+
}
1232+
return! tasks
1233+
|> Seq.map throttleTask
1234+
|> Parallel
1235+
}
12341236

12351237
/// Creates a task that executes all the given tasks.
12361238
/// This function doesn't throw exceptions, but instead returns an array of Choices.
12371239
/// The paralelism is throttled, so that at most `throttle` tasks run at one time.
1238-
let ParallelCatchWithTrottle throttle (tasks : seq<unit -> Task<'a>>) : (Task<Choice<'a, exn>[]>) =
1239-
ParallelWithTrottleCustom id throttle tasks
1240+
let ParallelCatchWithThrottle throttle (tasks : seq<unit -> Task<'a>>) : (Task<Choice<'a, exn>[]>) =
1241+
ParallelWithThrottleCustom id throttle tasks
12401242

12411243
/// Creates a task that executes all the given tasks.
12421244
/// The paralelism is throttled, so that at most `throttle` tasks run at one time.
1243-
let ParallelWithTrottle throttle (tasks : seq<unit -> Task<'a>>) : (Task<'a[]>) =
1244-
ParallelWithTrottleCustom Choice.getOrRaise throttle tasks
1245+
let ParallelWithThrottle throttle (tasks : seq<unit -> Task<'a>>) : (Task<'a[]>) =
1246+
ParallelWithThrottleCustom Choice.getOrRaise throttle tasks

tests/FSharpx.Tests/TaskTests.fs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,14 @@ let ``try with should do unwrapping of exception to original type if it was rais
186186
}
187187
Assert.AreEqual(10, result.Result)
188188

189+
[<Test>]
190+
let ``Parallel with throttle``() =
191+
let nums = [|123|]
192+
let work n () = Task.task.Return(n)
193+
let tasks = nums |> Array.map work
194+
let result = Task.ParallelWithThrottle 1 tasks
195+
Assert.AreEqual(nums, result.Result)
196+
189197
open FsCheck
190198
open FsCheck.NUnit
191199

0 commit comments

Comments
 (0)