Skip to content

Commit efb56da

Browse files
authored
Merge pull request kubernetes#92665 from klueska/upstream-add-get-preferred-allocation-api
Add GetPreferredAllocation() call to the v1beta1 device plugin API
2 parents d3aafb2 + 5bd0db0 commit efb56da

File tree

10 files changed

+1803
-452
lines changed

10 files changed

+1803
-452
lines changed

pkg/kubelet/cm/devicemanager/device_plugin_stub.go

Lines changed: 53 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,11 @@ import (
3333

3434
// Stub implementation for DevicePlugin.
3535
type Stub struct {
36-
devs []*pluginapi.Device
37-
socket string
38-
resourceName string
39-
preStartContainerFlag bool
36+
devs []*pluginapi.Device
37+
socket string
38+
resourceName string
39+
preStartContainerFlag bool
40+
getPreferredAllocationFlag bool
4041

4142
stop chan interface{}
4243
wg sync.WaitGroup
@@ -47,12 +48,24 @@ type Stub struct {
4748
// allocFunc is used for handling allocation request
4849
allocFunc stubAllocFunc
4950

51+
// getPreferredAllocFunc is used for handling getPreferredAllocation request
52+
getPreferredAllocFunc stubGetPreferredAllocFunc
53+
5054
registrationStatus chan watcherapi.RegistrationStatus // for testing
5155
endpoint string // for testing
5256

5357
}
5458

55-
// stubAllocFunc is the function called when receive an allocation request from Kubelet
59+
// stubGetPreferredAllocFunc is the function called when a getPreferredAllocation request is received from Kubelet
60+
type stubGetPreferredAllocFunc func(r *pluginapi.PreferredAllocationRequest, devs map[string]pluginapi.Device) (*pluginapi.PreferredAllocationResponse, error)
61+
62+
func defaultGetPreferredAllocFunc(r *pluginapi.PreferredAllocationRequest, devs map[string]pluginapi.Device) (*pluginapi.PreferredAllocationResponse, error) {
63+
var response pluginapi.PreferredAllocationResponse
64+
65+
return &response, nil
66+
}
67+
68+
// stubAllocFunc is the function called when an allocation request is received from Kubelet
5669
type stubAllocFunc func(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Device) (*pluginapi.AllocateResponse, error)
5770

5871
func defaultAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Device) (*pluginapi.AllocateResponse, error) {
@@ -62,20 +75,27 @@ func defaultAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.De
6275
}
6376

6477
// NewDevicePluginStub returns an initialized DevicePlugin Stub.
65-
func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool) *Stub {
78+
func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool, getPreferredAllocationFlag bool) *Stub {
6679
return &Stub{
67-
devs: devs,
68-
socket: socket,
69-
resourceName: name,
70-
preStartContainerFlag: preStartContainerFlag,
80+
devs: devs,
81+
socket: socket,
82+
resourceName: name,
83+
preStartContainerFlag: preStartContainerFlag,
84+
getPreferredAllocationFlag: getPreferredAllocationFlag,
7185

7286
stop: make(chan interface{}),
7387
update: make(chan []*pluginapi.Device),
7488

75-
allocFunc: defaultAllocFunc,
89+
allocFunc: defaultAllocFunc,
90+
getPreferredAllocFunc: defaultGetPreferredAllocFunc,
7691
}
7792
}
7893

94+
// SetGetPreferredAllocFunc sets getPreferredAllocFunc of the device plugin
95+
func (m *Stub) SetGetPreferredAllocFunc(f stubGetPreferredAllocFunc) {
96+
m.getPreferredAllocFunc = f
97+
}
98+
7999
// SetAllocFunc sets allocFunc of the device plugin
80100
func (m *Stub) SetAllocFunc(f stubAllocFunc) {
81101
m.allocFunc = f
@@ -174,7 +194,10 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir stri
174194
Version: pluginapi.Version,
175195
Endpoint: path.Base(m.socket),
176196
ResourceName: resourceName,
177-
Options: &pluginapi.DevicePluginOptions{PreStartRequired: m.preStartContainerFlag},
197+
Options: &pluginapi.DevicePluginOptions{
198+
PreStartRequired: m.preStartContainerFlag,
199+
GetPreferredAllocationAvailable: m.getPreferredAllocationFlag,
200+
},
178201
}
179202

180203
_, err = client.Register(context.Background(), reqt)
@@ -186,7 +209,11 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir stri
186209

