Skip to content

Commit 20e76df

Browse files
check for k8s pod image ID existence (#570)
1 parent 070076d commit 20e76df

File tree

1 file changed

+47
-8
lines changed

1 file changed

+47
-8
lines changed

internal/kube/kube.go

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,27 @@ type K8SConnection struct {
3434
}
3535

3636
// NewPodData creates a PodData object from a k8s pod
37-
func NewPodData(pod *corev1.Pod) *PodData {
37+
func NewPodData(pod *corev1.Pod, logger *logger.Logger) (*PodData, error) {
3838
digests := make(map[string]string)
3939

4040
creationTimestamp := pod.GetObjectMeta().GetCreationTimestamp()
4141
owners := pod.GetObjectMeta().GetOwnerReferences()
4242
containers := pod.Status.ContainerStatuses
43+
4344
for _, cs := range containers {
44-
digests[cs.Image] = cs.ImageID[len(cs.ImageID)-64:]
45+
if cs.ImageID == "" {
46+
switch pod.Status.Phase {
47+
case corev1.PodFailed:
48+
// skip failed pods
49+
logger.Warn("skipping failed pod %s in namespace %s as it has containers without image IDs", pod.Name, pod.Namespace)
50+
return nil, nil
51+
case corev1.PodRunning:
52+
// fail
53+
return nil, fmt.Errorf("pod %s in namespace %s has containers without image IDs", pod.Name, pod.Namespace)
54+
}
55+
} else {
56+
digests[cs.Image] = cs.ImageID[len(cs.ImageID)-64:]
57+
}
4558
}
4659

4760
return &PodData{
@@ -50,7 +63,7 @@ func NewPodData(pod *corev1.Pod) *PodData {
5063
Digests: digests,
5164
CreationTimestamp: creationTimestamp.Unix(),
5265
Owners: owners,
53-
}
66+
}, nil
5467
}
5568

5669
// NewK8sClientSet creates a k8s clientset
@@ -94,7 +107,7 @@ func (clientset *K8SConnection) GetPodsData(filter *filters.ResourceFilterOption
94107
if err != nil {
95108
return podsData, fmt.Errorf("could not list pods on cluster scope: %v ", err)
96109
}
97-
return processPods(list), nil
110+
return processPods(list, logger)
98111
} else {
99112
list := &corev1.PodList{}
100113
filteredNamespaces, err := clientset.filterNamespaces(filter)
@@ -143,32 +156,58 @@ func (clientset *K8SConnection) GetPodsData(filter *filters.ResourceFilterOption
143156
return podsData, <-errs
144157
}
145158

146-
return processPods(list), nil
159+
return processPods(list, logger)
147160
}
148161
}
149162

150163
// processPods returns podData list for a list of Pods
151-
func processPods(list *corev1.PodList) []*PodData {
164+
func processPods(list *corev1.PodList, logger *logger.Logger) ([]*PodData, error) {
152165
podsData := []*PodData{}
153166
var (
154167
wg sync.WaitGroup
155168
mutex = &sync.Mutex{}
156169
)
170+
171+
errs := make(chan error, 1) // Buffered only for the first error
172+
ctx, cancel := context.WithCancel(context.Background())
173+
defer cancel() // Make sure it's called to release resources even if no errors
174+
157175
for _, pod := range list.Items {
158176
wg.Add(1)
159177
go func(pod corev1.Pod) {
160178
defer wg.Done()
179+
180+
// Check if any error occurred in any other goroutines:
181+
select {
182+
case <-ctx.Done():
183+
return // Error somewhere, terminate
184+
default: // Default is must to avoid blocking
185+
}
186+
161187
// only report running or failed pods
162188
if pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodFailed {
163-
data := NewPodData(&pod)
189+
data, err := NewPodData(&pod, logger)
190+
if err != nil {
191+
// Non-blocking send of error
192+
select {
193+
case errs <- err:
194+
default:
195+
}
196+
cancel() // send cancel signal to goroutines
197+
return
198+
}
164199
mutex.Lock()
165200
podsData = append(podsData, data)
166201
mutex.Unlock()
167202
}
168203
}(pod)
169204
}
170205
wg.Wait()
171-
return podsData
206+
// Return (first) error, if any:
207+
if ctx.Err() != nil {
208+
return podsData, <-errs
209+
}
210+
return podsData, nil
172211
}
173212

174213
// filterNamespaces filters a super set of namespaces by including or excluding a subset of namespaces using regex patterns.

0 commit comments

Comments
 (0)