@@ -80,6 +80,16 @@ type GenericPLEG struct {
	podCacheMutex sync.Mutex
	// logger is used for contextual logging
	logger klog.Logger
+	// watchConditions tracks pod watch conditions, guarded by watchConditionsLock
+	// watchConditions is a map of pod UID -> condition key -> condition
+	watchConditions     map[types.UID]map[string]versionedWatchCondition
+	watchConditionsLock sync.Mutex
+}
+
+type versionedWatchCondition struct {
+	key       string
+	condition WatchCondition
+	version   uint32
}

// plegContainerState has a one-to-one mapping to the
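A note on the new fields: `versionedWatchCondition` pairs a caller-supplied `WatchCondition` with a version counter so a condition that is re-set while a relist is evaluating the old copy is not cleared prematurely. The `WatchCondition` type itself is defined elsewhere in the package; judging by how it is invoked later in this diff (`condition.condition(status)` against the freshly inspected status), it is presumably a predicate over `*kubecontainer.PodStatus`. A minimal sketch under that assumption:

```go
// Sketch only: assumes WatchCondition is a predicate over the pod's latest runtime status.
type WatchCondition func(*kubecontainer.PodStatus) bool

// Hypothetical condition: met once a container named "sidecar" is reported as running.
var sidecarRunning WatchCondition = func(status *kubecontainer.PodStatus) bool {
	for _, cs := range status.ContainerStatuses {
		if cs.Name == "sidecar" && cs.State == kubecontainer.ContainerStateRunning {
			return true
		}
	}
	return false
}
```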
@@ -125,13 +135,14 @@ func NewGenericPLEG(logger klog.Logger, runtime kubecontainer.Runtime, eventChan
		panic("cache cannot be nil")
	}
	return &GenericPLEG{
-		logger:         logger,
-		relistDuration: relistDuration,
-		runtime:        runtime,
-		eventChannel:   eventChannel,
-		podRecords:     make(podRecords),
-		cache:          cache,
-		clock:          clock,
+		logger:          logger,
+		relistDuration:  relistDuration,
+		runtime:         runtime,
+		eventChannel:    eventChannel,
+		podRecords:      make(podRecords),
+		cache:           cache,
+		clock:           clock,
+		watchConditions: make(map[types.UID]map[string]versionedWatchCondition),
	}
}
@@ -252,28 +263,29 @@ func (g *GenericPLEG) Relist() {
	// update running pod and container count
	updateRunningPodAndContainerMetrics(pods)
	g.podRecords.setCurrent(pods)
+	g.cleanupOrphanedWatchConditions()
+
+	needsReinspection := make(map[types.UID]*kubecontainer.Pod)

-	// Compare the old and the current pods, and generate events.
-	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
	for pid := range g.podRecords {
+		// Compare the old and the current pods, and generate events.
		oldPod := g.podRecords.getOld(pid)
		pod := g.podRecords.getCurrent(pid)
		// Get all containers in the old and the new pod.
		allContainers := getContainersFromPods(oldPod, pod)
+		var events []*PodLifecycleEvent
		for _, container := range allContainers {
-			events := computeEvents(g.logger, oldPod, pod, &container.ID)
-			for _, e := range events {
-				updateEvents(eventsByPodID, e)
-			}
+			containerEvents := computeEvents(g.logger, oldPod, pod, &container.ID)
+			events = append(events, containerEvents...)
		}
-	}

-	needsReinspection := make(map[types.UID]*kubecontainer.Pod)
+		watchConditions := g.getPodWatchConditions(pid)
+		_, reinspect := g.podsToReinspect[pid]

-	// If there are events associated with a pod, we should update the
-	// podCache.
-	for pid, events := range eventsByPodID {
-		pod := g.podRecords.getCurrent(pid)
+		if len(events) == 0 && len(watchConditions) == 0 && !reinspect {
+			// Nothing else needed for this pod.
+			continue
+		}

		// updateCache() will inspect the pod and update the cache. If an
		// error occurs during the inspection, we want PLEG to retry again
@@ -284,25 +296,35 @@ func (g *GenericPLEG) Relist() {
		// inspecting the pod and getting the PodStatus to update the cache
		// serially may take a while. We should be aware of this and
		// parallelize if needed.
-		if err, updated := g.updateCache(ctx, pod, pid); err != nil {
+		status, updated, err := g.updateCache(ctx, pod, pid)
+		if err != nil {
			// Rely on updateCache calling GetPodStatus to log the actual error.
			g.logger.V(4).Error(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))

			// make sure we try to reinspect the pod during the next relisting
			needsReinspection[pid] = pod

			continue
-		} else {
-			// this pod was in the list to reinspect and we did so because it had events, so remove it
-			// from the list (we don't want the reinspection code below to inspect it a second time in
-			// this relist execution)
-			delete(g.podsToReinspect, pid)
-			if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
-				if !updated {
-					continue
-				}
+		} else if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
+			if !updated {
+				continue
+			}
+		}
+
+		var completedConditions []versionedWatchCondition
+		for _, condition := range watchConditions {
+			if condition.condition(status) {
+				// condition was met: add it to the list of completed conditions.
+				completedConditions = append(completedConditions, condition)
			}
		}
+		if len(completedConditions) > 0 {
+			g.completeWatchConditions(pid, completedConditions)
+			// If at least 1 condition completed, emit a ConditionMet event to trigger a pod sync.
+			// We only emit 1 event even if multiple conditions are met, since SyncPod reevaluates
+			// all containers in the pod with the latest status.
+			events = append(events, &PodLifecycleEvent{ID: pid, Type: ConditionMet})
+		}

		// Update the internal storage and send out the events.
		g.podRecords.update(pid)
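To show how this per-pod loop is driven from the outside, here is a rough usage sketch. Only `SetPodWatchCondition` and the `ConditionMet` event type come from this diff; the helper name, condition key, and the assumed `WatchCondition` signature are illustrative. Once the predicate returns true during a relist, the pod gets a single `ConditionMet` event (triggering a pod sync) and the condition is dropped unless it was re-set in the meantime:

```go
// Hypothetical helper: ask the PLEG to trigger a pod sync once the named container is running.
func watchContainerRunning(pleg *GenericPLEG, podUID types.UID, containerName string) {
	pleg.SetPodWatchCondition(podUID, "container-running/"+containerName,
		func(status *kubecontainer.PodStatus) bool {
			for _, cs := range status.ContainerStatuses {
				if cs.Name == containerName && cs.State == kubecontainer.ContainerStateRunning {
					return true // next Relist emits one ConditionMet event for this pod
				}
			}
			return false
		})
}
```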
@@ -325,8 +347,6 @@ func (g *GenericPLEG) Relist() {
			if events[i].Type == ContainerDied {
				// Fill up containerExitCode map for ContainerDied event when first time appeared
				if len(containerExitCode) == 0 && pod != nil {
-					// Get updated podStatus
-					status, err := g.cache.Get(pod.ID)
					if err == nil {
						for _, containerStatus := range status.ContainerStatuses {
							containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
@@ -342,18 +362,6 @@ func (g *GenericPLEG) Relist() {
		}
	}

-	// reinspect any pods that failed inspection during the previous relist
-	if len(g.podsToReinspect) > 0 {
-		g.logger.V(5).Info("GenericPLEG: Reinspecting pods that previously failed inspection")
-		for pid, pod := range g.podsToReinspect {
-			if err, _ := g.updateCache(ctx, pod, pid); err != nil {
-				// Rely on updateCache calling GetPodStatus to log the actual error.
-				g.logger.V(5).Error(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
-				needsReinspection[pid] = pod
-			}
-		}
-	}
-
	// Update the cache timestamp. This needs to happen *after*
	// all pods have been properly updated in the cache.
	g.cache.UpdateTime(timestamp)
@@ -427,13 +435,13 @@ func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus)
// updateCache tries to update the pod status in the kubelet cache and returns true if the
// pod status was actually updated in the cache. It will return false if the pod status
// was ignored by the cache.
-func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (error, bool) {
+func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (*kubecontainer.PodStatus, bool, error) {
	if pod == nil {
		// The pod is missing in the current relist. This means that
		// the pod has no visible (active or inactive) containers.
		g.logger.V(4).Info("PLEG: Delete status for pod", "podUID", string(pid))
		g.cache.Delete(pid)
-		return nil, true
+		return nil, true, nil
	}

	g.podCacheMutex.Lock()
@@ -477,22 +485,90 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
		timestamp = status.TimeStamp
	}

-	return err, g.cache.Set(pod.ID, status, err, timestamp)
+	return status, g.cache.Set(pod.ID, status, err, timestamp), err
}

-func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
-	ctx := context.Background()
-	if pod == nil {
-		return fmt.Errorf("pod cannot be nil"), false
+// SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch
+// condition is met. The condition is keyed so it can be updated before the condition
+// is met.
+func (g *GenericPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) {
+	g.watchConditionsLock.Lock()
+	defer g.watchConditionsLock.Unlock()
+
+	conditions, ok := g.watchConditions[podUID]
+	if !ok {
+		conditions = make(map[string]versionedWatchCondition)
+	}
+
+	versioned, found := conditions[conditionKey]
+	if found {
+		// Watch condition was already set. Increment its version & update the condition function.
+		versioned.version++
+		versioned.condition = condition
+		conditions[conditionKey] = versioned
+	} else {
+		conditions[conditionKey] = versionedWatchCondition{
+			key:       conditionKey,
+			condition: condition,
+		}
+	}
+
+	g.watchConditions[podUID] = conditions
+}
+
+// getPodWatchConditions returns a list of the active watch conditions for the pod.
+func (g *GenericPLEG) getPodWatchConditions(podUID types.UID) []versionedWatchCondition {
+	g.watchConditionsLock.Lock()
+	defer g.watchConditionsLock.Unlock()
+
+	podConditions, ok := g.watchConditions[podUID]
+	if !ok {
+		return nil
	}
-	return g.updateCache(ctx, pod, pid)
+
+	// Flatten the map into a list of conditions. This also serves to create a copy, so the lock can
+	// be released.
+	conditions := make([]versionedWatchCondition, 0, len(podConditions))
+	for _, condition := range podConditions {
+		conditions = append(conditions, condition)
+	}
+	return conditions
}

-func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {
-	if e == nil {
+// completeWatchConditions removes the completed watch conditions, unless they have been updated
+// since the condition was checked.
+func (g *GenericPLEG) completeWatchConditions(podUID types.UID, completedConditions []versionedWatchCondition) {
+	g.watchConditionsLock.Lock()
+	defer g.watchConditionsLock.Unlock()
+
+	conditions, ok := g.watchConditions[podUID]
+	if !ok {
+		// Pod was deleted, nothing to do.
		return
	}
-	eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e)
+
+	for _, completed := range completedConditions {
+		condition := conditions[completed.key]
+		// Only clear the condition if it has not been updated.
+		if condition.version == completed.version {
+			delete(conditions, completed.key)
+		}
+	}
+	g.watchConditions[podUID] = conditions
+}
+
+// cleanupOrphanedWatchConditions purges the watchConditions map of any pods that were removed from
+// the pod records. Events are not emitted for removed pods.
+func (g *GenericPLEG) cleanupOrphanedWatchConditions() {
+	g.watchConditionsLock.Lock()
+	defer g.watchConditionsLock.Unlock()
+
+	for podUID := range g.watchConditions {
+		if g.podRecords.getCurrent(podUID) == nil {
+			// Pod was deleted, remove it from the watch conditions.
+			delete(g.watchConditions, podUID)
+		}
+	}
}

func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState {
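One detail worth calling out: the version check in `completeWatchConditions` is what keeps `SetPodWatchCondition` safe to call concurrently with `Relist`. Re-setting a key bumps its version, so a completion computed against the older copy leaves the refreshed condition in place. A self-contained sketch of just that mechanism, using local stand-ins for the types added above rather than the kubelet's own:

```go
package main

import "fmt"

// Local stand-in mirroring versionedWatchCondition from the diff (condition func omitted).
type versionedWatchCondition struct {
	key     string
	version uint32
}

func main() {
	conditions := map[string]versionedWatchCondition{
		"ready": {key: "ready", version: 0},
	}

	// Relist snapshots the condition before evaluating it against the pod status.
	snapshot := conditions["ready"]

	// Concurrently, a caller re-sets the same key, which bumps the version.
	updated := conditions["ready"]
	updated.version++
	conditions["ready"] = updated

	// Completion only deletes the entry if its version is unchanged, so the
	// re-set condition survives and will be evaluated again on the next relist.
	if conditions["ready"].version == snapshot.version {
		delete(conditions, "ready")
	}
	fmt.Println("condition still registered:", len(conditions) == 1) // true
}
```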