187210
// GetDevicePluginOptions returns DevicePluginOptions settings for the device plugin.
188211
func (m *Stub) GetDevicePluginOptions(ctx context.Context, e *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
189-
return &pluginapi.DevicePluginOptions{PreStartRequired: m.preStartContainerFlag}, nil
212+
options := &pluginapi.DevicePluginOptions{
213+
PreStartRequired: m.preStartContainerFlag,
214+
GetPreferredAllocationAvailable: m.getPreferredAllocationFlag,
215+
}
216+
return options, nil
190217
}
191218

192219
// PreStartContainer resets the devices received
@@ -216,6 +243,19 @@ func (m *Stub) Update(devs []*pluginapi.Device) {
216243
m.update <- devs
217244
}
218245

246+
// GetPreferredAllocation gets the preferred allocation from a set of available devices
247+
func (m *Stub) GetPreferredAllocation(ctx context.Context, r *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
248+
klog.Infof("GetPreferredAllocation, %+v", r)
249+
250+
devs := make(map[string]pluginapi.Device)
251+
252+
for _, dev := range m.devs {
253+
devs[dev.ID] = *dev
254+
}
255+
256+
return m.getPreferredAllocFunc(r, devs)
257+
}
258+
219259
// Allocate does a mock allocation
220260
func (m *Stub) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
221261
klog.Infof("Allocate, %+v", r)

