diff --git a/pkg/goanalysis/runner.go b/pkg/goanalysis/runner.go index 6cc0a7077109..16b4e5a50e83 100644 --- a/pkg/goanalysis/runner.go +++ b/pkg/goanalysis/runner.go @@ -257,19 +257,26 @@ func (r *runner) analyze(pkgs []*packages.Package, analyzers []*analysis.Analyze // Limit memory and IO usage. gomaxprocs := runtime.GOMAXPROCS(-1) debugf("Analyzing at most %d packages in parallel", gomaxprocs) + loadSem := make(chan struct{}, gomaxprocs) + stopChan := make(chan struct{}, 1) - var wg sync.WaitGroup debugf("There are %d initial and %d total packages", len(initialPkgs), len(loadingPackages)) + + var wg sync.WaitGroup + for _, lp := range loadingPackages { if lp.isInitial { wg.Add(1) + go func(lp *loadingPackage) { - lp.analyzeRecursive(r.loadMode, loadSem) + lp.analyzeRecursive(stopChan, r.loadMode, loadSem) + wg.Done() }(lp) } } + wg.Wait() return rootActions diff --git a/pkg/goanalysis/runner_action.go b/pkg/goanalysis/runner_action.go index 2c332d83e85f..855b769bcc99 100644 --- a/pkg/goanalysis/runner_action.go +++ b/pkg/goanalysis/runner_action.go @@ -28,10 +28,14 @@ func (actAlloc *actionAllocator) alloc() *action { return act } -func (act *action) waitUntilDependingAnalyzersWorked() { +func (act *action) waitUntilDependingAnalyzersWorked(stopChan chan struct{}) { for _, dep := range act.Deps { if dep.Package == act.Package { - <-dep.analysisDoneCh + select { + case <-stopChan: + return + case <-dep.analysisDoneCh: + } } } } diff --git a/pkg/goanalysis/runner_checker.go b/pkg/goanalysis/runner_checker.go index 8a2954cc0834..569002ed4b9f 100644 --- a/pkg/goanalysis/runner_checker.go +++ b/pkg/goanalysis/runner_checker.go @@ -95,7 +95,7 @@ func (act *action) analyze() { } } if depErrors != nil { - act.Err = errors.Join(depErrors, errors.New("failed prerequisites")) + act.Err = fmt.Errorf("failed prerequisites: %w", depErrors) return } diff --git a/pkg/goanalysis/runner_loadingpackage.go b/pkg/goanalysis/runner_loadingpackage.go index d22dbea30d72..cb0dcd000516 100644 --- a/pkg/goanalysis/runner_loadingpackage.go +++ b/pkg/goanalysis/runner_loadingpackage.go @@ -39,23 +39,28 @@ type loadingPackage struct { decUseMutex sync.Mutex } -func (lp *loadingPackage) analyzeRecursive(loadMode LoadMode, loadSem chan struct{}) { +func (lp *loadingPackage) analyzeRecursive(stopChan chan struct{}, loadMode LoadMode, loadSem chan struct{}) { lp.analyzeOnce.Do(func() { // Load the direct dependencies, in parallel. var wg sync.WaitGroup + wg.Add(len(lp.imports)) + for _, imp := range lp.imports { go func(imp *loadingPackage) { - imp.analyzeRecursive(loadMode, loadSem) + imp.analyzeRecursive(stopChan, loadMode, loadSem) + wg.Done() }(imp) } + wg.Wait() - lp.analyze(loadMode, loadSem) + + lp.analyze(stopChan, loadMode, loadSem) }) } -func (lp *loadingPackage) analyze(loadMode LoadMode, loadSem chan struct{}) { +func (lp *loadingPackage) analyze(stopChan chan struct{}, loadMode LoadMode, loadSem chan struct{}) { loadSem <- struct{}{} defer func() { <-loadSem @@ -66,26 +71,47 @@ func (lp *loadingPackage) analyze(loadMode LoadMode, loadSem chan struct{}) { if err := lp.loadWithFacts(loadMode); err != nil { werr := fmt.Errorf("failed to load package %s: %w", lp.pkg.Name, err) + // Don't need to write error to errCh, it will be extracted and reported on another layer. // Unblock depending on actions and propagate error. for _, act := range lp.actions { close(act.analysisDoneCh) + act.Err = werr } + return } var actsWg sync.WaitGroup + actsWg.Add(len(lp.actions)) + for _, act := range lp.actions { go func(act *action) { defer actsWg.Done() - act.waitUntilDependingAnalyzersWorked() + act.waitUntilDependingAnalyzersWorked(stopChan) + + select { + case <-stopChan: + return + default: + } act.analyzeSafe() + + select { + case <-stopChan: + return + default: + if act.Err != nil { + close(stopChan) + } + } }(act) } + actsWg.Wait() }