@@ -314,11 +314,15 @@ func (g *GenericPLEG) Relist() {
314
314
var completedConditions []versionedWatchCondition
315
315
for _ , condition := range watchConditions {
316
316
if condition .condition (status ) {
317
+ // condition was met: add it to the list of completed conditions.
317
318
completedConditions = append (completedConditions , condition )
318
319
}
319
320
}
320
321
if len (completedConditions ) > 0 {
321
322
g .completeWatchConditions (pid , completedConditions )
323
+ // If at least 1 condition completed, emit a ConditionMet event to trigger a pod sync.
324
+ // We only emit 1 event even if multiple conditions are met, since SyncPod reevaluates
325
+ // all containers in the pod with the latest status.
322
326
events = append (events , & PodLifecycleEvent {ID : pid , Type : ConditionMet })
323
327
}
324
328
@@ -484,24 +488,25 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
484
488
return status , g .cache .Set (pod .ID , status , err , timestamp ), err
485
489
}
486
490
491
+ // SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch
492
+ // condition is met. The condition is keyed so it can be updated before the condition
493
+ // is met.
487
494
func (g * GenericPLEG ) SetPodWatchCondition (podUID types.UID , conditionKey string , condition WatchCondition ) {
488
495
g .watchConditionsLock .Lock ()
489
496
defer g .watchConditionsLock .Unlock ()
490
497
491
498
conditions , ok := g .watchConditions [podUID ]
492
499
if ! ok {
493
- if condition == nil {
494
- return // Condition isn't set, nothing to do.
495
- }
496
500
conditions = make (map [string ]versionedWatchCondition )
497
501
}
498
502
499
503
versioned , found := conditions [conditionKey ]
500
504
if found {
505
+ // Watch condition was already set. Increment its version & update the condition function.
501
506
versioned .version ++
502
507
versioned .condition = condition
503
508
conditions [conditionKey ] = versioned
504
- } else if condition != nil {
509
+ } else {
505
510
conditions [conditionKey ] = versionedWatchCondition {
506
511
key : conditionKey ,
507
512
condition : condition ,
@@ -516,19 +521,22 @@ func (g *GenericPLEG) getPodWatchConditions(podUID types.UID) []versionedWatchCo
516
521
g .watchConditionsLock .Lock ()
517
522
defer g .watchConditionsLock .Unlock ()
518
523
519
- conditions , ok := g .watchConditions [podUID ]
524
+ podConditions , ok := g .watchConditions [podUID ]
520
525
if ! ok {
521
526
return nil
522
527
}
523
528
524
- filtered := make ([]versionedWatchCondition , 0 , len (conditions ))
525
- for _ , condition := range conditions {
526
- filtered = append (filtered , condition )
529
+ // Flatten the map into a list of conditions. This also serves to create a copy, so the lock can
530
+ // be released.
531
+ conditions := make ([]versionedWatchCondition , 0 , len (podConditions ))
532
+ for _ , condition := range podConditions {
533
+ conditions = append (conditions , condition )
527
534
}
528
- return filtered
535
+ return conditions
529
536
}
530
537
531
- // completeWatchConditions clears the completed watch conditions.
538
+ // completeWatchConditions removes the completed watch conditions, unless they have been updated
539
+ // since the condition was checked.
532
540
func (g * GenericPLEG ) completeWatchConditions (podUID types.UID , completedConditions []versionedWatchCondition ) {
533
541
g .watchConditionsLock .Lock ()
534
542
defer g .watchConditionsLock .Unlock ()
@@ -549,6 +557,8 @@ func (g *GenericPLEG) completeWatchConditions(podUID types.UID, completedConditi
549
557
g .watchConditions [podUID ] = conditions
550
558
}
551
559
560
+ // cleanupOrphanedWatchConditions purges the watchConditions map of any pods that were removed from
561
+ // the pod records. Events are not emitted for removed pods.
552
562
func (g * GenericPLEG ) cleanupOrphanedWatchConditions () {
553
563
g .watchConditionsLock .Lock ()
554
564
defer g .watchConditionsLock .Unlock ()
0 commit comments