Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions fn/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ func ParSlice[V any](ctx context.Context, s []V, f ErrFunc[V]) error {
errGroup.SetLimit(runtime.GOMAXPROCS(0))

for _, v := range s {
// Snapshot v now so the goroutine sees the intended value
// even if the caller mutates the slice s later. This is a
// shallow copy only, if V contains pointers, their contents can
// still change.
v := v

errGroup.Go(func() error {
return f(ctx, v)
})
Expand All @@ -43,39 +48,58 @@ func ParSlice[V any](ctx context.Context, s []V, f ErrFunc[V]) error {
func ParSliceErrCollect[V any](ctx context.Context, s []V,
f ErrFunc[V]) (map[int]error, error) {

errGroup, ctx := errgroup.WithContext(ctx)
errGroup, groupCtx := errgroup.WithContext(ctx)
errGroup.SetLimit(runtime.GOMAXPROCS(0))

var instanceErrorsMutex sync.Mutex
instanceErrors := make(map[int]error, len(s))
var (
instanceErrorsMu sync.Mutex
instanceErrors = make(map[int]error, len(s))
)

for idx := range s {
// Snapshot s[idx] now so the goroutine sees the intended value
// even if the caller mutates the slice later. This is a shallow
// copy only, if V contains pointers, their contents can still
// change.
v := s[idx]

errGroup.Go(func() error {
err := f(ctx, s[idx])
// If already canceled, skip work without signaling an
// error.
select {
case <-groupCtx.Done():
return nil
default:
}

err := f(groupCtx, v)
if err != nil {
instanceErrorsMutex.Lock()
instanceErrorsMu.Lock()
instanceErrors[idx] = err
instanceErrorsMutex.Unlock()
instanceErrorsMu.Unlock()
}

// Avoid returning an error here, as that would cancel
// the errGroup and terminate all slice element
// processing instances. Instead, collect the error and
// return it later.
// Do not return an error here. If we did, errGroup
// would cancel the context and stop all other element
// processors. Instead, record the error locally
// and return it after all goroutines finish.
return nil
})
}

// Now we will wait/block for all goroutines to finish.
//
// The goroutines that are executing in parallel should not return an
// error, but the Wait call may return an error if the context is
// canceled or timed out.
err := errGroup.Wait()
if err != nil {
// Wait for all goroutines to finish. In this design, goroutines do not
// return errors, so Wait should normally return nil. We handle context
// cancellation separately with an explicit ctx.Err() check.
if err := errGroup.Wait(); err != nil {
return nil, fmt.Errorf("failed to wait on error group in "+
"ParSliceErrorCollect: %w", err)
}

// If the caller's context was canceled or timed out, surface that.
// Return whatever per-item errors were collected before cancellation.
if err := ctx.Err(); err != nil {
return instanceErrors, err
}

return instanceErrors, nil
}
Loading