@@ -200,6 +200,8 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
200
200
if m .policy .Name () == string (PolicyNone ) {
201
201
return nil
202
202
}
203
+ // Periodically call m.reconcileState() to continue to keep the CPU sets of
203
205
+ // all pods in sync with the guaranteed CPUs that have been handed out among them.
203
205
go wait .Until (func () { m .reconcileState () }, m .reconcilePeriod , wait .NeverStop )
204
206
return nil
205
207
}
@@ -217,19 +219,24 @@ func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) e
217
219
}
218
220
}
219
221
}
222
+
223
+ // Call down into the policy to assign this container CPUs if required.
220
224
err := m .policyAddContainer (p , c , containerID )
221
225
if err != nil {
222
226
klog .Errorf ("[cpumanager] AddContainer error: %v" , err )
223
227
m .Unlock ()
224
228
return err
225
229
}
230
+
231
+ // Get the CPUs just assigned to the container (or fall back to the default
232
+ // CPUSet if none were assigned).
226
233
cpus := m .state .GetCPUSetOrDefault (string (p .UID ), c .Name )
227
234
m .Unlock ()
228
235
229
236
if ! cpus .IsEmpty () {
230
237
err = m .updateContainerCPUSet (containerID , cpus )
231
238
if err != nil {
232
- klog .Errorf ("[cpumanager] AddContainer error: %v" , err )
239
+ klog .Errorf ("[cpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)" , p . Name , c . Name , containerID , err )
233
240
m .Lock ()
234
241
err := m .policyRemoveContainerByID (containerID )
235
242
if err != nil {
@@ -353,43 +360,59 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
353
360
354
361
m .removeStaleState ()
355
362
for _ , pod := range m .activePods () {
363
+ pstatus , ok := m .podStatusProvider .GetPodStatus (pod .UID )
364
+ if ! ok {
365
+ klog .Warningf ("[cpumanager] reconcileState: skipping pod; status not found (pod: %s)" , pod .Name )
366
+ failure = append (failure , reconciledContainer {pod .Name , "" , "" })
367
+ continue
368
+ }
369
+
356
370
allContainers := pod .Spec .InitContainers
357
371
allContainers = append (allContainers , pod .Spec .Containers ... )
358
- status , ok := m .podStatusProvider .GetPodStatus (pod .UID )
359
372
for _ , container := range allContainers {
360
- if ! ok {
361
- klog .Warningf ("[cpumanager] reconcileState: skipping pod; status not found (pod: %s)" , pod .Name )
373
+ containerID , err := findContainerIDByName (& pstatus , container .Name )
374
+ if err != nil {
375
+ klog .Warningf ("[cpumanager] reconcileState: skipping container; ID not found in pod status (pod: %s, container: %s, error: %v)" , pod .Name , container .Name , err )
362
376
failure = append (failure , reconciledContainer {pod .Name , container .Name , "" })
363
- break
377
+ continue
364
378
}
365
379
366
- containerID , err := findContainerIDByName ( & status , container .Name )
380
+ cstatus , err := findContainerStatusByName ( & pstatus , container .Name )
367
381
if err != nil {
368
- klog .Warningf ("[cpumanager] reconcileState: skipping container; ID not found in status (pod: %s, container: %s, error: %v)" , pod .Name , container .Name , err )
382
+ klog .Warningf ("[cpumanager] reconcileState: skipping container; container status not found in pod status (pod: %s, container: %s, error: %v)" , pod .Name , container .Name , err )
369
383
failure = append (failure , reconciledContainer {pod .Name , container .Name , "" })
370
384
continue
371
385
}
372
386
373
- // Check whether container is present in state, there may be 3 reasons why it's not present:
374
- // - policy does not want to track the container
375
- // - kubelet has just been restarted - and there is no previous state file
376
- // - container has been removed from state by RemoveContainer call (DeletionTimestamp is set)
377
- if _ , ok := m .state .GetCPUSet (string (pod .UID ), container .Name ); ! ok {
378
- if status .Phase == v1 .PodRunning && pod .DeletionTimestamp == nil {
379
- klog .V (4 ).Infof ("[cpumanager] reconcileState: container is not present in state - trying to add (pod: %s, container: %s, container id: %s)" , pod .Name , container .Name , containerID )
380
- err := m .AddContainer (pod , & container , containerID )
387
+ if cstatus .State .Waiting != nil ||
388
+ (cstatus .State .Waiting == nil && cstatus .State .Running == nil && cstatus .State .Terminated == nil ) {
389
+ klog .Warningf ("[cpumanager] reconcileState: skipping container; container still in the waiting state (pod: %s, container: %s, error: %v)" , pod .Name , container .Name , err )
390
+ failure = append (failure , reconciledContainer {pod .Name , container .Name , "" })
391
+ continue
392
+ }
393
+
394
+ if cstatus .State .Terminated != nil {
395
+ // Since the container is terminated, we know it is safe to
396
+ // remove it without any reconciliation. Removing the container
397
+ // will also remove it from the `containerMap` so that this
398
+ // container will be skipped next time around the loop.
399
+ _ , _ , err := m .containerMap .GetContainerRef (containerID )
400
+ if err == nil {
401
+ klog .Warningf ("[cpumanager] reconcileState: skipping container; already terminated (pod: %s, container id: %s)" , pod .Name , containerID )
402
+ err := m .RemoveContainer (containerID )
381
403
if err != nil {
382
- klog .Errorf ("[cpumanager] reconcileState: failed to add container (pod: %s, container: %s, container id: %s, error: %v)" , pod . Name , container .Name , containerID , err )
404
+ klog .Errorf ("[cpumanager] reconcileState: failed to remove container (pod: %s, container id: %s, error: %v)" , pod .Name , containerID , err )
383
405
failure = append (failure , reconciledContainer {pod .Name , container .Name , containerID })
384
- continue
385
406
}
386
- } else {
387
- // if DeletionTimestamp is set, pod has already been removed from state
388
- // skip the pod/container since it's not running and will be deleted soon
389
- continue
390
407
}
408
+ continue
391
409
}
392
410
411
+ // Once we make it here we know we have a running container.
412
+ // Idempotently add it to the containerMap incase it is missing.
413
+ // This can happen after a kubelet restart, for example.
414
+ m .containerMap .Add (string (pod .UID ), container .Name , containerID )
415
+
393
416
cset := m .state .GetCPUSetOrDefault (string (pod .UID ), container .Name )
394
417
if cset .IsEmpty () {
395
418
// NOTE: This should not happen outside of tests.
@@ -427,6 +450,15 @@ func findContainerIDByName(status *v1.PodStatus, name string) (string, error) {
427
450
return "" , fmt .Errorf ("unable to find ID for container with name %v in pod status (it may not be running)" , name )
428
451
}
429
452
453
+ func findContainerStatusByName (status * v1.PodStatus , name string ) (* v1.ContainerStatus , error ) {
454
+ for _ , status := range append (status .InitContainerStatuses , status .ContainerStatuses ... ) {
455
+ if status .Name == name {
456
+ return & status , nil
457
+ }
458
+ }
459
+ return nil , fmt .Errorf ("unable to find status for container with name %v in pod status (it may not be running)" , name )
460
+ }
461
+
430
462
func (m * manager ) updateContainerCPUSet (containerID string , cpus cpuset.CPUSet ) error {
431
463
// TODO: Consider adding a `ResourceConfigForContainer` helper in
432
464
// helpers_linux.go similar to what exists for pods.
0 commit comments