Skip to content

Commit 2091c9e

Browse files
committed
Proper Async.zip implementation
1 parent a1e2f69 commit 2091c9e

File tree

4 files changed

+174
-45
lines changed

4 files changed

+174
-45
lines changed

src/FSharpPlus/Extensions/Async.fs

Lines changed: 57 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,23 @@ namespace FSharpPlus
44
[<RequireQualifiedAccess>]
55
module Async =
66

7+
open System
8+
open System.Threading.Tasks
9+
10+
#if !FABLE_COMPILER
11+
// Proper Async.StartImmediateAsTask implementation, without aggregate exception wrapping.
12+
let private startImmediateAsTask (computation: Async<'T>, cancellationToken) : Task<'T> =
13+
let ts = TaskCompletionSource<'T> ()
14+
Async.StartWithContinuations (
15+
computation,
16+
ts.SetResult,
17+
(function
18+
| :? AggregateException as agg -> ts.SetException agg.InnerExceptions
19+
| exn -> ts.SetException exn),
20+
(fun _ -> ts.SetCanceled ()),
21+
cancellationToken)
22+
ts.Task
23+
#endif
724
open FSharpPlus.Extensions
825

926
/// <summary>Creates an async workflow from another workflow 'x', mapping its result with 'f'.</summary>
@@ -31,62 +48,61 @@ module Async =
3148
let! c = z
3249
return f a b c}
3350

34-
/// <summary>Creates an async workflow from two workflows 'x' and 'y', mapping its results with 'f'.</summary>
35-
/// <remarks>Similar to lift2 but although workflows are started in sequence they might end independently in different order.</remarks>
36-
/// <param name="f">The mapping function.</param>
37-
/// <param name="x">First async workflow.</param>
38-
/// <param name="y">Second async workflow.</param>
51+
/// <summary>Creates an async workflow from two workflows, mapping its results with a specified function.</summary>
52+
/// <remarks>Similar to lift2 but although workflows are started in sequence they might end independently in different order
53+
/// and all errors are collected.
54+
/// </remarks>
55+
/// <param name="mapper">The mapping function.</param>
56+
/// <param name="async1">First async workflow.</param>
57+
/// <param name="async2">Second async workflow.</param>
3958
#if FABLE_COMPILER
40-
let map2 f x y = lift2 f x y
59+
let map2 mapper (async1: Async<'T1>) (async2: Async<'T2>) : Async<'U> =
60+
lift2 mapper async1 async2
4161
#else
42-
let map2 f x y = async {
62+
let map2 mapper (async1: Async<'T1>) (async2: Async<'T2>) : Async<'U> = async {
4363
let! ct = Async.CancellationToken
44-
let x = Async.StartImmediateAsTask (x, ct)
45-
let y = Async.StartImmediateAsTask (y, ct)
46-
let! x' = Async.Await x
47-
let! y' = Async.Await y
48-
return f x' y' }
64+
let t1 = startImmediateAsTask (async1, ct)
65+
let t2 = startImmediateAsTask (async2, ct)
66+
return! Async.Await (Task.map2 mapper t1 t2) }
4967
#endif
5068

