Skip to content

Commit 98f8d7e

Browse files
committed
* Add Async.parallelMap2
* Add Async.parallelMap3 * Add Async.parallelZip * Add parallelAsyncResult * Add parallelAsyncValidation * Add tests
1 parent a74348e commit 98f8d7e

15 files changed

+2156
-565
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,7 @@ FakesAssemblies/
372372
_Pvt_Extensions
373373

374374
# Paket dependency manager
375+
.paket
375376
paket-files/
376377

377378
# FAKE - F# Make

.paket/Paket.Restore.targets

Lines changed: 0 additions & 565 deletions
This file was deleted.

src/FsToolkit.ErrorHandling/Async.fs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,75 @@ module Async =
114114
let inline zip (left: Async<'left>) (right: Async<'right>) : Async<'left * 'right> =
115115
bind (fun l -> bind (fun r -> singleton (l, r)) right) left
116116

117+
/// <summary>
118+
/// Executes two asyncs concurrently <see cref='M:Microsoft.FSharp.Control.FSharpAsync.Parallel``1'/> and returns a mapping of the values
119+
/// </summary>
120+
/// <param name="mapper">The function to apply to the values of the <c>Async</c> values.</param>
121+
/// <param name="input1">The first <c>Async</c> to execute</param>
122+
/// <param name="input2">The second <c>Async</c> to execute</param>
123+
/// <returns>The transformed <c>Async</c> value.</returns>
124+
let inline parallelMap2
125+
([<InlineIfLambda>] mapper: 'input1 -> 'input2 -> 'output)
126+
(input1: Async<'input1>)
127+
(input2: Async<'input2>)
128+
: Async<'output> =
129+
Async.Parallel(
130+
[|
131+
map box input1
132+
map box input2
133+
|],
134+
maxDegreeOfParallelism = 2
135+
)
136+
|> map (fun results ->
137+
let a =
138+
results[0]
139+
|> unbox<'input1>
140+
141+
let b =
142+
results[1]
143+
|> unbox<'input2>
144+
145+
mapper a b
146+
)
147+
148+
/// <summary>
149+
/// Executes three asyncs concurrently <see cref='M:Microsoft.FSharp.Control.FSharpAsync.Parallel``1'/> and returns a mapping of the values
150+
/// </summary>
151+
/// <param name="mapper">The function to apply to the values of the <c>Async</c> values.</param>
152+
/// <param name="input1">The first <c>Async</c> to execute</param>
153+
/// <param name="input2">The second <c>Async</c> to execute</param>
154+
/// <param name="input3">The third <c>Async</c> value to transform.</param>
155+
/// <returns>The transformed <c>Async</c> value.</returns>
156+
let inline parallelMap3
157+
([<InlineIfLambda>] mapper: 'input1 -> 'input2 -> 'input3 -> 'output)
158+
(input1: Async<'input1>)
159+
(input2: Async<'input2>)
160+
(input3: Async<'input3>)
161+
: Async<'output> =
162+
Async.Parallel(
163+
[|
164+
map box input1
165+
map box input2
166+
map box input3
167+
|],
168+
maxDegreeOfParallelism = 3
169+
)
170+
|> map (fun results ->
171+
let a =
172+
results[0]
173+
|> unbox<'input1>
174+
175+
let b =
176+
results[1]
177+
|> unbox<'input2>
178+
179+
let c =
180+
results[2]
181+
|> unbox<'input3>
182+
183+
mapper a b c
184+
)
185+
117186
/// <summary>
118187
/// Operators for working with the <c>Async</c> type.
119188
/// </summary>

src/FsToolkit.ErrorHandling/FsToolkit.ErrorHandling.fsproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@
4848
<Compile Include="TaskResultOption.fs" Condition="'$(FABLE_COMPILER)' != 'true'" />
4949
<Compile Include="TaskResultOptionCE.fs" Condition="'$(FABLE_COMPILER)' != 'true'" />
5050
<Compile Include="TaskResultOptionOp.fs" Condition="'$(FABLE_COMPILER)' != 'true'" />
51+
<Compile Include="ParallelAsyncResult.fs" />
52+
<Compile Include="ParallelAsyncResultCE.fs" />
53+
<Compile Include="ParallelAsyncValidation.fs" />
54+
<Compile Include="ParallelAsyncValidationCE.fs" />
5155
<Compile Include="List.fs" />
5256
<Compile Include="Array.fs" />
5357
<Compile Include="Seq.fs" />
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
namespace FsToolkit.ErrorHandling
2+
3+
open System
4+
5+
[<RequireQualifiedAccess>]
6+
module ParallelAsyncResult =
7+
8+
[<AutoOpen>]
9+
module InternalHelpers =
10+
11+
type AsyncResultErrorException<'a>(value: 'a) =
12+
inherit Exception()
13+
member this.Value = value
14+
15+
let toBoxedAsync (input: Async<Result<'ok, 'error>>) : Async<obj> =
16+
async {
17+
match! input with
18+
| Ok x -> return box x
19+
| Error e -> return raise (AsyncResultErrorException<'error>(e))
20+
}
21+
22+
/// <summary>
23+
/// Transforms two AsyncResults in one that executes them concurrently and combines the results using the specified function.
24+
/// If either AsyncResult resolves to an error, then the other is cancelled and only the first error is returned.
25+
/// </summary>
26+
/// <param name="mapper">The function to apply to the values of the AsyncResult values.</param>
27+
/// <param name="input1">The first AsyncResult value to transform.</param>
28+
/// <param name="input2">The second AsyncResult value to transform.</param>
29+
/// <returns>The transformed AsyncResult value.</returns>
30+
let inline map2
31+
([<InlineIfLambda>] mapper: 'a -> 'b -> 'c)
32+
(input1: Async<Result<'a, 'error>>)
33+
(input2: Async<Result<'b, 'error>>)
34+
: Async<Result<'c, 'error>> =
35+
async {
36+
try
37+
return!
38+
Async.parallelMap2
39+
(fun a b ->
40+
let a = unbox<'a> a
41+
let b = unbox<'b> b
42+
Ok(mapper a b)
43+
)
44+
(toBoxedAsync input1)
45+
(toBoxedAsync input2)
46+
47+
with :? AsyncResultErrorException<'error> as exn ->
48+
return Error exn.Value
49+
}
50+
51+
/// <summary>
52+
/// Transforms three AsyncResults in one that executes them concurrently and combines the results using the specified function.
53+
/// If any AsyncResult resolves to an error, then the others are cancelled and only the first error is returned.
54+
/// </summary>
55+
/// <param name="mapper">The function to apply to the values of the AsyncResult values.</param>
56+
/// <param name="input1">The first AsyncResult value to transform.</param>
57+
/// <param name="input2">The second AsyncResult value to transform.</param>
58+
/// <param name="input3">The third AsyncResult value to transform.</param>
59+
/// <returns>The transformed AsyncResult value.</returns>
60+
let inline map3
61+
([<InlineIfLambda>] mapper: 'a -> 'b -> 'c -> 'd)
62+
(input1: Async<Result<'a, 'error>>)
63+
(input2: Async<Result<'b, 'error>>)
64+
(input3: Async<Result<'c, 'error>>)
65+
: Async<Result<'d, 'error>> =
66+
async {
67+
try
68+
return!
69+
Async.parallelMap3
70+
(fun a b c ->
71+
let a = unbox<'a> a
72+
let b = unbox<'b> b
73+
let c = unbox<'c> c
74+
Ok(mapper a b c)
75+
)
76+
(toBoxedAsync input1)
77+
(toBoxedAsync input2)
78+
(toBoxedAsync input3)
79+
80+
with :? AsyncResultErrorException<'error> as exn ->
81+
return Error exn.Value
82+
}
83+
84+
let inline zip
85+
(a: Async<Result<'a, 'error>>)
86+
(b: Async<Result<'b, 'error>>)
87+
: Async<Result<'a * 'b, 'error>> =
88+
map2 (fun a b -> a, b) a b
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
namespace FsToolkit.ErrorHandling
2+
3+
open System
4+
open System.Threading.Tasks
5+
6+
[<AutoOpen>]
7+
module ParallelAsyncResultCE =
8+
9+
type ParallelAsyncResultBuilder() =
10+
11+
member inline _.Return(value: 'ok) : Async<Result<'ok, 'error>> =
12+
result.Return value
13+
|> async.Return
14+
15+
member inline _.ReturnFrom(input: Async<Result<'ok, 'error>>) : Async<Result<'ok, 'error>> =
16+
input
17+
18+
member inline _.Zero() : Async<Result<unit, 'error>> =
19+
result.Zero()
20+
|> async.Return
21+
22+
member inline _.Bind
23+
(
24+
asyncResult: Async<Result<'okInput, 'error>>,
25+
[<InlineIfLambda>] binder: 'okInput -> Async<Result<'okOutput, 'error>>
26+
) : Async<Result<'okOutput, 'error>> =
27+
AsyncResult.bind binder asyncResult
28+
29+
member inline _.Delay
30+
([<InlineIfLambda>] generator: unit -> Async<Result<'ok, 'error>>)
31+
: Async<Result<'ok, 'error>> =
32+
async.Delay generator
33+
34+
member inline this.Combine
35+
(computation1: Async<Result<unit, 'error>>, computation2: Async<Result<'ok, 'error>>)
36+
: Async<Result<'ok, 'error>> =
37+
this.Bind(computation1, (fun () -> computation2))
38+
39+
member inline _.TryWith
40+
(
41+
computation: Async<Result<'ok, 'error>>,
42+
[<InlineIfLambda>] handler: System.Exception -> Async<Result<'ok, 'error>>
43+
) : Async<Result<'ok, 'error>> =
44+
async.TryWith(computation, handler)
45+
46+
member inline _.TryFinally
47+
(computation: Async<Result<'ok, 'error>>, [<InlineIfLambda>] compensation: unit -> unit)
48+
: Async<Result<'ok, 'error>> =
49+
async.TryFinally(computation, compensation)
50+
#if !FABLE_COMPILER
51+
member inline _.TryFinallyAsync
52+
(
53+
computation: Async<Result<'ok, 'error>>,
54+
[<InlineIfLambda>] compensation: unit -> ValueTask
55+
) : Async<Result<'ok, 'error>> =
56+
let compensation =
57+
async {
58+
let vTask = compensation ()
59+
60+
if vTask.IsCompletedSuccessfully then
61+
return ()
62+
else
63+
return!
64+
vTask.AsTask()
65+
|> Async.AwaitTask
66+
}
67+
68+
Async.TryFinallyAsync(computation, compensation)
69+
70+
71+
member inline this.Using
72+
(
73+
resource: 'ok :> IAsyncDisposable,
74+
[<InlineIfLambda>] binder: 'ok -> Async<Result<'U, 'error>>
75+
) : Async<Result<'U, 'error>> =
76+
this.TryFinallyAsync(
77+
binder resource,
78+
(fun () ->
79+
if not (isNull (box resource)) then
80+
resource.DisposeAsync()
81+
else
82+
ValueTask()
83+
)
84+
)
85+
#endif
86+
member inline this.While
87+
([<InlineIfLambda>] guard: unit -> bool, computation: Async<Result<unit, 'error>>)
88+
: Async<Result<unit, 'error>> =
89+
if guard () then
90+
let mutable whileAsync = Unchecked.defaultof<_>
91+
92+
whileAsync <-
93+
this.Bind(computation, (fun () -> if guard () then whileAsync else this.Zero()))
94+
95+
whileAsync
96+
else
97+
this.Zero()
98+
99+
100+
member inline _.BindReturn
101+
(x: Async<Result<'okInput, 'error>>, [<InlineIfLambda>] f: 'okInput -> 'okOutput)
102+
: Async<Result<'okOutput, 'error>> =
103+
AsyncResult.map f x
104+
105+
/// <summary>
106+
/// Method lets us transform data types into our internal representation. This is the identity method to recognize the self type.
107+
///
108+
/// See https://stackoverflow.com/questions/35286541/why-would-you-use-builder-source-in-a-custom-computation-expression-builder
109+
/// </summary>
110+
member inline _.Source(result: Async<Result<'ok, 'error>>) : Async<Result<'ok, 'error>> =
111+
result
112+
113+
member inline _.MergeSources(input1, input2) : Async<Result<'a * 'b, 'error>> =
114+
ParallelAsyncResult.zip input1 input2
115+
116+
let parallelAsyncResult = ParallelAsyncResultBuilder()
117+
118+
[<AutoOpen>]
119+
module MediumPriority =
120+
121+
type ParallelAsyncResultBuilder with
122+
123+
/// <summary>
124+
/// Needed to allow `for..in` and `for..do` functionality
125+
/// </summary>
126+
member inline _.Source(s: #seq<'value>) : #seq<'value> = s
127+
128+
/// <summary>
129+
/// Method lets us transform data types into our internal representation.
130+
/// </summary>
131+
member inline _.Source(result: Result<'ok, 'error>) : Async<Result<'ok, 'error>> =
132+
Async.singleton result
133+
134+
/// <summary>
135+
/// Method lets us transform data types into our internal representation.
136+
/// </summary>
137+
member inline _.Source(choice: Choice<'ok, 'error>) : Async<Result<'ok, 'error>> =
138+
choice
139+
|> Result.ofChoice
140+
|> Async.singleton
141+
142+
/// <summary>
143+
/// Method lets us transform data types into our internal representation.
144+
/// </summary>
145+
member inline _.Source(asyncComputation: Async<'ok>) : Async<Result<'ok, 'error>> =
146+
asyncComputation
147+
|> Async.map Ok
148+
149+
150+
member inline _.Using
151+
(
152+
resource: 'ok :> IDisposable,
153+
[<InlineIfLambda>] binder: 'ok -> Async<Result<'U, 'error>>
154+
) : Async<Result<'U, 'error>> =
155+
async.Using(resource, binder)
156+
157+
158+
member inline this.For
159+
(sequence: #seq<'ok>, [<InlineIfLambda>] binder: 'ok -> Async<Result<unit, 'error>>)
160+
: Async<Result<unit, 'error>> =
161+
this.Using(
162+
sequence.GetEnumerator(),
163+
fun enum ->
164+
this.While(
165+
(fun () -> enum.MoveNext()),
166+
this.Delay(fun () -> binder enum.Current)
167+
)
168+
)
169+
170+
#if !FABLE_COMPILER
171+
/// <summary>
172+
/// Method lets us transform data types into our internal representation.
173+
/// </summary>
174+
member inline _.Source(task: Task<'ok>) : Async<Result<'ok, 'error>> =
175+
task
176+
|> Async.AwaitTask
177+
|> Async.map Ok
178+
179+
/// <summary>
180+
/// Method lets us transform data types into our internal representation.
181+
/// </summary>
182+
member inline _.Source(task: Task) : Async<Result<unit, 'error>> =
183+
task
184+
|> Async.AwaitTask
185+
|> Async.map Ok
186+
#endif
187+
188+
#if !FABLE_COMPILER
189+
[<AutoOpen>]
190+
module HighPriority =
191+
192+
type ParallelAsyncResultBuilder with
193+
194+
/// <summary>
195+
/// Method lets us transform data types into our internal representation.
196+
/// </summary>
197+
member inline _.Source(task: Task<Result<'ok, 'error>>) : Async<Result<'ok, 'error>> =
198+
task
199+
|> Async.AwaitTask
200+
#endif

0 commit comments

Comments
 (0)