pkg/kubelet/cm/devicemanager/endpoint.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
type endpoint interface {
3636
run()
3737
stop()
38+
getPreferredAllocation(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error)
3839
allocate(devs []string) (*pluginapi.AllocateResponse, error)
3940
preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
4041
callback(resourceName string, devices []pluginapi.Device)
@@ -138,6 +139,22 @@ func (e *endpointImpl) setStopTime(t time.Time) {
138139
e.stopTime = t
139140
}
140141

142+
// getPreferredAllocation issues GetPreferredAllocation gRPC call to the device plugin.
143+
func (e *endpointImpl) getPreferredAllocation(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) {
144+
if e.isStopped() {
145+
return nil, fmt.Errorf(errEndpointStopped, e)
146+
}
147+
return e.client.GetPreferredAllocation(context.Background(), &pluginapi.PreferredAllocationRequest{
148+
ContainerRequests: []*pluginapi.ContainerPreferredAllocationRequest{
149+
{
150+
AvailableDeviceIDs: available,
151+
MustIncludeDeviceIDs: mustInclude,
152+
AllocationSize: int32(size),
153+
},
154+
},
155+
})
156+
}
157+
141158
// allocate issues Allocate gRPC call to the device plugin.
142159
func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
143160
if e.isStopped() {

pkg/kubelet/cm/devicemanager/endpoint_test.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,42 @@ func TestAllocate(t *testing.T) {
159159
require.Equal(t, resp, respOut)
160160
}
161161

162+
func TestGetPreferredAllocation(t *testing.T) {
163+
socket := path.Join("/tmp", esocketName)
164+
callbackCount := 0
165+
callbackChan := make(chan int)
166+
p, e := esetup(t, []*pluginapi.Device{}, socket, "mock", func(n string, d []pluginapi.Device) {
167+
callbackCount++
168+
callbackChan <- callbackCount
169+
})
170+
defer ecleanup(t, p, e)
171+
172+
resp := &pluginapi.PreferredAllocationResponse{
173+
ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{
174+
{DeviceIDs: []string{"device0", "device1", "device2"}},
175+
},
176+
}
177+
178+
p.SetGetPreferredAllocFunc(func(r *pluginapi.PreferredAllocationRequest, devs map[string]pluginapi.Device) (*pluginapi.PreferredAllocationResponse, error) {
179+
return resp, nil
180+
})
181+
182+
go e.run()
183+
// Wait for the callback to be issued.
184+
select {
185+
case <-callbackChan:
186+
break
187+
case <-time.After(time.Second):
188+
t.FailNow()
189+
}
190+
191+
respOut, err := e.getPreferredAllocation([]string{}, []string{}, -1)
192+
require.NoError(t, err)
193+
require.Equal(t, resp, respOut)
194+
}
195+
162196
func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) {
163-
p := NewDevicePluginStub(devs, socket, resourceName, false)
197+
p := NewDevicePluginStub(devs, socket, resourceName, false, false)
164198

165199
err := p.Start()
166200
require.NoError(t, err)

pkg/kubelet/cm/devicemanager/manager.go

Lines changed: 109 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ import (
4343
cputopology "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
4444
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
4545
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
46-
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
4746
"k8s.io/kubernetes/pkg/kubelet/config"
4847
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
4948
"k8s.io/kubernetes/pkg/kubelet/metrics"
@@ -658,49 +657,107 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
658657
return nil, nil
659658
}
660659
klog.V(3).Infof("Needs to allocate %d %q for pod %q container %q", needed, resource, podUID, contName)
661-
// Needs to allocate additional devices.
660+
// Check if resource registered with devicemanager
662661
if _, ok := m.healthyDevices[resource]; !ok {
663662
return nil, fmt.Errorf("can't allocate unregistered device %s", resource)
664663
}
665-
devices = sets.NewString()
666-
// Allocates from reusableDevices list first.
667-
for device := range reusableDevices {
668-
devices.Insert(device)
669-
needed--
670-
if needed == 0 {
671-
return devices, nil
664+
665+
// Declare the list of allocated devices.
666+
// This will be populated and returned below.
667+
allocated := sets.NewString()
668+
669+
// Create a closure to help with device allocation
670+
// Returns 'true' once no more devices need to be allocated.
671+
allocateRemainingFrom := func(devices sets.String) bool {
672+
for device := range devices.Difference(allocated) {
673+
m.allocatedDevices[resource].Insert(device)
674+
allocated.Insert(device)
675+
needed--
676+
if needed == 0 {
677+
return true
678+
}
672679
}
680+
return false
673681
}
682+
683+
// Allocates from reusableDevices list first.
684+
if allocateRemainingFrom(reusableDevices) {
685+
return allocated, nil
686+
}
687+
674688
// Needs to allocate additional devices.
675689
if m.allocatedDevices[resource] == nil {
676690
m.allocatedDevices[resource] = sets.NewString()
677691
}
692+
678693
// Gets Devices in use.
679694
devicesInUse := m.allocatedDevices[resource]
680-
// Gets a list of available devices.
695+
// Gets Available devices.
681696
available := m.healthyDevices[resource].Difference(devicesInUse)
682697
if available.Len() < needed {
683698
return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
684699
}
685-
// By default, pull devices from the unsorted list of available devices.
686-
allocated := available.UnsortedList()[:needed]
687-
// If topology alignment is desired, update allocated to the set of devices
688-
// with the best alignment.
689-
hint := m.topologyAffinityStore.GetAffinity(podUID, contName)
690-
if m.deviceHasTopologyAlignment(resource) && hint.NUMANodeAffinity != nil {
691-
allocated = m.takeByTopology(resource, available, hint.NUMANodeAffinity, needed)
700+
701+
// Filters available Devices based on NUMA affinity.
702+
aligned, unaligned, noAffinity := m.filterByAffinity(podUID, contName, resource, available)
703+
704+
// If we can allocate all remaining devices from the set of aligned ones, then
705+
// give the plugin the chance to influence which ones to allocate from that set.
706+
if needed < aligned.Len() {
707+
// First allocate from the preferred devices list (if available).
708+
preferred, err := m.callGetPreferredAllocationIfAvailable(podUID, contName, resource, aligned.Union(allocated), allocated, required)
709+
if err != nil {
710+
return nil, err
711+
}
712+
if allocateRemainingFrom(preferred.Intersection(aligned.Union(allocated))) {
713+
return allocated, nil
714+
}
715+
// Then fallback to allocate from the aligned set if no preferred list
716+
// is returned (or not enough devices are returned in that list).
717+
if allocateRemainingFrom(aligned) {
718+
return allocated, nil
719+
}
720+
721+
return nil, fmt.Errorf("unexpectedly allocated less resources than required. Requested: %d, Got: %d", required, required-needed)
722+
}
723+
724+
// If we can't allocate all remaining devices from the set of aligned ones,
725+
// then start by first allocating all of the aligned devices (to ensure
726+
// that the alignment guaranteed by the TopologyManager is honored).
727+
if allocateRemainingFrom(aligned) {
728+
return allocated, nil
729+
}
730+
731+
// Then give the plugin the chance to influence the decision on any
732+
// remaining devices to allocate.
733+
preferred, err := m.callGetPreferredAllocationIfAvailable(podUID, contName, resource, available.Union(devices), devices, required)
734+
if err != nil {
735+
return nil, err
736+
}
737+
if allocateRemainingFrom(preferred.Intersection(available.Union(allocated))) {
738+
return allocated, nil
692739
}
693-
// Updates m.allocatedDevices with allocated devices to prevent them
694-
// from being allocated to other pods/containers, given that we are
695-
// not holding lock during the rpc call.
696-
for _, device := range allocated {
697-
m.allocatedDevices[resource].Insert(device)
698-
devices.Insert(device)
740+
741+
// Finally, if the plugin did not return a preferred allocation (or didn't
742+
// return a large enough one), then fall back to allocating the remaining
743+
// devices from the 'unaligned' and 'noAffinity' sets.
744+
if allocateRemainingFrom(unaligned) {
745+
return allocated, nil
746+
}
747+
if allocateRemainingFrom(noAffinity) {
748+
return allocated, nil
699749
}
700-
return devices, nil
750+
751+
return nil, fmt.Errorf("unexpectedly allocated less resources than required. Requested: %d, Got: %d", required, required-needed)
701752
}
702753

703-
func (m *ManagerImpl) takeByTopology(resource string, available sets.String, affinity bitmask.BitMask, request int) []string {
754+
func (m *ManagerImpl) filterByAffinity(podUID, contName, resource string, available sets.String) (sets.String, sets.String, sets.String) {
755+
// If alignment information is not available, just pass the available list back.
756+
hint := m.topologyAffinityStore.GetAffinity(podUID, contName)
757+
if !m.deviceHasTopologyAlignment(resource) || hint.NUMANodeAffinity == nil {
758+
return sets.NewString(), sets.NewString(), available
759+
}
760+
704761
// Build a map of NUMA Nodes to the devices associated with them. A
705762
// device may be associated to multiple NUMA nodes at the same time. If an
706763
// available device does not have any NUMA Nodes associated with it, add it
@@ -754,7 +811,7 @@ func (m *ManagerImpl) takeByTopology(resource string, available sets.String, aff
754811
if perNodeDevices[n].Has(d) {
755812
if n == nodeWithoutTopology {
756813
withoutTopology = append(withoutTopology, d)
757-
} else if affinity.IsSet(n) {
814+
} else if hint.NUMANodeAffinity.IsSet(n) {
758815
fromAffinity = append(fromAffinity, d)
759816
} else {
760817
notFromAffinity = append(notFromAffinity, d)
@@ -764,8 +821,8 @@ func (m *ManagerImpl) takeByTopology(resource string, available sets.String, aff
764821
}
765822
}
766823

767-
// Concatenate the lists above return the first 'request' devices from it..
768-
return append(append(fromAffinity, notFromAffinity...), withoutTopology...)[:request]
824+
// Return all three lists containing the full set of devices across them.
825+
return sets.NewString(fromAffinity...), sets.NewString(notFromAffinity...), sets.NewString(withoutTopology...)
769826
}
770827

771828
// allocateContainerResources attempts to allocate all of required device
@@ -920,6 +977,30 @@ func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource s
920977
return nil
921978
}
922979

980+
// callGetPreferredAllocationIfAvailable issues GetPreferredAllocation grpc
981+
// call for device plugin resource with GetPreferredAllocationAvailable option set.
982+
func (m *ManagerImpl) callGetPreferredAllocationIfAvailable(podUID, contName, resource string, available, mustInclude sets.String, size int) (sets.String, error) {
983+
eI, ok := m.endpoints[resource]
984+
if !ok {
985+
return nil, fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource)
986+
}
987+
988+
if eI.opts == nil || !eI.opts.GetPreferredAllocationAvailable {
989+
klog.V(4).Infof("Plugin options indicate to skip GetPreferredAllocation for resource: %s", resource)
990+
return nil, nil
991+
}
992+
993+
m.mutex.Unlock()
994+
klog.V(4).Infof("Issuing a GetPreferredAllocation call for container, %s, of pod %s", contName, podUID)
995+
resp, err := eI.e.getPreferredAllocation(available.UnsortedList(), mustInclude.UnsortedList(), size)
996+
m.mutex.Lock()
997+
if err != nil {
998+
return nil, fmt.Errorf("device plugin GetPreferredAllocation rpc failed with err: %v", err)
999+
}
1000+
// TODO: Add metrics support for init RPC
1001+
return sets.NewString(resp.ContainerResponses[0].DeviceIDs...), nil
1002+
}
1003+
9231004
// sanitizeNodeAllocatable scans through allocatedDevices in the device manager
9241005
// and if necessary, updates allocatableResource in nodeInfo to at least equal to
9251006
// the allocated capacity. This allows pods that have already been scheduled on

0 commit comments

Comments
 (0)