diff --git a/internal/provider/adc/adc.go b/internal/provider/adc/adc.go index f4f220f458..3ba2f3c133 100644 --- a/internal/provider/adc/adc.go +++ b/internal/provider/adc/adc.go @@ -112,8 +112,10 @@ func New(updater status.Updater, opts ...Option) (provider.Provider, error) { configs: make(map[types.NamespacedNameKind]adcConfig), parentRefs: make(map[types.NamespacedNameKind][]types.NamespacedNameKind), store: NewStore(), - executor: &DefaultADCExecutor{}, - updater: updater, + executor: &DefaultADCExecutor{ + Concurrency: 4, + }, + updater: updater, }, nil } diff --git a/internal/provider/adc/executor.go b/internal/provider/adc/executor.go index 5377018dc0..4a1cfc3e5c 100644 --- a/internal/provider/adc/executor.go +++ b/internal/provider/adc/executor.go @@ -42,6 +42,8 @@ type ADCExecutor interface { type DefaultADCExecutor struct { sync.Mutex + + Concurrency int } func (e *DefaultADCExecutor) Execute(ctx context.Context, mode string, config adcConfig, args []string) error { @@ -52,24 +54,46 @@ func (e *DefaultADCExecutor) Execute(ctx context.Context, mode string, config ad } func (e *DefaultADCExecutor) runADC(ctx context.Context, mode string, config adcConfig, args []string) error { - var execErrs = types.ADCExecutionError{ - Name: config.Name, - } - - for _, addr := range config.ServerAddrs { - if err := e.runForSingleServerWithTimeout(ctx, addr, mode, config, args); err != nil { - log.Errorw("failed to run adc for server", zap.String("server", addr), zap.Error(err)) - var execErr types.ADCExecutionServerAddrError - if errors.As(err, &execErr) { - execErrs.FailedErrors = append(execErrs.FailedErrors, execErr) - } else { - execErrs.FailedErrors = append(execErrs.FailedErrors, types.ADCExecutionServerAddrError{ - ServerAddr: addr, - Err: err.Error(), - }) + if e.Concurrency <= 0 { + e.Concurrency = 1 + } + + errCh := make(chan types.ADCExecutionServerAddrError, len(config.ServerAddrs)) + sem := make(chan struct{}, e.Concurrency) + var wg sync.WaitGroup + + for _, serverAddr := range config.ServerAddrs { + sem <- struct{}{} + wg.Add(1) + + go func(addr string) { + defer wg.Done() + defer func() { <-sem }() + + if err := e.runForSingleServerWithTimeout(ctx, addr, mode, config, args); err != nil { + log.Errorw("failed to run adc for server", zap.String("server", addr), zap.Error(err)) + var execErr types.ADCExecutionServerAddrError + if errors.As(err, &execErr) { + errCh <- execErr + } else { + errCh <- types.ADCExecutionServerAddrError{ + ServerAddr: addr, + Err: err.Error(), + } + } } - } + }(serverAddr) } + + wg.Wait() + close(errCh) + + var execErrs types.ADCExecutionError + execErrs.Name = config.Name + for err := range errCh { + execErrs.FailedErrors = append(execErrs.FailedErrors, err) + } + if len(execErrs.FailedErrors) > 0 { return execErrs }