Skip to content

Commit d76a5a6

Browse files
committed
refactor Task.ParallelWithTrottle
1 parent 7525861 commit d76a5a6

File tree

1 file changed

+41
-10
lines changed
  • src/FSharpx.Extras/ComputationExpressions

1 file changed

+41
-10
lines changed

src/FSharpx.Extras/ComputationExpressions/Monad.fs

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,14 @@ module Task =
11221122
/// Creates a task that runs the given task and ignores its result.
11231123
let inline Ignore t = bind (fun _ -> returnM ()) t
11241124

1125+
/// Active pattern that matches on flattened inner exceptions in an AggregateException
1126+
let (|AggregateExn|_|) (e:exn) =
1127+
match e with
1128+
| :? AggregateException as ae -> ae.Flatten().InnerExceptions
1129+
|> List.ofSeq
1130+
|> Some
1131+
| _ -> None
1132+
11251133
/// Creates a task that executes a specified task.
11261134
/// If this task completes successfully, then this function returns Choice1Of2 with the returned value.
11271135
/// If this task raises an exception before it completes then return Choice2Of2 with the raised exception.
@@ -1130,29 +1138,52 @@ module Task =
11301138
try let! r = t
11311139
return Choice1Of2 r
11321140
with e ->
1133-
return Choice2Of2 e
1141+
let e' = match e with
1142+
| AggregateExn [inner] -> inner
1143+
| x -> x
1144+
return Choice2Of2 e'
11341145
}
11351146

11361147
/// Creates a task that executes all the given tasks.
1137-
let Parallel (tasks : seq<unit -> Task<'a>>) =
1148+
let Parallel (tasks : seq<unit -> Task<'a>>) : (Task<'a[]>) =
11381149
tasks
11391150
|> Seq.map (fun t -> t())
11401151
|> Array.ofSeq
11411152
|> Task.WhenAll
11421153

11431154
/// Creates a task that executes all the given tasks.
1144-
/// The paralelism is throttled, so that at most `throttle` tasks run at one time.
1145-
let ParallelWithTrottle throttle (tasks : seq<unit -> Task<'a>>) : (Task<'a[]>) =
1146-
let semaphore = new SemaphoreSlim(throttle)
1147-
let throttleTask (t:unit->Task<'a>) () : Task<'a> =
1155+
/// This function doesn't throw exceptions, but instead returns an array of Choices.
1156+
let ParallelCatch (tasks : seq<unit -> Task<'a>>) : (Task<Choice<'a, exn>[]>) =
1157+
let catch t () =
1158+
Catch <| t()
1159+
tasks
1160+
|> Seq.map catch
1161+
|> Parallel
1162+
1163+
/// common code for ParallelCatchWithTrottle and ParallelWithTrottle
1164+
let private ParallelWithTrottleCustom transformResult throttle (tasks : seq<unit -> Task<'a>>) : (Task<'b[]>) =
1165+
use semaphore = new SemaphoreSlim(throttle)
1166+
let throttleTask (t:unit->Task<'a>) () : Task<'b> =
11481167
task {
11491168
do! semaphore.WaitAsync() |> ToTaskUnit
11501169
let! result = Catch <| t()
11511170
semaphore.Release() |> ignore
1152-
return match result with
1153-
| Choice1Of2 r -> r
1154-
| Choice2Of2 e -> raise e
1171+
return transformResult result
11551172
}
11561173
tasks
11571174
|> Seq.map throttleTask
1158-
|> Parallel
1175+
|> Parallel
1176+
1177+
/// Creates a task that executes all the given tasks.
1178+
/// This function doesn't throw exceptions, but instead returns an array of Choices.
1179+
/// The paralelism is throttled, so that at most `throttle` tasks run at one time.
1180+
let ParallelCatchWithTrottle throttle (tasks : seq<unit -> Task<'a>>) : (Task<Choice<'a, exn>[]>) =
1181+
ParallelWithTrottleCustom id throttle tasks
1182+
1183+
/// Creates a task that executes all the given tasks.
1184+
/// The paralelism is throttled, so that at most `throttle` tasks run at one time.
1185+
let ParallelWithTrottle throttle (tasks : seq<unit -> Task<'a>>) : (Task<'a[]>) =
1186+
let extractOrThrow = function
1187+
| Choice1Of2 r -> r
1188+
| Choice2Of2 e -> raise e
1189+
ParallelWithTrottleCustom extractOrThrow throttle tasks

0 commit comments

Comments
 (0)