diff --git a/fn/concurrency.go b/fn/concurrency.go index 6c899989d..3a1cfef25 100644 --- a/fn/concurrency.go +++ b/fn/concurrency.go @@ -26,7 +26,12 @@ func ParSlice[V any](ctx context.Context, s []V, f ErrFunc[V]) error { errGroup.SetLimit(runtime.GOMAXPROCS(0)) for _, v := range s { + // Snapshot v now so the goroutine sees the intended value + // even if the caller mutates the slice s later. This is a + // shallow copy only, if V contains pointers, their contents can + // still change. v := v + errGroup.Go(func() error { return f(ctx, v) }) @@ -43,39 +48,58 @@ func ParSlice[V any](ctx context.Context, s []V, f ErrFunc[V]) error { func ParSliceErrCollect[V any](ctx context.Context, s []V, f ErrFunc[V]) (map[int]error, error) { - errGroup, ctx := errgroup.WithContext(ctx) + errGroup, groupCtx := errgroup.WithContext(ctx) errGroup.SetLimit(runtime.GOMAXPROCS(0)) - var instanceErrorsMutex sync.Mutex - instanceErrors := make(map[int]error, len(s)) + var ( + instanceErrorsMu sync.Mutex + instanceErrors = make(map[int]error, len(s)) + ) for idx := range s { + // Snapshot s[idx] now so the goroutine sees the intended value + // even if the caller mutates the slice later. This is a shallow + // copy only, if V contains pointers, their contents can still + // change. + v := s[idx] + errGroup.Go(func() error { - err := f(ctx, s[idx]) + // If already canceled, skip work without signaling an + // error. + select { + case <-groupCtx.Done(): + return nil + default: + } + + err := f(groupCtx, v) if err != nil { - instanceErrorsMutex.Lock() + instanceErrorsMu.Lock() instanceErrors[idx] = err - instanceErrorsMutex.Unlock() + instanceErrorsMu.Unlock() } - // Avoid returning an error here, as that would cancel - // the errGroup and terminate all slice element - // processing instances. Instead, collect the error and - // return it later. + // Do not return an error here. If we did, errGroup + // would cancel the context and stop all other element + // processors. Instead, record the error locally + // and return it after all goroutines finish. return nil }) } - // Now we will wait/block for all goroutines to finish. - // - // The goroutines that are executing in parallel should not return an - // error, but the Wait call may return an error if the context is - // canceled or timed out. - err := errGroup.Wait() - if err != nil { + // Wait for all goroutines to finish. In this design, goroutines do not + // return errors, so Wait should normally return nil. We handle context + // cancellation separately with an explicit ctx.Err() check. + if err := errGroup.Wait(); err != nil { return nil, fmt.Errorf("failed to wait on error group in "+ "ParSliceErrorCollect: %w", err) } + // If the caller's context was canceled or timed out, surface that. + // Return whatever per-item errors were collected before cancellation. + if err := ctx.Err(); err != nil { + return instanceErrors, err + } + return instanceErrors, nil }