Skip to content

Commit c2679ad

Browse files
committed
fn: add ParSliceErrCollect
Add ParSliceErrCollect function to handle errors without halting parallel processing. ParSliceErrCollect is similar to ParSlice but allows processing to continue even when errors occur. Instead of halting on the first error, it collects all errors for later handling, providing more robust parallel execution.
1 parent e893dee commit c2679ad

File tree

1 file changed

+47
-0
lines changed

1 file changed

+47
-0
lines changed

fn/concurrency.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package fn
22

33
import (
44
"context"
5+
"fmt"
56
"runtime"
7+
"sync"
68

79
"golang.org/x/sync/errgroup"
810
)
@@ -32,3 +34,48 @@ func ParSlice[V any](ctx context.Context, s []V, f ErrFunc[V]) error {
3234

3335
return errGroup.Wait()
3436
}
37+
38+
// ParSliceErrCollect can be used to execute a function on each element of a
39+
// slice in parallel. This function is fully blocking and will wait for all
40+
// goroutines to finish (subject to context cancellation/timeout). Any errors
41+
// will be collected and returned as a map of slice element index to error.
42+
// Active goroutines limited with number of CPU.
43+
func ParSliceErrCollect[V any](ctx context.Context, s []V,
44+
f ErrFunc[V]) (map[int]error, error) {
45+
46+
errGroup, ctx := errgroup.WithContext(ctx)
47+
errGroup.SetLimit(runtime.NumCPU())
48+
49+
var instanceErrorsMutex sync.Mutex
50+
instanceErrors := make(map[int]error, len(s))
51+
52+
for idx := range s {
53+
errGroup.Go(func() error {
54+
err := f(ctx, s[idx])
55+
if err != nil {
56+
instanceErrorsMutex.Lock()
57+
instanceErrors[idx] = err
58+
instanceErrorsMutex.Unlock()
59+
}
60+
61+
// Avoid returning an error here, as that would cancel
62+
// the errGroup and terminate all slice element
63+
// processing instances. Instead, collect the error and
64+
// return it later.
65+
return nil
66+
})
67+
}
68+
69+
// Now we will wait/block for all goroutines to finish.
70+
//
71+
// The goroutines that are executing in parallel should not return an
72+
// error, but the Wait call may return an error if the context is
73+
// canceled or timed out.
74+
err := errGroup.Wait()
75+
if err != nil {
76+
return nil, fmt.Errorf("failed to wait on error group in "+
77+
"ParSliceErrorCollect: %w", err)
78+
}
79+
80+
return instanceErrors, nil
81+
}

0 commit comments

Comments
 (0)