Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions gitbook/parallelAsyncResult/ce.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
## ParallelAsyncResult Computation Expression

Namespace: `FsToolkit.ErrorHandling`

This CE operates on the same type as `asyncResult`, but it adds the `and!` operator for running workflows in parallel.

When running concurrent workflows, fail-fast semantics are used. If any sub-task returns an `Error`, then all other tasks are cancelled and only that error is returned. To instead collect all errors, use `parallelAsyncValidation`.


## Examples

### Example 1

Suppose we want to download 3 files.

Here is our simulated download function:

```fsharp
// string -> Async<Result<string, string>>
let downloadAsync stuff : Async<Result<string, string>> = async {
do! Async.Sleep 3_000
return Ok stuff
}
```

This workflow will download each item in sequence:

```fsharp
let downloadAllSequential = ayncResult {
let! x = downloadAsync (Ok "We")
let! y = downloadAsync (Ok "run")
let! z = downloadAsync (Ok "sequentially :(")
return sprintf "%s %s %s" x y z
}
```

It takes 9 seconds to complete.

However, using `parallelAsyncResult`, we can download all 3 concurrently:

```fsharp
// Async<Result<string, string>>
let downloadAll = parallelAsyncResult {
let! x = downloadAsync (Ok "We")
and! y = downloadAsync (Ok "run")
and! z = downloadAsync (Ok "concurrently!")
return sprintf "%s %s %s" x y z
}
```

This takes just 3 seconds.
53 changes: 53 additions & 0 deletions gitbook/parallelAsyncResult/map2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
## ParallelAsyncResult.map2

Namespace: `FsToolkit.ErrorHandling`

Function Signature:

```fsharp
('a -> 'b -> 'c) -> Async<Result<'a, 'd>> -> Async<Result<'b, 'd>>
-> Async<Result<'c, 'd>>
```

## Examples

Note: Many use-cases requiring `map2` operations can also be solved using [the `parallelAsyncResult` computation expression](../parallelAsyncResult/ce.md).

### Example 1

Given the functions

```fsharp
getFollowerIds : UserId -> Async<Result<UserId list, exn>>
createPost : CreatePostRequest -> Async<Result<PostId, exn>>
```

And the type

```fsharp
type NotifyNewPostRequest =
{ UserIds : UserId list
NewPostId : PostId }
static member Create userIds newPostsId =
{UserIds = userIds; NewPostId = newPostsId}
```

We can create a `NotifyNewPostRequest` using `ParallelAsyncResult.map2` as below:

```fsharp
let createPostAndGetNotifyRequest (req : CreatePostRequest) =
// Async<Result<UserId list, exn>>
let getFollowersResult = getFollowerIds req.UserId

// Async<Result<PostId, exn>>
let createPostResult = createPost req

// Async<Result<NotifyNewPostRequest, exn>>
let newPostRequestResult =
ParallelAsyncResult.map2
NotifyNewPostRequest.Create getFollowersResult createPostResult

// ...
```

This workflow will run the sub-tasks `getFollowersResult` and `createPostResult` concurrently, which can increase throughput.
17 changes: 17 additions & 0 deletions gitbook/parallelAsyncResult/map3.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
## ParallelAsyncResult.map3

Namespace: `FsToolkit.ErrorHandling`

Function Signature:

```fsharp
('a -> 'b -> 'c -> 'd)
-> Async<Result<'a, 'e>>
-> Async<Result<'b, 'e>>
-> Async<Result<'c, 'e>>
-> Async<Result<'d, 'e>>
```

## Examples

Note: Many use-cases requiring `map3` operations can also be solved using [the `parallelAsyncResult` computation expression](../parallelAsyncResult/ce.md).
37 changes: 37 additions & 0 deletions gitbook/parallelAsyncValidation/ce.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
## ParallelAsyncValidation Computation Expression

Namespace: `FsToolkit.ErrorHandling`

This CE operates in the same way as `asyncValidation`, except that the `and!` operator will run workflows in parallel.

