Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions src/FsToolkit.ErrorHandling/Async.fs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,95 @@ module Async =
let inline zip (left: Async<'left>) (right: Async<'right>) : Async<'left * 'right> =
bind (fun l -> bind (fun r -> singleton (l, r)) right) left

/// <summary>
/// Executes two asyncs concurrently <see cref='M:Microsoft.FSharp.Control.FSharpAsync.Parallel``1'/> and returns a mapping of the values
/// </summary>
/// <param name="mapper">The function to apply to the values of the <c>Async</c> values.</param>
/// <param name="input1">The first <c>Async</c> to execute</param>
/// <param name="input2">The second <c>Async</c> to execute</param>
/// <returns>The transformed <c>Async</c> value.</returns>
let inline parallelMap2
([<InlineIfLambda>] mapper: 'input1 -> 'input2 -> 'output)
(input1: Async<'input1>)
(input2: Async<'input2>)
: Async<'output> =

#if FABLE_COMPILER && FABLE_COMPILER_PYTHON
Async.Parallel(
[|
map box input1
map box input2
|]
)
#else
Async.Parallel(
[|
map box input1
map box input2
|],
maxDegreeOfParallelism = 2
)
#endif
|> map (fun results ->
let a =
results[0]
|> unbox<'input1>

let b =
results[1]
|> unbox<'input2>

mapper a b
)

/// <summary>
/// Executes three asyncs concurrently <see cref='M:Microsoft.FSharp.Control.FSharpAsync.Parallel``1'/> and returns a mapping of the values
/// </summary>
/// <param name="mapper">The function to apply to the values of the <c>Async</c> values.</param>
/// <param name="input1">The first <c>Async</c> to execute</param>
/// <param name="input2">The second <c>Async</c> to execute</param>
/// <param name="input3">The third <c>Async</c> value to transform.</param>
/// <returns>The transformed <c>Async</c> value.</returns>
let inline parallelMap3
([<InlineIfLambda>] mapper: 'input1 -> 'input2 -> 'input3 -> 'output)
(input1: Async<'input1>)
(input2: Async<'input2>)
(input3: Async<'input3>)
: Async<'output> =
#if FABLE_COMPILER && FABLE_COMPILER_PYTHON
Async.Parallel(
[|
map box input1
map box input2
map box input3
|]
)
#else
Async.Parallel(
[|
map box input1
map box input2
map box input3
|],
maxDegreeOfParallelism = 3
)
#endif
|> map (fun results ->
let a =
results[0]
|> unbox<'input1>

let b =
results[1]
|> unbox<'input2>

let c =
results[2]
|> unbox<'input3>

mapper a b c
)

/// <summary>
/// Operators for working with the <c>Async</c> type.
/// </summary>
Expand Down
4 changes: 4 additions & 0 deletions src/FsToolkit.ErrorHandling/FsToolkit.ErrorHandling.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
<Compile Include="TaskResultOption.fs" Condition="'$(FABLE_COMPILER)' != 'true'" />
<Compile Include="TaskResultOptionCE.fs" Condition="'$(FABLE_COMPILER)' != 'true'" />
<Compile Include="TaskResultOptionOp.fs" Condition="'$(FABLE_COMPILER)' != 'true'" />
<Compile Include="ParallelAsyncResult.fs" />
<Compile Include="ParallelAsyncResultCE.fs" />
<Compile Include="ParallelAsyncValidation.fs" />
<Compile Include="ParallelAsyncValidationCE.fs" />
<Compile Include="List.fs" />
<Compile Include="Array.fs" />
<Compile Include="Seq.fs" />
Expand Down
88 changes: 88 additions & 0 deletions src/FsToolkit.ErrorHandling/ParallelAsyncResult.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
namespace FsToolkit.ErrorHandling

open System

[<RequireQualifiedAccess>]
module ParallelAsyncResult =

[<AutoOpen>]
module InternalHelpers =

type AsyncResultErrorException<'a>(value: 'a) =
inherit Exception()
member this.Value = value

let toBoxedAsync (input: Async<Result<'ok, 'error>>) : Async<obj> =
async {
match! input with
| Ok x -> return box x
| Error e -> return raise (AsyncResultErrorException<'error>(e))
}

/// <summary>
/// Transforms two AsyncResults in one that executes them concurrently and combines the results using the specified function.
/// If either AsyncResult resolves to an error, then the other is cancelled and only the first error is returned.
/// </summary>
/// <param name="mapper">The function to apply to the values of the AsyncResult values.</param>
/// <param name="input1">The first AsyncResult value to transform.</param>
/// <param name="input2">The second AsyncResult value to transform.</param>
/// <returns>The transformed AsyncResult value.</returns>
let inline map2
([<InlineIfLambda>] mapper: 'a -> 'b -> 'c)
(input1: Async<Result<'a, 'error>>)
(input2: Async<Result<'b, 'error>>)
: Async<Result<'c, 'error>> =
async {
try
return!
Async.parallelMap2
(fun a b ->
let a = unbox<'a> a
let b = unbox<'b> b
Ok(mapper a b)
)
(toBoxedAsync input1)
(toBoxedAsync input2)

with :? AsyncResultErrorException<'error> as exn ->
return Error exn.Value
}

/// <summary>
/// Transforms three AsyncResults in one that executes them concurrently and combines the results using the specified function.
/// If any AsyncResult resolves to an error, then the others are cancelled and only the first error is returned.
/// </summary>
/// <param name="mapper">The function to apply to the values of the AsyncResult values.</param>
/// <param name="input1">The first AsyncResult value to transform.</param>
/// <param name="input2">The second AsyncResult value to transform.</param>
/// <param name="input3">The third AsyncResult value to transform.</param>
/// <returns>The transformed AsyncResult value.</returns>
let inline map3
([<InlineIfLambda>] mapper: 'a -> 'b -> 'c -> 'd)
(input1: Async<Result<'a, 'error>>)
(input2: Async<Result<'b, 'error>>)
(input3: Async<Result<'c, 'error>>)
: Async<Result<'d, 'error>> =
async {
try
return!
Async.parallelMap3
(fun a b c ->
let a = unbox<'a> a
let b = unbox<'b> b
let c = unbox<'c> c
Ok(mapper a b c)
)
(toBoxedAsync input1)
(toBoxedAsync input2)
(toBoxedAsync input3)

with :? AsyncResultErrorException<'error> as exn ->
return Error exn.Value
}

let inline zip
(a: Async<Result<'a, 'error>>)
(b: Async<Result<'b, 'error>>)
: Async<Result<'a * 'b, 'error>> =
map2 (fun a b -> a, b) a b
200 changes: 200 additions & 0 deletions src/FsToolkit.ErrorHandling/ParallelAsyncResultCE.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
namespace FsToolkit.ErrorHandling

open System
open System.Threading.Tasks

[<AutoOpen>]
module ParallelAsyncResultCE =

type ParallelAsyncResultBuilder() =

member inline _.Return(value: 'ok) : Async<Result<'ok, 'error>> =
result.Return value
|> async.Return

member inline _.ReturnFrom(input: Async<Result<'ok, 'error>>) : Async<Result<'ok, 'error>> =
input

member inline _.Zero() : Async<Result<unit, 'error>> =
result.Zero()
|> async.Return

member inline _.Bind
(
asyncResult: Async<Result<'okInput, 'error>>,
[<InlineIfLambda>] binder: 'okInput -> Async<Result<'okOutput, 'error>>
) : Async<Result<'okOutput, 'error>> =
AsyncResult.bind binder asyncResult

member inline _.Delay
([<InlineIfLambda>] generator: unit -> Async<Result<'ok, 'error>>)
: Async<Result<'ok, 'error>> =
async.Delay generator

member inline this.Combine
(computation1: Async<Result<unit, 'error>>, computation2: Async<Result<'ok, 'error>>)
: Async<Result<'ok, 'error>> =
this.Bind(computation1, (fun () -> computation2))

member inline _.TryWith
(
computation: Async<Result<'ok, 'error>>,
[<InlineIfLambda>] handler: System.Exception -> Async<Result<'ok, 'error>>
) : Async<Result<'ok, 'error>> =
async.TryWith(computation, handler)

member inline _.TryFinally
(computation: Async<Result<'ok, 'error>>, [<InlineIfLambda>] compensation: unit -> unit)
: Async<Result<'ok, 'error>> =
async.TryFinally(computation, compensation)
#if !FABLE_COMPILER
member inline _.TryFinallyAsync
(
computation: Async<Result<'ok, 'error>>,
[<InlineIfLambda>] compensation: unit -> ValueTask
) : Async<Result<'ok, 'error>> =
let compensation =
async {
let vTask = compensation ()

if vTask.IsCompletedSuccessfully then
return ()
else
return!
vTask.AsTask()
|> Async.AwaitTask
}

Async.TryFinallyAsync(computation, compensation)


member inline this.Using
(
resource: 'ok :> IAsyncDisposable,
[<InlineIfLambda>] binder: 'ok -> Async<Result<'U, 'error>>
) : Async<Result<'U, 'error>> =
this.TryFinallyAsync(
binder resource,
(fun () ->
if not (isNull (box resource)) then
resource.DisposeAsync()
else
ValueTask()
)
)
#endif
member inline this.While
([<InlineIfLambda>] guard: unit -> bool, computation: Async<Result<unit, 'error>>)
: Async<Result<unit, 'error>> =
if guard () then
let mutable whileAsync = Unchecked.defaultof<_>

whileAsync <-
this.Bind(computation, (fun () -> if guard () then whileAsync else this.Zero()))

whileAsync
else
this.Zero()


member inline _.BindReturn
(x: Async<Result<'okInput, 'error>>, [<InlineIfLambda>] f: 'okInput -> 'okOutput)
: Async<Result<'okOutput, 'error>> =
AsyncResult.map f x

/// <summary>
/// Method lets us transform data types into our internal representation. This is the identity method to recognize the self type.
///
/// See https://stackoverflow.com/questions/35286541/why-would-you-use-builder-source-in-a-custom-computation-expression-builder
/// </summary>
member inline _.Source(result: Async<Result<'ok, 'error>>) : Async<Result<'ok, 'error>> =
result

member inline _.MergeSources(input1, input2) : Async<Result<'a * 'b, 'error>> =
ParallelAsyncResult.zip input1 input2

let parallelAsyncResult = ParallelAsyncResultBuilder()

[<AutoOpen>]
module MediumPriority =

type ParallelAsyncResultBuilder with

/// <summary>
/// Needed to allow `for..in` and `for..do` functionality
/// </summary>
member inline _.Source(s: #seq<'value>) : #seq<'value> = s

/// <summary>
/// Method lets us transform data types into our internal representation.
/// </summary>
member inline _.Source(result: Result<'ok, 'error>) : Async<Result<'ok, 'error>> =
Async.singleton result

/// <summary>
/// Method lets us transform data types into our internal representation.
/// </summary>
member inline _.Source(choice: Choice<'ok, 'error>) : Async<Result<'ok, 'error>> =
choice
|> Result.ofChoice
|> Async.singleton

/// <summary>
/// Method lets us transform data types into our internal representation.
/// </summary>
member inline _.Source(asyncComputation: Async<'ok>) : Async<Result<'ok, 'error>> =
asyncComputation
|> Async.map Ok


member inline _.Using
(
resource: 'ok :> IDisposable,
[<InlineIfLambda>] binder: 'ok -> Async<Result<'U, 'error>>
) : Async<Result<'U, 'error>> =
async.Using(resource, binder)


member inline this.For
(sequence: #seq<'ok>, [<InlineIfLambda>] binder: 'ok -> Async<Result<unit, 'error>>)
: Async<Result<unit, 'error>> =
this.Using(
sequence.GetEnumerator(),
fun enum ->
this.While(
(fun () -> enum.MoveNext()),
this.Delay(fun () -> binder enum.Current)
)
)

#if !FABLE_COMPILER
/// <summary>
/// Method lets us transform data types into our internal representation.
/// </summary>
member inline _.Source(task: Task<'ok>) : Async<Result<'ok, 'error>> =
task
|> Async.AwaitTask
|> Async.map Ok

/// <summary>
/// Method lets us transform data types into our internal representation.
/// </summary>
member inline _.Source(task: Task) : Async<Result<unit, 'error>> =
task
|> Async.AwaitTask
|> Async.map Ok
#endif

#if !FABLE_COMPILER
[<AutoOpen>]
module HighPriority =

type ParallelAsyncResultBuilder with

/// <summary>
/// Method lets us transform data types into our internal representation.
/// </summary>
member inline _.Source(task: Task<Result<'ok, 'error>>) : Async<Result<'ok, 'error>> =
task
|> Async.AwaitTask
#endif
Loading
Loading