51-
/// <summary>Creates an async workflow from three workflows 'x', 'y' and 'z', mapping its results with 'f'.</summary>
52-
/// <remarks>Similar to lift3 but although workflows are started in sequence they might end independently in different order.</remarks>
53-
/// <param name="f">The mapping function.</param>
54-
/// <param name="x">First async workflow.</param>
55-
/// <param name="y">Second async workflow.</param>
56-
/// <param name="z">third async workflow.</param>
69+
/// <summary>Creates an async workflow from three workflows, mapping its results with a specified function.</summary>
70+
/// <remarks>Similar to lift3 but although workflows are started in sequence they might end independently in different order
71+
/// and all errors are collected.
72+
/// </remarks>
73+
/// <param name="mapper">The mapping function.</param>
74+
/// <param name="async1">First async workflow.</param>
75+
/// <param name="async2">Second async workflow.</param>
76+
/// <param name="async3">third async workflow.</param>
5777
#if FABLE_COMPILER
58-
let map3 f x y z = lift3 f x y z
78+
let map3 mapper (async1: Async<'T1>) (async2: Async<'T2>) (async3: Async<'T3>) : Async<'U> =
79+
lift3 mapper async1 async2 async3
5980
#else
60-
let map3 f x y z = async {
81+
let map3 mapper (async1: Async<'T1>) (async2: Async<'T2>) (async3: Async<'T3>) : Async<'U> = async {
6182
let! ct = Async.CancellationToken
62-
let x = Async.StartImmediateAsTask (x, ct)
63-
let y = Async.StartImmediateAsTask (y, ct)
64-
let z = Async.StartImmediateAsTask (z, ct)
65-
let! x' = Async.Await x
66-
let! y' = Async.Await y
67-
let! z' = Async.Await z
68-
return f x' y' z' }
83+
let t1 = startImmediateAsTask (async1, ct)
84+
let t2 = startImmediateAsTask (async2, ct)
85+
let t3 = startImmediateAsTask (async3, ct)
86+
return! Async.Await (Task.map3 mapper t1 t2 t3) }
6987
#endif
7088

7189
/// <summary>Creates an async workflow from two workflows 'x' and 'y', tupling its results.</summary>
7290
let zipSequentially x y = async {
7391
let! a = x
7492
let! b = y
75-
return a, b}
93+
return a, b }
7694

77-
/// <summary>Creates an async workflow from two workflows 'x' and 'y', tupling its results.</summary>
78-
/// <remarks>Similar to zipSequentially but although workflows are started in sequence they might end independently in different order.</remarks>
79-
#if FABLE_COMPILER
80-
let zip x y = zipSequentially x y
81-
#else
82-
let zip x y = async {
83-
let! ct = Async.CancellationToken
84-
let x = Async.StartImmediateAsTask (x, ct)
85-
let y = Async.StartImmediateAsTask (y, ct)
86-
let! x' = Async.Await x
87-
let! y' = Async.Await y
88-
return x', y' }
89-
#endif
95+
/// <summary>Creates an async workflow from two workflows, tupling its results.</summary>
96+
/// <remarks>Similar to zipSequentially but although workflows are started in sequence they might end independently in different order
97+
/// and all errors are collected.
98+
/// </remarks>
99+
let zip (async1: Async<'T1>) (async2: Async<'T2>) = map2 (fun x y -> x, y) async1 async2
100+
101+
/// <summary>Creates an async workflow from three workflows, tupling its results.</summary>
102+
/// <remarks>Similar to zipSequentially but although workflows are started in sequence they might end independently in different order
103+
/// and all errors are collected.
104+
/// </remarks>
105+
let zip3 (async1: Async<'T1>) (async2: Async<'T2>) (async3: Async<'T3>) = map3 (fun x y z -> x, y, z) async1 async2 async3
90106

91107
/// Flatten two nested asyncs into one.
92108
let join x = async.Bind (x, id)

src/FSharpPlus/Extensions/Extensions.fs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,13 @@ module Extensions =
8686
/// at the point where the overall async is started.
8787
/// </remarks>
8888
static member Await (task: Task<'T>) : Async<'T> =
89-
Async.FromContinuations (fun (sc, ec, _) ->
89+
Async.FromContinuations (fun (sc, ec, cc) ->
9090
task.ContinueWith (fun (task: Task<'T>) ->
9191
if task.IsFaulted then
9292
let e = task.Exception
9393
if e.InnerExceptions.Count = 1 then ec e.InnerExceptions[0]
9494
else ec e
95-
elif task.IsCanceled then ec (TaskCanceledException ())
95+
elif task.IsCanceled then cc (TaskCanceledException ())
9696
else sc task.Result)
9797
|> ignore)
9898

@@ -113,13 +113,13 @@ module Extensions =
113113
/// at the point where the overall async is started.
114114
/// </remarks>
115115
static member Await (task: Task) : Async<unit> =
116-
Async.FromContinuations (fun (sc, ec, _) ->
116+
Async.FromContinuations (fun (sc, ec, cc) ->
117117
task.ContinueWith (fun (task: Task) ->
118118
if task.IsFaulted then
119119
let e = task.Exception
120120
if e.InnerExceptions.Count = 1 then ec e.InnerExceptions[0]
121121
else ec e
122-
elif task.IsCanceled then ec (TaskCanceledException ())
122+
elif task.IsCanceled then cc (TaskCanceledException ())
123123
else sc ())
124124
|> ignore)
125125

tests/FSharpPlus.Tests/Asyncs.fs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
namespace FSharpPlus.Tests
2+
3+
#if !NET462 && !NETSTANDARD2_0
4+
5+
module Async =
6+
7+
open System
8+
open System.Threading
9+
open System.Threading.Tasks
10+
open NUnit.Framework
11+
open FSharpPlus
12+
open FSharpPlus.Data
13+
open FSharpPlus.Extensions
14+
open FSharpPlus.Tests.Helpers
15+
16+
exception TestException of string
17+
18+
type Async with
19+
static member StartImmediateAsTaskCorrect (computation: Async<'T>, ?cancellationToken) : Task<'T> =
20+
let cancellationToken = defaultArg cancellationToken (new CancellationToken ())
21+
let ts = TaskCompletionSource<'T> ()
22+
Async.StartWithContinuations (
23+
computation,
24+
ts.SetResult,
25+
(function
26+
| :? AggregateException as agg -> ts.SetException agg.InnerExceptions
27+
| exn -> ts.SetException exn),
28+
(fun _ -> ts.SetCanceled ()),
29+
cancellationToken)
30+
ts.Task
31+
32+
static member AsTaskAndWait computation =
33+
let t = Async.StartImmediateAsTaskCorrect computation
34+
Task.WaitAny t |> ignore
35+
t
36+
37+
static member WhenAll (source: Async<'T> seq) = source |> Seq.map (fun x -> Async.StartImmediateAsTaskCorrect x) |> Task.WhenAll |> Async.Await
38+
39+
40+
module AsyncTests =
41+
42+
let createAsync isFailed delay (value: 'T) =
43+
if not isFailed && delay = 0 then async.Return value
44+
else
45+
async {
46+
if delay = 0 then do! Async.raise (TestException (sprintf "Ouch, can't create: %A" value ))
47+
do! Async.Sleep delay
48+
if isFailed then do! Async.raise (TestException (sprintf "Ouch, can't create: %A" value ))
49+
return value }
50+
51+
52+
[<Test>]
53+
let testAsyncZip () =
54+
let t1 = createAsync true 0 1
55+
let t2 = createAsync true 0 2
56+
let t3 = createAsync true 0 3
57+
58+
let c = new CancellationToken true
59+
let t4 = Task.FromCanceled<int> c |> Async.Await
60+
61+
let t5 = createAsync false 0 5
62+
let t6 = createAsync false 0 6
63+
64+
let t12 = Async.WhenAll [t1; t2]
65+
let t12t12 = Async.WhenAll [t12; t12]
66+
let t33 = Async.WhenAll [t3; t3]
67+
68+
let t12123 = Async.zip3 t12t12 t33 t4
69+
let ac1 =
70+
try
71+
(t12123 |> Async.AsTaskAndWait).Exception.InnerExceptions |> Seq.map (fun x -> int (Char.GetNumericValue x.Message.[35]))
72+
with e ->
73+
failwithf "Failure in testAsyncZip. Async status is %A . Exception is %A" (t12123 |> Async.AsTaskAndWait).Status e
74+
75+
CollectionAssert.AreEquivalent ([1; 2; 1; 2; 3], ac1, "Async.zip(3) should add only non already existing exceptions.")
76+
77+
let t13 = Async.zip3 (Async.zip t1 t3) t4 (Async.zip t5 t6)
78+
Assert.AreEqual (true, (t13 |> Async.AsTaskAndWait).IsFaulted, "Async.zip(3) between a value, an exception and a cancellation -> exception wins.")
79+
let ac2 = (t13 |> Async.AsTaskAndWait).Exception.InnerExceptions |> Seq.map (fun x -> int (Char.GetNumericValue x.Message.[35]))
80+
CollectionAssert.AreEquivalent ([1; 3], ac2, "Async.zip between 2 exceptions => both exceptions returned, even after combining with cancellation and values.")
81+
82+
[<Test>]
83+
let testAsyncZipAsync () =
84+
let t1 = createAsync true 20 1
85+
let t2 = createAsync true 10 2
86+
let t3 = createAsync true 30 3
87+
88+
let c = new CancellationToken true
89+
let t4 = Task.FromCanceled<int> c |> Async.Await
90+
91+
let t5 = createAsync false 20 5
92+
let t6 = createAsync false 10 6
93+
94+
let t12 = Async.WhenAll [t1; t2]
95+
let t12t12 = Async.WhenAll [t12; t12]
96+
let t33 = Async.WhenAll [t3; t3]
97+
98+
let t12123 = Async.zip3 t12t12 t33 t4
99+
let ac1 =
100+
try
101+
(t12123 |> Async.AsTaskAndWait).Exception.InnerExceptions |> Seq.map (fun x -> int (Char.GetNumericValue x.Message.[35]))
102+
with e ->
103+
failwithf "Failure in testAsyncZipAsync. Async status is %A . Exception is %A" (t12123 |> Async.AsTaskAndWait).Status e
104+
105+
CollectionAssert.AreEquivalent ([1; 2; 1; 2; 3], ac1, "Async.zip(3)Async should add only non already existing exceptions.")
106+
107+
let t13 = Async.zip3 (Async.zip t1 t3) t4 (Async.zip t5 t6)
108+
Assert.AreEqual (true, (t13 |> Async.AsTaskAndWait).IsFaulted, "Async.zip(3)Async between a value, an exception and a cancellation -> exception wins.")
109+
let ac2 = (t13 |> Async.AsTaskAndWait).Exception.InnerExceptions |> Seq.map (fun x -> int (Char.GetNumericValue x.Message.[35]))
110+
CollectionAssert.AreEquivalent ([1; 3], ac2, "Async.zipAsync between 2 exceptions => both exceptions returned, even after combining with cancellation and values.")
111+
112+
#endif

tests/FSharpPlus.Tests/FSharpPlus.Tests.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
<Compile Include="Validations.fs" />
2929
<Compile Include="Task.fs" />
3030
<Compile Include="ValueTask.fs" />
31+
<Compile Include="Asyncs.fs" />
3132
<Compile Include="Free.fs" />
3233
<Compile Include="SeqT.fs" />
3334
<Compile Include="ComputationExpressions.fs" />

0 commit comments

Comments
 (0)