Skip to content

Commit a1e2f69

Browse files
committed
Proper ValueTask.zip implementation
1 parent c564cbf commit a1e2f69

File tree

2 files changed

+217
-68
lines changed

2 files changed

+217
-68
lines changed

src/FSharpPlus/Extensions/ValueTask.fs

Lines changed: 124 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,36 @@ namespace FSharpPlus
66
[<RequireQualifiedAccess>]
77
module ValueTask =
88

9+
open System
10+
open System.Threading
911
open System.Threading.Tasks
1012

1113
let inline internal (|Succeeded|Canceled|Faulted|) (t: ValueTask<'T>) =
1214
if t.IsCompletedSuccessfully then Succeeded t.Result
1315
elif t.IsCanceled then Canceled
14-
else Faulted (t.AsTask().Exception.InnerExceptions)
16+
else Faulted (t.AsTask().Exception)
1517

1618
let inline internal continueTask (tcs: TaskCompletionSource<'Result>) (x: ValueTask<'t>) (k: 't -> unit) =
1719
let f = function
1820
| Succeeded r -> k r
1921
| Canceled -> tcs.SetCanceled ()
20-
| Faulted e -> tcs.SetException e
22+
| Faulted e -> tcs.SetException e.InnerExceptions
2123
if x.IsCompleted then f x
22-
else
23-
let aw = x.GetAwaiter ()
24-
aw.OnCompleted (fun () -> f x)
24+
else x.ConfigureAwait(false).GetAwaiter().UnsafeOnCompleted (fun () -> f x)
25+
26+
let inline internal continueWith (x: ValueTask<'t>) f =
27+
if x.IsCompleted then f x
28+
else x.ConfigureAwait(false).GetAwaiter().UnsafeOnCompleted (fun () -> f x)
29+
30+
/// Creates a ValueTask from a value
31+
let result (value: 'T) : ValueTask<'T> =
32+
#if NET5_0_OR_GREATER
33+
ValueTask.FromResult value
34+
#else
35+
let tcs = TaskCompletionSource<'T> ()
36+
tcs.SetResult value
37+
tcs.Task |> ValueTask<'T>
38+
#endif
2539

2640
/// <summary>Creates a ValueTask workflow from 'source' another, mapping its result with 'f'.</summary>
2741
/// <param name="f">The mapping function.</param>
@@ -62,34 +76,101 @@ module ValueTask =
6276
with e -> tcs.SetException e)))
6377
tcs.Task |> ValueTask<'W>
6478

65-
/// <summary>Creates a task workflow from two workflows 'x' and 'y', mapping its results with 'f'.</summary>
66-
/// <remarks>Similar to lift2 but although workflows are started in sequence they might end independently in different order.</remarks>
67-
/// <param name="f">The mapping function.</param>
68-
/// <param name="x">First ValueTask workflow.</param>
69-
/// <param name="y">Second ValueTask workflow.</param>
70-
/// <param name="z">Third ValueTask workflow.</param>
71-
let map2 (f: 'T -> 'U -> 'V) (x: ValueTask<'T>) (y: ValueTask<'U>) : ValueTask<'V> =
72-
task {
73-
let! x' = x
74-
let! y' = y
75-
return f x' y'
76-
}
77-
|> ValueTask<'V>
79+
/// <summary>Creates a ValueTask workflow from two workflows, mapping its results with a specified function.</summary>
80+
/// <remarks>Similar to lift2 but although workflows are started in sequence they might end independently in different order
81+
/// and all errors are collected.
82+
/// </remarks>
83+
/// <param name="mapper">The mapping function.</param>
84+
/// <param name="task1">First ValueTask workflow.</param>
85+
/// <param name="task2">Second ValueTask workflow.</param>
86+
let map2 mapper (task1: ValueTask<'T1>) (task2: ValueTask<'T2>) : ValueTask<'U> =
87+
if task1.IsCompletedSuccessfully && task2.IsCompletedSuccessfully then
88+
try result (mapper task1.Result task2.Result)
89+
with e ->
90+
let tcs = TaskCompletionSource<_> ()
91+
tcs.SetException e
92+
tcs.Task |> ValueTask<'U>
93+
else
94+
let tcs = TaskCompletionSource<_> ()
95+
let r1 = ref Unchecked.defaultof<_>
96+
let r2 = ref Unchecked.defaultof<_>
97+
let mutable cancelled = false
98+
let failures = [|IReadOnlyCollection.empty; IReadOnlyCollection.empty|]
99+
let pending = ref 2
78100

79-
/// <summary>Creates a ValueTask workflow from three workflows 'x', 'y' and z, mapping its results with 'f'.</summary>
80-
/// <remarks>Similar to lift3 but although workflows are started in sequence they might end independently in different order.</remarks>
81-
/// <param name="f">The mapping function.</param>
82-
/// <param name="x">First ValueTask workflow.</param>
83-
/// <param name="y">Second ValueTask workflow.</param>
84-
/// <param name="z">Third ValueTask workflow.</param>
85-
let map3 (f: 'T -> 'U -> 'V -> 'W) (x: ValueTask<'T>) (y: ValueTask<'U>) (z: ValueTask<'V>) : ValueTask<'W> =
86-
task {
87-
let! x' = x
88-
let! y' = y
89-
let! z' = z
90-
return f x' y' z'
91-
}
92-
|> ValueTask<'W>
101+
let trySet () =
102+
if Interlocked.Decrement pending = 0 then
103+
let noFailures = Array.forall IReadOnlyCollection.isEmpty failures
104+
if noFailures && not cancelled then
105+
try tcs.TrySetResult (mapper r1.Value r2.Value) |> ignore
106+
with e -> tcs.TrySetException e |> ignore
107+
elif noFailures then tcs.TrySetCanceled () |> ignore
108+
else tcs.TrySetException (failures |> Seq.map AggregateException |> Seq.reduce Exception.add).InnerExceptions |> ignore
109+
110+
let k (v: ref<_>) i t =
111+
match t with
112+
| Succeeded r -> v.Value <- r
113+
| Canceled -> cancelled <- true
114+
| Faulted e -> failures[i] <- e.InnerExceptions
115+
trySet ()
116+
117+
if task1.IsCompleted && task2.IsCompleted then
118+
task1 |> k r1 0
119+
task2 |> k r2 1
120+
else
121+
continueWith task1 (k r1 0)
122+
continueWith task2 (k r2 1)
123+
tcs.Task |> ValueTask<'U>
124+
125+
/// <summary>Creates a ValueTask workflow from three workflows, mapping its results with a specified function.</summary>
126+
/// <remarks>Similar to lift3 but although workflows are started in sequence they might end independently in different order
127+
/// and all errors are collected.
128+
/// </remarks>
129+
/// <param name="mapper">The mapping function.</param>
130+
/// <param name="task1">First ValueTask workflow.</param>
131+
/// <param name="task2">Second ValueTask workflow.</param>
132+
/// <param name="task3">Third ValueTask workflow.</param>
133+
let map3 mapper (task1: ValueTask<'T1>) (task2: ValueTask<'T2>) (task3: ValueTask<'T3>) : ValueTask<'U> =
134+
if task1.IsCompletedSuccessfully && task2.IsCompletedSuccessfully && task3.IsCompletedSuccessfully then
135+
try result (mapper task1.Result task2.Result task3.Result)
136+
with e ->
137+
let tcs = TaskCompletionSource<_> ()
138+
tcs.SetException e
139+
tcs.Task |> ValueTask<'U>
140+
else
141+
let tcs = TaskCompletionSource<_> ()
142+
let r1 = ref Unchecked.defaultof<_>
143+
let r2 = ref Unchecked.defaultof<_>
144+
let r3 = ref Unchecked.defaultof<_>
145+
let mutable cancelled = false
146+
let failures = [|IReadOnlyCollection.empty; IReadOnlyCollection.empty; IReadOnlyCollection.empty|]
147+
let pending = ref 3
148+
149+
let trySet () =
150+
if Interlocked.Decrement pending = 0 then
151+
let noFailures = Array.forall IReadOnlyCollection.isEmpty failures
152+
if noFailures && not cancelled then
153+
try tcs.TrySetResult (mapper r1.Value r2.Value r3.Value) |> ignore
154+
with e -> tcs.TrySetException e |> ignore
155+
elif noFailures then tcs.TrySetCanceled () |> ignore
156+
else tcs.TrySetException (failures |> Seq.map AggregateException |> Seq.reduce Exception.add).InnerExceptions |> ignore
157+
158+
let k (v: ref<_>) i t =
159+
match t with
160+
| Succeeded r -> v.Value <- r
161+
| Canceled -> cancelled <- true
162+
| Faulted e -> failures[i] <- e.InnerExceptions
163+
trySet ()
164+
165+
if task1.IsCompleted && task2.IsCompleted && task3.IsCompleted then
166+
task1 |> k r1 0
167+
task2 |> k r2 1
168+
task3 |> k r3 2
169+
else
170+
continueWith task1 (k r1 0)
171+
continueWith task2 (k r2 1)
172+
continueWith task3 (k r3 2)
173+
tcs.Task |> ValueTask<'U>
93174

94175
/// <summary>Creates a ValueTask workflow that is the result of applying the resulting function of a ValueTask workflow
95176
/// to the resulting value of another ValueTask workflow</summary>
@@ -111,15 +192,17 @@ module ValueTask =
111192
tcs.SetResult (x, y)))
112193
tcs.Task |> ValueTask<'T * 'U>
113194

114-
/// <summary>Creates a ValueTask workflow from two workflows 'x' and 'y', tupling its results.</summary>
115-
/// <remarks>Similar to zipSequentially but although workflows are started in sequence they might end independently in different order.</remarks>
116-
let zip (x: ValueTask<'T>) (y: ValueTask<'U>) : ValueTask<'T * 'U> =
117-
task {
118-
let! x' = x
119-
let! y' = y
120-
return x', y'
121-
}
122-
|> ValueTask<'T * 'U>
195+
/// <summary>Creates a ValueTask workflow from two workflows, tupling its results.</summary>
196+
/// <remarks>Similar to zipSequentially but although workflows are started in sequence they might end independently in different order
197+
/// and all errors are collected.
198+
/// </remarks>
199+
let zip (task1: ValueTask<'T1>) (task2: ValueTask<'T2>) = map2 (fun x y -> x, y) task1 task2
200+
201+
/// <summary>Creates a ValueTask workflow from three workflows, tupling its results.</summary>
202+
/// <remarks>Similar to zipSequentially but although workflows are started in sequence they might end independently in different order
203+
/// and all errors are collected.
204+
/// </remarks>
205+
let zip3 (task1: ValueTask<'T1>) (task2: ValueTask<'T2>) (task3: ValueTask<'T3>) = map3 (fun x y z -> x, y, z) task1 task2 task3
123206

124207
/// Flattens two nested ValueTask into one.
125208
let join (source: ValueTask<ValueTask<'T>>) : ValueTask<'T> =
@@ -152,10 +235,5 @@ module ValueTask =
152235

153236
/// Raises an exception in the ValueTask
154237
let raise (``exception``: exn) = ValueTask<'TResult> (Task.FromException<'TResult> ``exception``)
155-
156238

157-
#if NET5_0_OR_GREATER
158-
/// Creates a ValueTask from a value
159-
let result value = ValueTask.FromResult value
160-
#endif
161239
#endif

tests/FSharpPlus.Tests/ValueTask.fs

Lines changed: 93 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
module ValueTask =
66

77
open System
8+
open System.Threading
89
open System.Threading.Tasks
910
open NUnit.Framework
1011
open FSharpPlus
@@ -13,18 +14,15 @@ module ValueTask =
1314

1415
exception TestException of string
1516

17+
type ValueTask<'T> with
18+
static member WhenAll (source: ValueTask<'T> seq) = source |> Seq.map (fun x -> x.AsTask ()) |> Task.WhenAll |> ValueTask<'T []>
19+
static member WaitAny (source: ValueTask<'T>) = source.AsTask () |> Task.WaitAny |> ignore
1620

1721
module ValueTask =
1822
open System.Threading
1923

2024
// Following is not available in F#6
2125

22-
/// <summary>Creates a <see cref="ValueTask{TResult}"/> that's completed successfully with the specified result.</summary>
23-
/// <typeparam name="TResult">The type of the result returned by the task.</typeparam>
24-
/// <param name="result">The result to store into the completed task.</param>
25-
/// <returns>The successfully completed task.</returns>
26-
let FromResult<'TResult> (result: 'TResult) = ValueTask<'TResult> result
27-
2826
/// <summary>Creates a <see cref="ValueTask{TResult}"/> that's completed exceptionally with the specified exception.</summary>
2927
/// <typeparam name="TResult">The type of the result returned by the task.</typeparam>
3028
/// <param name="exception">The exception with which to complete the task.</param>
@@ -39,10 +37,14 @@ module ValueTask =
3937

4038
module ValueTaskTests =
4139

42-
let createValueTask isFailed value =
43-
if not isFailed then ValueTask.FromResult<_> value
40+
let createValueTask isFailed delay (value: 'T) =
41+
if not isFailed && delay = 0 then ValueTask.result value
4442
else
45-
ValueTask.FromException<_> (TestException (sprintf "Ouch, can't create: %A" value ))
43+
let tcs = TaskCompletionSource<_> ()
44+
if delay = 0 then tcs.SetException (TestException (sprintf "Ouch, can't create: %A" value ))
45+
else (Task.Delay delay).ContinueWith (fun _ ->
46+
if isFailed then tcs.SetException (TestException (sprintf "Ouch, can't create: %A" value )) else tcs.SetResult value) |> ignore
47+
tcs.Task |> ValueTask<'T>
4648

4749
let (|AggregateException|_|) (x: exn) =
4850
match x with
@@ -53,9 +55,9 @@ module ValueTask =
5355

5456
[<Test>]
5557
let shortCircuits () =
56-
let x1 = createValueTask false 1
57-
let x2 = createValueTask false 2
58-
let x3 = createValueTask false 3
58+
let x1 = createValueTask false 0 1
59+
let x2 = createValueTask false 0 2
60+
let x3 = createValueTask false 0 3
5961

6062
let a = ValueTask.map string x1
6163
require a.IsCompleted "ValueTask.map didn't short-circuit"
@@ -77,11 +79,11 @@ module ValueTask =
7779

7880
[<Test>]
7981
let erroredValueTasks () =
80-
let e1 () = createValueTask true 1
81-
let e2 () = createValueTask true 2
82-
let e3 () = createValueTask true 3
83-
let x1 () = createValueTask false 1
84-
let x2 () = createValueTask false 2
82+
let e1 () = createValueTask true 0 1
83+
let e2 () = createValueTask true 0 2
84+
let e3 () = createValueTask true 0 3
85+
let x1 () = createValueTask false 0 1
86+
let x2 () = createValueTask false 0 2
8587

8688
let mapping isFailure x = if isFailure then raise (TestException "I was told to fail") else x
8789
let mapping2 isFailure x y = if isFailure then raise (TestException "I was told to fail") else x + y
@@ -94,25 +96,94 @@ module ValueTask =
9496
let r02 = ValueTask.map (mapping true) (x1 ())
9597
r02.AsTask().Exception.InnerExceptions |> areEquivalent [TestException "I was told to fail"]
9698

97-
let r03 = ValueTask.zip (e1 ()) (x2 ())
99+
let r03 = ValueTask.zipSequentially (e1 ()) (x2 ())
98100
r03.AsTask().Exception.InnerExceptions |> areEquivalent [TestException "Ouch, can't create: 1"]
99101

100-
let r04 = ValueTask.zip (e1 ()) (e2 ())
102+
let r04 = ValueTask.zipSequentially (e1 ()) (e2 ())
101103
r04.AsTask().Exception.InnerExceptions |> areEquivalent [TestException "Ouch, can't create: 1"]
102104

103-
let r05 = ValueTask.map2 (mapping2 false) (e1 ()) (x2 ())
105+
let r05 = ValueTask.lift2 (mapping2 false) (e1 ()) (x2 ())
104106
r05.AsTask().Exception.InnerExceptions |> areEquivalent [TestException "Ouch, can't create: 1"]
105107

106-
let r06 = ValueTask.map3 (mapping3 false) (e1 ()) (e2 ()) (e3 ())
108+
let r06 = ValueTask.lift3 (mapping3 false) (e1 ()) (e2 ()) (e3 ())
107109
r06.AsTask().Exception.InnerExceptions |> areEquivalent [TestException "Ouch, can't create: 1"]
108110

109-
let r07 = ValueTask.map3 (mapping3 false) (x1 ()) (e2 ()) (e3 ())
111+
let r07 = ValueTask.lift3 (mapping3 false) (x1 ()) (e2 ()) (e3 ())
110112
r07.AsTask().Exception.InnerExceptions |> areEquivalent [TestException "Ouch, can't create: 2"]
111113

112114
let r08 = ValueTask.bind (binding true) (e1 ())
113115
r08.AsTask().Exception.InnerExceptions |> areEquivalent [TestException "Ouch, can't create: 1"]
114116

115117
let r09 = ValueTask.bind (binding true) (x1 ())
116118
r09.AsTask().Exception.InnerExceptions |> areEquivalent [TestException "I was told to fail"]
119+
120+
121+
[<Test>]
122+
let testValueTaskZip () =
123+
let t1 = createValueTask true 0 1
124+
let t2 = createValueTask true 0 2
125+
let t3 = createValueTask true 0 3
126+
127+
let c = new CancellationToken true
128+
let t4 = ValueTask.FromCanceled<int> c
129+
130+
let t5 = createValueTask false 0 5
131+
let t6 = createValueTask false 0 6
132+
133+
let t12 = ValueTask.WhenAll [t1; t2]
134+
let t12t12 = ValueTask.WhenAll [t12; t12]
135+
let t33 = ValueTask.WhenAll [t3; t3]
136+
137+
ValueTask.WaitAny t12 |> ignore
138+
ValueTask.WaitAny t12t12 |> ignore
139+
ValueTask.WaitAny t33 |> ignore
140+
141+
let t12123 = ValueTask.zip3 t12t12 t33 t4
142+
let ac1 =
143+
try
144+
(t12123.AsTask ()).Exception.InnerExceptions |> Seq.map (fun x -> int (Char.GetNumericValue x.Message.[35]))
145+
with e ->
146+
failwithf "Failure in testValueTaskZip. ValueTask status is %A . Exception is %A" (t12123.AsTask ()).Status e
147+
148+
CollectionAssert.AreEquivalent ([1; 2; 1; 2; 3], ac1, "ValueTask.zip(3) should add only non already existing exceptions.")
149+
150+
let t13 = ValueTask.zip3 (ValueTask.zip t1 t3) t4 (ValueTask.zip t5 t6)
151+
Assert.AreEqual (true, t13.IsFaulted, "ValueTask.zip(3) between a value, an exception and a cancellation -> exception wins.")
152+
let ac2 = (t13.AsTask ()).Exception.InnerExceptions |> Seq.map (fun x -> int (Char.GetNumericValue x.Message.[35]))
153+
CollectionAssert.AreEquivalent ([1; 3], ac2, "ValueTask.zip between 2 exceptions => both exceptions returned, even after combining with cancellation and values.")
154+
155+
[<Test>]
156+
let testValueTaskZipAsync () =
157+
let t1 = createValueTask true 20 1
158+
let t2 = createValueTask true 10 2
159+
let t3 = createValueTask true 30 3
160+
161+
let c = new CancellationToken true
162+
let t4 = ValueTask.FromCanceled<int> c
163+
164+
let t5 = createValueTask false 20 5
165+
let t6 = createValueTask false 10 6
166+
167+
let t12 = ValueTask.WhenAll [t1; t2]
168+
let t12t12 = ValueTask.WhenAll [t12; t12]
169+
let t33 = ValueTask.WhenAll [t3; t3]
170+
171+
ValueTask.WaitAny t12 |> ignore
172+
ValueTask.WaitAny t12t12 |> ignore
173+
ValueTask.WaitAny t33 |> ignore
174+
175+
let t12123 = ValueTask.zip3 t12t12 t33 t4
176+
let ac1 =
177+
try
178+
(t12123.AsTask ()).Exception.InnerExceptions |> Seq.map (fun x -> int (Char.GetNumericValue x.Message.[35]))
179+
with e ->
180+
failwithf "Failure in testValueTaskZipAsync. ValueTask status is %A . Exception is %A" (t12123.AsTask ()).Status e
181+
182+
CollectionAssert.AreEquivalent ([1; 2; 1; 2; 3], ac1, "ValueTask.zip(3)Async should add only non already existing exceptions.")
183+
184+
let t13 = ValueTask.zip3 (ValueTask.zip t1 t3) t4 (ValueTask.zip t5 t6)
185+
Assert.AreEqual (true, t13.IsFaulted, "ValueTask.zip(3)Async between a value, an exception and a cancellation -> exception wins.")
186+
let ac2 = (t13.AsTask ()).Exception.InnerExceptions |> Seq.map (fun x -> int (Char.GetNumericValue x.Message.[35]))
187+
CollectionAssert.AreEquivalent ([1; 3], ac2, "ValueTask.zipAsync between 2 exceptions => both exceptions returned, even after combining with cancellation and values.")
117188

118189
#endif

0 commit comments

Comments
 (0)