Concurrent workflows are run with the same semantics as [`Async.Parallel`](https://fsharp.github.io/fsharp-core-docs/reference/fsharp-control-fsharpasync.html#Parallel).


## Examples

See [here](../validation/ce.md) for other validation-like examples

```fsharp
// Result<string, string> -> Async<Result<string, string>>
let downloadAsync stuff = async {
return stuff
}

// AsyncValidation<string, string>
let addResult = parallelAsyncValidation {
let! x = downloadAsync (Ok "I")
and! y = downloadAsync (Ok "am")
and! z = downloadAsync (Ok "concurrent!")
return sprintf "%s %s %s" x y z
}
// async { return Ok "I am concurrent!" }

// AsyncValidation<string, string>
let addResult = parallelAsyncValidation {
let! x = downloadAsync (Error "Am")
and! y = downloadAsync (Error "I")
and! z = downloadAsync (Error "concurrent?")
return sprintf "%s %s %s" x y z
}
// async { return Error [ "Am"; "I"; "concurrent?" ] }
```
14 changes: 14 additions & 0 deletions gitbook/parallelAsyncValidation/map2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
## ParallelAsyncValidation.map2

Namespace: `FsToolkit.ErrorHandling`

Function Signature:

```fsharp
('a -> 'b -> 'c)
-> Async<Result<'a, 'd list>>
-> Async<Result<'b, 'd list>>
-> Async<Result<'c, 'd list>>
```

Like [ParallelAsyncResult.map2](../parallelAsyncResult/map2.md), but collects the errors from both arguments.
16 changes: 16 additions & 0 deletions gitbook/parallelAsyncValidation/map3.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
## ParallelAsyncValidation.map3

Namespace: `FsToolkit.ErrorHandling`

Function Signature:

```
('a -> 'b -> 'c -> 'd)
-> Async<Result<'a, 'e list>>
-> Async<Result<'b, 'e list>>
-> Async<Result<'c, 'e list>>
-> Async<Result<'d, 'e list>>
```

Like [ParallelAsyncResult.map3](../parallelAsyncResult/map3.md), but collects the errors from all arguments.

89 changes: 89 additions & 0 deletions src/FsToolkit.ErrorHandling/Async.fs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,95 @@ module Async =
let inline zip (left: Async<'left>) (right: Async<'right>) : Async<'left * 'right> =
bind (fun l -> bind (fun r -> singleton (l, r)) right) left

/// <summary>
/// Executes two asyncs concurrently <see cref='M:Microsoft.FSharp.Control.FSharpAsync.Parallel``1'/> and returns a mapping of the values
/// </summary>
/// <param name="mapper">The function to apply to the values of the <c>Async</c> values.</param>
/// <param name="input1">The first <c>Async</c> to execute</param>
/// <param name="input2">The second <c>Async</c> to execute</param>
/// <returns>The transformed <c>Async</c> value.</returns>
let inline parallelMap2
([<InlineIfLambda>] mapper: 'input1 -> 'input2 -> 'output)
(input1: Async<'input1>)
(input2: Async<'input2>)
: Async<'output> =

#if FABLE_COMPILER && FABLE_COMPILER_PYTHON
Async.Parallel(
[|
map box input1
map box input2
|]
)
#else
Async.Parallel(
[|
map box input1
map box input2
|],
maxDegreeOfParallelism = 2
)
#endif
|> map (fun results ->
let a =
results[0]
|> unbox<'input1>

let b =
results[1]
|> unbox<'input2>

mapper a b
)

/// <summary>
/// Executes three asyncs concurrently <see cref='M:Microsoft.FSharp.Control.FSharpAsync.Parallel``1'/> and returns a mapping of the values
/// </summary>
/// <param name="mapper">The function to apply to the values of the <c>Async</c> values.</param>
/// <param name="input1">The first <c>Async</c> to execute</param>
/// <param name="input2">The second <c>Async</c> to execute</param>
/// <param name="input3">The third <c>Async</c> value to transform.</param>
/// <returns>The transformed <c>Async</c> value.</returns>
let inline parallelMap3
([<InlineIfLambda>] mapper: 'input1 -> 'input2 -> 'input3 -> 'output)
(input1: Async<'input1>)
(input2: Async<'input2>)
(input3: Async<'input3>)
: Async<'output> =
#if FABLE_COMPILER && FABLE_COMPILER_PYTHON
Async.Parallel(
[|
map box input1
map box input2
map box input3
|]
)
#else
Async.Parallel(
[|
map box input1
map box input2
map box input3
|],
maxDegreeOfParallelism = 3
)
#endif
|> map (fun results ->
let a =
results[0]
|> unbox<'input1>

let b =
results[1]
|> unbox<'input2>

let c =
results[2]
|> unbox<'input3>

mapper a b c
)

/// <summary>
/// Operators for working with the <c>Async</c> type.
/// </summary>
Expand Down
4 changes: 4 additions & 0 deletions src/FsToolkit.ErrorHandling/FsToolkit.ErrorHandling.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
<Compile Include="TaskResultOption.fs" Condition="'$(FABLE_COMPILER)' != 'true'" />
<Compile Include="TaskResultOptionCE.fs" Condition="'$(FABLE_COMPILER)' != 'true'" />
<Compile Include="TaskResultOptionOp.fs" Condition="'$(FABLE_COMPILER)' != 'true'" />
<Compile Include="ParallelAsyncResult.fs" />
<Compile Include="ParallelAsyncResultCE.fs" />
<Compile Include="ParallelAsyncValidation.fs" />
<Compile Include="ParallelAsyncValidationCE.fs" />
<Compile Include="List.fs" />
<Compile Include="Array.fs" />
<Compile Include="Seq.fs" />
Expand Down
88 changes: 88 additions & 0 deletions src/FsToolkit.ErrorHandling/ParallelAsyncResult.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
namespace FsToolkit.ErrorHandling

open System

[<RequireQualifiedAccess>]
module ParallelAsyncResult =

[<AutoOpen>]
module InternalHelpers =

type AsyncResultErrorException<'a>(value: 'a) =
inherit Exception()
member this.Value = value

let toBoxedAsync (input: Async<Result<'ok, 'error>>) : Async<obj> =
async {
match! input with
| Ok x -> return box x
| Error e -> return raise (AsyncResultErrorException<'error>(e))
}

/// <summary>
/// Transforms two AsyncResults in one that executes them concurrently and combines the results using the specified function.
/// If either AsyncResult resolves to an error, then the other is cancelled and only the first error is returned.
/// </summary>
/// <param name="mapper">The function to apply to the values of the AsyncResult values.</param>
/// <param name="input1">The first AsyncResult value to transform.</param>
/// <param name="input2">The second AsyncResult value to transform.</param>
/// <returns>The transformed AsyncResult value.</returns>
let inline map2
([<InlineIfLambda>] mapper: 'a -> 'b -> 'c)
(input1: Async<Result<'a, 'error>>)
(input2: Async<Result<'b, 'error>>)
: Async<Result<'c, 'error>> =
async {
try
return!
Async.parallelMap2
(fun a b ->
let a = unbox<'a> a
let b = unbox<'b> b
Ok(mapper a b)
)
(toBoxedAsync input1)
(toBoxedAsync input2)

with :? AsyncResultErrorException<'error> as exn ->
return Error exn.Value
}

/// <summary>
/// Transforms three AsyncResults in one that executes them concurrently and combines the results using the specified function.
/// If any AsyncResult resolves to an error, then the others are cancelled and only the first error is returned.
/// </summary>
/// <param name="mapper">The function to apply to the values of the AsyncResult values.</param>
/// <param name="input1">The first AsyncResult value to transform.</param>
/// <param name="input2">The second AsyncResult value to transform.</param>
/// <param name="input3">The third AsyncResult value to transform.</param>
/// <returns>The transformed AsyncResult value.</returns>
let inline map3
([<InlineIfLambda>] mapper: 'a -> 'b -> 'c -> 'd)
(input1: Async<Result<'a, 'error>>)
(input2: Async<Result<'b, 'error>>)
(input3: Async<Result<'c, 'error>>)
: Async<Result<'d, 'error>> =
async {
try
return!
Async.parallelMap3
(fun a b c ->
let a = unbox<'a> a
let b = unbox<'b> b
let c = unbox<'c> c
Ok(mapper a b c)
)
(toBoxedAsync input1)
(toBoxedAsync input2)
(toBoxedAsync input3)

with :? AsyncResultErrorException<'error> as exn ->
return Error exn.Value
}

let inline zip
(a: Async<Result<'a, 'error>>)
(b: Async<Result<'b, 'error>>)
: Async<Result<'a * 'b, 'error>> =
map2 (fun a b -> a, b) a b
Loading
Loading