|
7 | 7 |
|
8 | 8 | "github.com/kosli-dev/cli/internal/filters" |
9 | 9 | "github.com/kosli-dev/cli/internal/logger" |
| 10 | + "golang.org/x/sync/errgroup" |
10 | 11 | corev1 "k8s.io/api/core/v1" |
11 | 12 | metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
12 | 13 | "k8s.io/client-go/kubernetes" |
@@ -104,43 +105,23 @@ func (clientset *K8SConnection) GetPodsData(filter *filters.ResourceFilterOption |
104 | 105 |
|
105 | 106 | logger.Info("scanning the following namespaces: %v ", filteredNamespaces) |
106 | 107 |
|
107 | | - // run concurrently |
108 | | - errs := make(chan error, 1) // Buffered only for the first error |
109 | | - ctx, cancel := context.WithCancel(context.Background()) |
110 | | - defer cancel() // Make sure it's called to release resources even if no errors |
| 108 | + g, _ := errgroup.WithContext(context.Background()) |
111 | 109 |
|
112 | 110 | for _, ns := range filteredNamespaces { |
113 | | - wg.Add(1) |
114 | | - go func(ns string) { |
115 | | - defer wg.Done() |
116 | | - // Check if any error occurred in any other gorouties: |
117 | | - select { |
118 | | - case <-ctx.Done(): |
119 | | - return // Error somewhere, terminate |
120 | | - default: // Default is must to avoid blocking |
121 | | - } |
122 | | - |
| 111 | + g.Go(func() error { |
123 | 112 | pods, err := clientset.getPodsInNamespace(ns) |
124 | 113 | if err != nil { |
125 | | - // Non-blocking send of error |
126 | | - select { |
127 | | - case errs <- err: |
128 | | - default: |
129 | | - } |
130 | | - cancel() // send cancel signal to goroutines |
131 | | - return |
| 114 | + return err |
132 | 115 | } |
133 | 116 | mutex.Lock() |
134 | 117 | list.Items = append(list.Items, pods...) |
135 | 118 | mutex.Unlock() |
136 | | - |
137 | | - }(ns) |
| 119 | + return nil |
| 120 | + }) |
138 | 121 | } |
139 | 122 |
|
140 | | - wg.Wait() |
141 | | - // Return (first) error, if any: |
142 | | - if ctx.Err() != nil { |
143 | | - return podsData, <-errs |
| 123 | + if err := g.Wait(); err != nil { |
| 124 | + return nil, err |
144 | 125 | } |
145 | 126 |
|
146 | 127 | return processPods(list), nil |
@@ -178,63 +159,43 @@ func (clientset *K8SConnection) filterNamespaces(filter *filters.ResourceFilterO |
178 | 159 | return filter.IncludeNames, nil |
179 | 160 | } |
180 | 161 | } |
181 | | - result := []string{} |
182 | 162 | // get all namespaces in the cluster |
183 | 163 | nsList, err := clientset.GetClusterNamespaces() |
184 | 164 | if err != nil { |
185 | | - return result, err |
| 165 | + return nil, err |
186 | 166 | } |
187 | 167 |
|
| 168 | + namespaces := []string{} |
| 169 | + |
188 | 170 | if len(filter.IncludeNames) == 0 && len(filter.IncludeNamesRegex) == 0 && |
189 | 171 | len(filter.ExcludeNames) == 0 && len(filter.ExcludeNamesRegex) == 0 { |
190 | 172 | for _, ns := range nsList { |
191 | | - result = append(result, ns.Name) |
| 173 | + namespaces = append(namespaces, ns.Name) |
192 | 174 | } |
193 | | - return result, nil |
| 175 | + return namespaces, nil |
194 | 176 | } |
195 | 177 |
|
196 | | - var ( |
197 | | - wg sync.WaitGroup |
198 | | - mutex = &sync.Mutex{} |
199 | | - ) |
200 | | - |
201 | | - errs := make(chan error, 1) // Buffered only for the first error |
202 | | - ctx, cancel := context.WithCancel(context.Background()) |
203 | | - defer cancel() // Make sure it's called to release resources even if no errors |
| 178 | + mutex := new(sync.Mutex) |
| 179 | + g, _ := errgroup.WithContext(context.Background()) |
204 | 180 |
|
205 | 181 | for _, ns := range nsList { |
206 | | - wg.Add(1) |
207 | | - go func(ns string) { |
208 | | - defer wg.Done() |
209 | | - |
210 | | - // Check if any error occurred in any other gorouties: |
211 | | - select { |
212 | | - case <-ctx.Done(): |
213 | | - return // Error somewhere, terminate |
214 | | - default: // Default is must to avoid blocking |
215 | | - } |
216 | | - |
217 | | - include, err := filter.ShouldInclude(ns) |
| 182 | + g.Go(func() error { |
| 183 | + include, err := filter.ShouldInclude(ns.Name) |
218 | 184 | if err != nil { |
219 | | - select { |
220 | | - case errs <- err: |
221 | | - default: |
222 | | - } |
223 | | - cancel() // send cancel signal to goroutines |
224 | | - return |
| 185 | + return err |
225 | 186 | } |
226 | 187 | if include { |
227 | 188 | mutex.Lock() |
228 | | - result = append(result, ns) |
| 189 | + namespaces = append(namespaces, ns.Name) |
229 | 190 | mutex.Unlock() |
230 | 191 | } |
231 | | - }(ns.Name) |
| 192 | + return nil |
| 193 | + }) |
232 | 194 | } |
233 | | - wg.Wait() |
234 | | - if ctx.Err() != nil { |
235 | | - return result, <-errs |
| 195 | + if err := g.Wait(); err != nil { |
| 196 | + return nil, err |
236 | 197 | } |
237 | | - return result, nil |
| 198 | + return namespaces, nil |
238 | 199 | } |
239 | 200 |
|
240 | 201 | // getPodsInNamespace get pods in a specific namespace in a cluster |
|
0 commit comments