Skip to content

Commit 9e157c5

Browse files
authored
Merge pull request kubernetes#127357 from lengrongfu/feat/add-chan-buffer
add resourceupdates.Update chan buffer
2 parents b071443 + ead64fb commit 9e157c5

File tree

2 files changed

+43
-42
lines changed

2 files changed

+43
-42
lines changed

pkg/kubelet/cm/devicemanager/manager.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package devicemanager
1818

1919
import (
2020
"context"
21+
goerrors "errors"
2122
"fmt"
2223
"os"
2324
"path/filepath"
@@ -157,7 +158,7 @@ func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffi
157158
numaNodes: numaNodes,
158159
topologyAffinityStore: topologyAffinityStore,
159160
devicesToReuse: make(PodReusableDevices),
160-
update: make(chan resourceupdates.Update),
161+
update: make(chan resourceupdates.Update, 100),
161162
}
162163

163164
server, err := plugin.NewServer(socketPath, manager, manager)
@@ -309,8 +310,10 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [
309310

310311
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) {
311312
if len(podsToUpdate) > 0 {
312-
m.update <- resourceupdates.Update{
313-
PodUIDs: podsToUpdate.UnsortedList(),
313+
select {
314+
case m.update <- resourceupdates.Update{PodUIDs: podsToUpdate.UnsortedList()}:
315+
default:
316+
klog.ErrorS(goerrors.New("device update channel is full"), "discard pods info", "podsToUpdate", podsToUpdate.UnsortedList())
314317
}
315318
}
316319
}

pkg/kubelet/cm/devicemanager/manager_test.go

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1970,18 +1970,17 @@ func TestFeatureGateResourceHealthStatus(t *testing.T) {
19701970
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
19711971
require.NoError(t, err, "err should be nil")
19721972
resourceName := "domain1.com/resource1"
1973-
existDevices := map[string]DeviceInstances{
1974-
resourceName: map[string]pluginapi.Device{
1975-
"dev1": {
1976-
ID: "dev1",
1977-
Health: pluginapi.Healthy,
1978-
},
1979-
"dev2": {
1980-
ID: "dev2",
1981-
Health: pluginapi.Unhealthy,
1982-
},
1983-
},
1973+
existDevices := map[string]DeviceInstances{}
1974+
resourceNameMap := make(map[string]pluginapi.Device)
1975+
deviceUpdateNumber, deviceUpdateChanBuffer := 200, 100
1976+
for i := 0; i < deviceUpdateNumber; i++ {
1977+
resourceNameMap[fmt.Sprintf("dev%d", i)] = pluginapi.Device{
1978+
ID: fmt.Sprintf("dev%d", i),
1979+
Health: pluginapi.Healthy,
1980+
}
19841981
}
1982+
existDevices[resourceName] = resourceNameMap
1983+
19851984
testManager := &ManagerImpl{
19861985
allDevices: ResourceDeviceInstances(existDevices),
19871986
endpoints: make(map[string]endpointInfo),
@@ -1990,43 +1989,42 @@ func TestFeatureGateResourceHealthStatus(t *testing.T) {
19901989
allocatedDevices: make(map[string]sets.Set[string]),
19911990
podDevices: newPodDevices(),
19921991
checkpointManager: ckm,
1993-
update: make(chan resourceupdates.Update),
1992+
update: make(chan resourceupdates.Update, deviceUpdateChanBuffer),
19941993
}
19951994

1996-
podID := "pod1"
1997-
contID := "con1"
1998-
devices := checkpoint.DevicesPerNUMA{0: []string{"dev1"}, 1: []string{"dev1"}}
1995+
for i := 0; i < deviceUpdateNumber; i++ {
1996+
podID := fmt.Sprintf("pod%d", i)
1997+
contID := fmt.Sprintf("con%d", i)
1998+
devices := checkpoint.DevicesPerNUMA{0: []string{fmt.Sprintf("dev%d", i)}}
1999+
testManager.podDevices.insert(podID, contID, resourceName,
2000+
devices,
2001+
newContainerAllocateResponse(
2002+
withDevices(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}),
2003+
withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
2004+
),
2005+
)
2006+
}
19992007

2000-
testManager.podDevices.insert(podID, contID, resourceName,
2001-
devices,
2002-
newContainerAllocateResponse(
2003-
withDevices(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}),
2004-
withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
2005-
),
2006-
)
20072008
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatus, true)
20082009

2009-
testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{
2010-
{ID: "dev1", Health: pluginapi.Healthy},
2011-
{ID: "dev2", Health: pluginapi.Unhealthy},
2012-
})
2010+
for i := 0; i < deviceUpdateNumber; i++ {
2011+
testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{
2012+
{ID: "dev1", Health: pluginapi.Healthy},
2013+
})
2014+
}
20132015
// update chan no data
20142016
assert.Empty(t, testManager.update)
20152017

2016-
// update chan receive pod1
2017-
var wg sync.WaitGroup
2018-
go func() {
2019-
defer wg.Done()
2018+
// update device status, assume all device unhealthy.
2019+
for i := 0; i < deviceUpdateNumber; i++ {
2020+
testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{
2021+
{ID: fmt.Sprintf("dev%d", i), Health: pluginapi.Unhealthy},
2022+
})
2023+
}
2024+
for i := 0; i < deviceUpdateChanBuffer; i++ {
20202025
u := <-testManager.update
20212026
assert.Equal(t, resourceupdates.Update{
2022-
PodUIDs: []string{"pod1"},
2027+
PodUIDs: []string{fmt.Sprintf("pod%d", i)},
20232028
}, u)
2024-
}()
2025-
wg.Add(1)
2026-
testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{
2027-
{ID: "dev1", Health: pluginapi.Unhealthy},
2028-
{ID: "dev2", Health: pluginapi.Healthy},
2029-
})
2030-
wg.Wait()
2031-
2029+
}
20322030
}

0 commit comments

Comments
 (0)