@@ -80,6 +80,16 @@ type GenericPLEG struct {
80
80
podCacheMutex sync.Mutex
81
81
// logger is used for contextual logging
82
82
logger klog.Logger
83
+ // watchConditions tracks pod watch conditions, guarded by watchConditionsLock
84
+ // watchConditions is a map of pod UID -> condition key -> condition
85
+ watchConditions map [types.UID ]map [string ]versionedWatchCondition
86
+ watchConditionsLock sync.Mutex
87
+ }
88
+
89
// versionedWatchCondition is the per-key entry stored in GenericPLEG.watchConditions.
+ type versionedWatchCondition struct {
90
// key is the condition key this entry is stored under.
+ key string
91
// condition is the callback that Relist evaluates against the pod status.
+ condition WatchCondition
92
// version is incremented by SetPodWatchCondition each time the condition for an
// existing key is replaced. completeWatchConditions compares it so that a condition
// which was updated after being read is not cleared.
+ version uint32
83
93
}
84
94
85
95
// plegContainerState has a one-to-one mapping to the
@@ -125,13 +135,14 @@ func NewGenericPLEG(logger klog.Logger, runtime kubecontainer.Runtime, eventChan
125
135
panic ("cache cannot be nil" )
126
136
}
127
137
return & GenericPLEG {
128
- logger : logger ,
129
- relistDuration : relistDuration ,
130
- runtime : runtime ,
131
- eventChannel : eventChannel ,
132
- podRecords : make (podRecords ),
133
- cache : cache ,
134
- clock : clock ,
138
+ logger : logger ,
139
+ relistDuration : relistDuration ,
140
+ runtime : runtime ,
141
+ eventChannel : eventChannel ,
142
+ podRecords : make (podRecords ),
143
+ cache : cache ,
144
+ clock : clock ,
145
+ watchConditions : make (map [types.UID ]map [string ]versionedWatchCondition ),
135
146
}
136
147
}
137
148
@@ -252,6 +263,7 @@ func (g *GenericPLEG) Relist() {
252
263
// update running pod and container count
253
264
updateRunningPodAndContainerMetrics (pods )
254
265
g .podRecords .setCurrent (pods )
266
+ g .cleanupOrphanedWatchConditions ()
255
267
256
268
needsReinspection := make (map [types.UID ]* kubecontainer.Pod )
257
269
@@ -267,9 +279,10 @@ func (g *GenericPLEG) Relist() {
267
279
events = append (events , containerEvents ... )
268
280
}
269
281
282
+ watchConditions := g .getPodWatchConditions (pid )
270
283
_ , reinspect := g .podsToReinspect [pid ]
271
284
272
- if len (events ) == 0 && ! reinspect {
285
+ if len (events ) == 0 && len ( watchConditions ) == 0 && ! reinspect {
273
286
// Nothing else needed for this pod.
274
287
continue
275
288
}
@@ -283,7 +296,8 @@ func (g *GenericPLEG) Relist() {
283
296
// inspecting the pod and getting the PodStatus to update the cache
284
297
// serially may take a while. We should be aware of this and
285
298
// parallelize if needed.
286
- if err , updated := g .updateCache (ctx , pod , pid ); err != nil {
299
+ status , updated , err := g .updateCache (ctx , pod , pid )
300
+ if err != nil {
287
301
// Rely on updateCache calling GetPodStatus to log the actual error.
288
302
g .logger .V (4 ).Error (err , "PLEG: Ignoring events for pod" , "pod" , klog .KRef (pod .Namespace , pod .Name ))
289
303
@@ -299,6 +313,14 @@ func (g *GenericPLEG) Relist() {
299
313
}
300
314
}
301
315
316
+ var completedConditions []versionedWatchCondition
317
+ for _ , condition := range watchConditions {
318
+ if condition .condition (status ) {
319
+ completedConditions = append (completedConditions , condition )
320
+ }
321
+ }
322
+ g .completeWatchConditions (pid , completedConditions )
323
+
302
324
// Update the internal storage and send out the events.
303
325
g .podRecords .update (pid )
304
326
@@ -320,8 +342,6 @@ func (g *GenericPLEG) Relist() {
320
342
if events [i ].Type == ContainerDied {
321
343
// Fill up containerExitCode map for ContainerDied event when first time appeared
322
344
if len (containerExitCode ) == 0 && pod != nil {
323
- // Get updated podStatus
324
- status , err := g .cache .Get (pod .ID )
325
345
if err == nil {
326
346
for _ , containerStatus := range status .ContainerStatuses {
327
347
containerExitCode [containerStatus .ID .ID ] = containerStatus .ExitCode
@@ -410,13 +430,13 @@ func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus)
410
430
// updateCache tries to update the pod status in the kubelet cache. It returns the status
411
431
// handed to the cache (nil when the pod is missing and its cache entry was deleted), true if
412
432
// the cache actually applied the update (false if it was ignored), and the error stored with it.
413
- func (g * GenericPLEG ) updateCache (ctx context.Context , pod * kubecontainer.Pod , pid types.UID ) (error , bool ) {
433
+ func (g * GenericPLEG ) updateCache (ctx context.Context , pod * kubecontainer.Pod , pid types.UID ) (* kubecontainer. PodStatus , bool , error ) {
414
434
if pod == nil {
415
435
// The pod is missing in the current relist. This means that
416
436
// the pod has no visible (active or inactive) containers.
417
437
g .logger .V (4 ).Info ("PLEG: Delete status for pod" , "podUID" , string (pid ))
418
438
g .cache .Delete (pid )
419
- return nil , true
439
+ return nil , true , nil
420
440
}
421
441
422
442
g .podCacheMutex .Lock ()
@@ -460,22 +480,93 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
460
480
timestamp = status .TimeStamp
461
481
}
462
482
463
- return err , g .cache .Set (pod .ID , status , err , timestamp )
483
+ return status , g .cache .Set (pod .ID , status , err , timestamp ), err
464
484
}
465
485
466
486
func (g * GenericPLEG ) UpdateCache (pod * kubecontainer.Pod , pid types.UID ) (error , bool ) {
467
487
ctx := context .Background ()
468
488
if pod == nil {
469
489
return fmt .Errorf ("pod cannot be nil" ), false
470
490
}
471
- return g .updateCache (ctx , pod , pid )
491
+ _ , updated , err := g .updateCache (ctx , pod , pid )
492
+ return err , updated
493
+ }
494
+
495
+ func (g * GenericPLEG ) SetPodWatchCondition (podUID types.UID , conditionKey string , condition WatchCondition ) {
496
+ g .watchConditionsLock .Lock ()
497
+ defer g .watchConditionsLock .Unlock ()
498
+
499
+ conditions , ok := g .watchConditions [podUID ]
500
+ if ! ok {
501
+ if condition == nil {
502
+ return // Condition isn't set, nothing to do.
503
+ }
504
+ conditions = make (map [string ]versionedWatchCondition )
505
+ }
506
+
507
+ versioned , found := conditions [conditionKey ]
508
+ if found {
509
+ versioned .version ++
510
+ versioned .condition = condition
511
+ conditions [conditionKey ] = versioned
512
+ } else if condition != nil {
513
+ conditions [conditionKey ] = versionedWatchCondition {
514
+ key : conditionKey ,
515
+ condition : condition ,
516
+ }
517
+ }
518
+
519
+ g .watchConditions [podUID ] = conditions
472
520
}
473
521
474
- func updateEvents (eventsByPodID map [types.UID ][]* PodLifecycleEvent , e * PodLifecycleEvent ) {
475
- if e == nil {
522
+ // getPodWatchConditions returns a list of the active watch conditions for the pod.
523
+ func (g * GenericPLEG ) getPodWatchConditions (podUID types.UID ) []versionedWatchCondition {
524
+ g .watchConditionsLock .Lock ()
525
+ defer g .watchConditionsLock .Unlock ()
526
+
527
+ conditions , ok := g .watchConditions [podUID ]
528
+ if ! ok {
529
+ return nil
530
+ }
531
+
532
+ filtered := make ([]versionedWatchCondition , 0 , len (conditions ))
533
+ for _ , condition := range conditions {
534
+ filtered = append (filtered , condition )
535
+ }
536
+ return filtered
537
+ }
538
+
539
+ // completeWatchConditions clears the completed watch conditions.
540
+ func (g * GenericPLEG ) completeWatchConditions (podUID types.UID , completedConditions []versionedWatchCondition ) {
541
+ g .watchConditionsLock .Lock ()
542
+ defer g .watchConditionsLock .Unlock ()
543
+
544
+ conditions , ok := g .watchConditions [podUID ]
545
+ if ! ok {
546
+ // Pod was deleted, nothing to do.
476
547
return
477
548
}
478
- eventsByPodID [e .ID ] = append (eventsByPodID [e .ID ], e )
549
+
550
+ for _ , completed := range completedConditions {
551
+ condition := conditions [completed .key ]
552
+ // Only clear the condition if it has not been updated.
553
+ if condition .version == completed .version {
554
+ delete (conditions , completed .key )
555
+ }
556
+ }
557
+ g .watchConditions [podUID ] = conditions
558
+ }
559
+
560
+ func (g * GenericPLEG ) cleanupOrphanedWatchConditions () {
561
+ g .watchConditionsLock .Lock ()
562
+ defer g .watchConditionsLock .Unlock ()
563
+
564
+ for podUID := range g .watchConditions {
565
+ if g .podRecords .getCurrent (podUID ) == nil {
566
+ // Pod was deleted, remove it from the watch conditions.
567
+ delete (g .watchConditions , podUID )
568
+ }
569
+ }
479
570
}
480
571
481
572
func getContainerState (pod * kubecontainer.Pod , cid * kubecontainer.ContainerID ) plegContainerState {
0 commit comments