Skip to content

Commit 411ab59

Browse files
authored
feat(collector): add configurable pending pod inclusion for kubernetes input (#3394)
1 parent 858ac0f commit 411ab59

File tree

1 file changed

+54
-15
lines changed

1 file changed

+54
-15
lines changed

collector/benthos/input/kubernetes.go

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ func kubernetesResourcesInputConfig() *service.ConfigSpec {
3838
service.NewStringField("label_selector").
3939
Description("Label selector applied to each list operation.").
4040
Optional(),
41+
service.NewBoolField("include_pending_pods").
42+
Description("Include pods in pending state (not all containers are running). Only applies when resource_type is 'pod'.").
43+
Default(false),
4144
)
4245
}
4346

@@ -51,11 +54,12 @@ func init() {
5154
}
5255

5356
type kubernetesResourcesInput struct {
54-
namespaces []string
55-
resourceType string
56-
labelSelector labels.Selector
57-
logger *service.Logger
58-
resources *service.Resources
57+
namespaces []string
58+
resourceType string
59+
labelSelector labels.Selector
60+
includePendingPods bool
61+
logger *service.Logger
62+
resources *service.Resources
5963

6064
manager manager.Manager
6165
client client.Client
@@ -81,6 +85,11 @@ func newKubernetesResourcesInput(conf *service.ParsedConfig, res *service.Resour
8185
return nil, err
8286
}
8387

88+
includePendingPods, err := conf.FieldBool("include_pending_pods")
89+
if err != nil {
90+
return nil, err
91+
}
92+
8493
// Normalize the namespaces to lowercase and deduplicate.
8594
namespaces = lo.Uniq(lo.Map(namespaces, func(s string, _ int) string { return strings.ToLower(s) }))
8695

@@ -132,13 +141,14 @@ func newKubernetesResourcesInput(conf *service.ParsedConfig, res *service.Resour
132141
client := mgr.GetClient()
133142

134143
return &kubernetesResourcesInput{
135-
namespaces: namespaces,
136-
labelSelector: selector,
137-
resourceType: resourceType,
138-
manager: mgr,
139-
client: client,
140-
logger: logger,
141-
resources: res,
144+
namespaces: namespaces,
145+
labelSelector: selector,
146+
resourceType: resourceType,
147+
includePendingPods: includePendingPods,
148+
manager: mgr,
149+
client: client,
150+
logger: logger,
151+
resources: res,
142152
}, nil
143153
}
144154

@@ -192,9 +202,38 @@ func (in *kubernetesResourcesInput) ReadBatch(ctx context.Context) (service.Mess
192202
}
193203

194204
for _, pod := range podList.Items {
195-
if !lo.EveryBy(pod.Status.ContainerStatuses, func(cs corev1.ContainerStatus) bool {
196-
return cs.State.Running != nil
197-
}) {
205+
// Filter pods based on their status and configuration.
206+
shouldInclude := false
207+
208+
// Most recently observed status of the pod. This data may not be up to date. Populated by the system.
209+
switch pod.Status.Phase {
210+
case corev1.PodRunning:
211+
shouldInclude = true
212+
case corev1.PodPending:
213+
shouldInclude = in.includePendingPods
214+
// Phase not set, check container statuses
215+
case "":
216+
in.logger.Warnf("pod %s has no phase", pod.Name)
217+
// If all containers are running, treat as running pod
218+
if lo.EveryBy(pod.Status.ContainerStatuses, func(cs corev1.ContainerStatus) bool {
219+
return cs.State.Running != nil
220+
}) {
221+
shouldInclude = true
222+
// If at least one container is running, treat as pending
223+
} else if lo.SomeBy(pod.Status.ContainerStatuses, func(cs corev1.ContainerStatus) bool {
224+
return cs.State.Running != nil
225+
}) {
226+
shouldInclude = in.includePendingPods
227+
} else {
228+
shouldInclude = false
229+
}
230+
default:
231+
in.logger.Warnf("pod %s has unknown phase", pod.Name)
232+
// Skip pods in other phases (Succeeded, Failed, Unknown)
233+
shouldInclude = false
234+
}
235+
236+
if !shouldInclude {
198237
continue
199238
}
200239

0 commit comments

Comments
 (0)