Commit 5b68fe2

qmloong and menglong.qi authored
fix #39: get pods from kubelet client rather than list cluster-scope pods from apiserver (#41)
* fix: add .gitignore
* fix: get pods from kubelet client rather than list cluster-scope pods from apiserver
* feat: add query-kubelet flag to control the method of querying pending pod lists

Co-authored-by: menglong.qi <[email protected]>
1 parent dcd90b4 commit 5b68fe2
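The commit replaces a cluster-scoped Pod LIST against the kube-apiserver with a query to the node-local kubelet. For orientation, the sketch below is a minimal, hypothetical illustration of the kubelet endpoint involved, not the repository's pkg/kubelet/client package: it assumes the authenticated kubelet API on port 10250, whose /pods endpoint returns a JSON-encoded v1.PodList, a mounted service-account token, and TLS verification skipped (mirroring Insecure: true in the diffs below).

package main

import (
    "crypto/tls"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net/http"
    "time"

    v1 "k8s.io/api/core/v1"
)

func main() {
    // Assumption: running inside a pod, so the service-account token is mounted here.
    token, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
    if err != nil {
        panic(err)
    }

    // Skip server certificate verification, as the plugin does with Insecure: true.
    httpClient := &http.Client{
        Timeout:   10 * time.Second,
        Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
    }

    // The kubelet serves the pods scheduled to this node at /pods.
    req, err := http.NewRequest("GET", "https://127.0.0.1:10250/pods", nil)
    if err != nil {
        panic(err)
    }
    req.Header.Set("Authorization", "Bearer "+string(token))

    resp, err := httpClient.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    // The response body decodes into a v1.PodList limited to this node.
    podList := &v1.PodList{}
    if err := json.NewDecoder(resp.Body).Decode(podList); err != nil {
        panic(err)
    }
    for _, pod := range podList.Items {
        fmt.Printf("%s/%s: %s\n", pod.Namespace, pod.Name, pod.Status.Phase)
    }
}

Querying the kubelet this way keeps each allocation lookup on the node itself instead of issuing a cluster-wide List, which is the change #39 asks for.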

File tree

10 files changed: 408 additions & 46 deletions


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+.idea

cmd/nvidia/main.go

Lines changed: 45 additions & 5 deletions
@@ -2,21 +2,61 @@ package main
 
 import (
     "flag"
-
+    "fmt"
     "github.com/AliyunContainerService/gpushare-device-plugin/pkg/gpu/nvidia"
+    "github.com/AliyunContainerService/gpushare-device-plugin/pkg/kubelet/client"
     log "github.com/golang/glog"
+    "io/ioutil"
+    "k8s.io/client-go/rest"
+    "time"
 )
 
 var (
-    mps         = flag.Bool("mps", false, "Enable or Disable MPS")
-    healthCheck = flag.Bool("health-check", false, "Enable or disable Health check")
-    memoryUnit  = flag.String("memory-unit", "GiB", "Set memoryUnit of the GPU Memroy, support 'GiB' and 'MiB'")
+    mps              = flag.Bool("mps", false, "Enable or Disable MPS")
+    healthCheck      = flag.Bool("health-check", false, "Enable or disable Health check")
+    memoryUnit       = flag.String("memory-unit", "GiB", "Set memoryUnit of the GPU Memroy, support 'GiB' and 'MiB'")
+    queryFromKubelet = flag.Bool("query-kubelet", true, "Query pending pods from kubelet instead of kube-apiserver")
+    kubeletAddress   = flag.String("kubelet-address", "0.0.0.0", "Kubelet IP Address")
+    kubeletPort      = flag.Uint("kubelet-port", 10250, "Kubelet listened Port")
+    clientCert       = flag.String("client-cert", "", "Kubelet TLS client certificate")
+    clientKey        = flag.String("client-key", "", "Kubelet TLS client key")
+    token            = flag.String("token", "", "Kubelet client bearer token")
+    timeout          = flag.Int("timeout", 10, "Kubelet client http timeout duration")
 )
 
+func buildKubeletClient() *client.KubeletClient {
+    if *clientCert == "" && *clientKey == "" && *token == "" {
+        tokenByte, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
+        if err != nil {
+            panic(fmt.Errorf("in cluster mode, find token failed, error: %v", err))
+        }
+        tokenStr := string(tokenByte)
+        token = &tokenStr
+    }
+    kubeletClient, err := client.NewKubeletClient(&client.KubeletClientConfig{
+        Address: *kubeletAddress,
+        Port:    *kubeletPort,
+        TLSClientConfig: rest.TLSClientConfig{
+            Insecure:   true,
+            ServerName: "gpushare-device-plugin",
+            CertFile:   *clientCert,
+            KeyFile:    *clientKey,
+        },
+        BearerToken: *token,
+        HTTPTimeout: time.Duration(*timeout) * time.Second,
+    })
+    if err != nil {
+        panic(err)
+    }
+    return kubeletClient
+}
+
 func main() {
     flag.Parse()
     log.V(1).Infoln("Start gpushare device plugin")
-    ngm := nvidia.NewSharedGPUManager(*mps, *healthCheck, translatememoryUnits(*memoryUnit))
+
+    kubeletClient := buildKubeletClient()
+    ngm := nvidia.NewSharedGPUManager(*mps, *healthCheck, *queryFromKubelet, translatememoryUnits(*memoryUnit), kubeletClient)
     err := ngm.Run()
     if err != nil {
         log.Fatalf("Failed due to %v", err)

cmd/podgetter/main.go

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+package main
+
+import (
+    "flag"
+    "fmt"
+    "github.com/AliyunContainerService/gpushare-device-plugin/pkg/kubelet/client"
+    "io/ioutil"
+    "k8s.io/client-go/rest"
+    "time"
+)
+
+var (
+    clientCert string
+    clientKey  string
+    token      string
+    timeout    int
+)
+
+func main() {
+    flag.StringVar(&clientCert, "client-cert", "", "")
+    flag.StringVar(&clientKey, "client-key", "", "")
+    flag.StringVar(&token, "token", "", "")
+    flag.IntVar(&timeout, "timeout", 10, "")
+
+    flag.Parse()
+
+    if clientCert == "" && clientKey == "" && token == "" {
+        tokenByte, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
+        if err != nil {
+            panic(fmt.Errorf("in cluster mode, find token failed, error: %v", err))
+        }
+        token = string(tokenByte)
+    }
+
+    c, err := client.NewKubeletClient(&client.KubeletClientConfig{
+        Address: "127.0.0.1",
+        Port:    10250,
+        TLSClientConfig: rest.TLSClientConfig{
+            Insecure:   true,
+            ServerName: "kubelet",
+            CertFile:   clientCert,
+            KeyFile:    clientKey,
+        },
+        BearerToken: token,
+        HTTPTimeout: time.Duration(timeout) * time.Second,
+    })
+    if err != nil {
+        fmt.Println(err)
+        return
+    }
+    podsList, err := c.GetNodeRunningPods()
+    if err != nil {
+        fmt.Println(err)
+        return
+    }
+    fmt.Println(podsList)
+}

device-plugin-rbac.yaml

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@ rules:
   - ""
   resources:
   - nodes
+  - nodes/proxy
   verbs:
   - get
   - list
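The new nodes/proxy rule is what lets the plugin's service account talk to the kubelet directly: a kubelet running with webhook authorization typically maps GET requests to its read endpoints (including /pods) onto the get verb of the nodes/proxy subresource. The snippet below is a hedged, illustrative way to verify that permission from inside the cluster; it is not part of this commit, and it assumes the older, pre-context client-go Create signature that this repository already uses elsewhere.

package main

import (
    "fmt"

    authorizationv1 "k8s.io/api/authorization/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

func main() {
    // In-cluster config picks up the pod's service account, i.e. the subject of this RBAC file.
    config, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // Ask the apiserver whether the current identity may "get nodes/proxy",
    // the permission the kubelet checks before serving its read APIs.
    review := &authorizationv1.SelfSubjectAccessReview{
        Spec: authorizationv1.SelfSubjectAccessReviewSpec{
            ResourceAttributes: &authorizationv1.ResourceAttributes{
                Verb:        "get",
                Resource:    "nodes",
                Subresource: "proxy",
            },
        },
    }
    result, err := clientset.AuthorizationV1().SelfSubjectAccessReviews().Create(review)
    if err != nil {
        panic(err)
    }
    fmt.Printf("allowed to get nodes/proxy: %v\n", result.Status.Allowed)
}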

pkg/gpu/nvidia/allocate.go

Lines changed: 7 additions & 3 deletions
@@ -59,7 +59,7 @@ func (m *NvidiaDevicePlugin) Allocate(ctx context.Context,
     m.Lock()
     defer m.Unlock()
     log.Infoln("checking...")
-    pods, err := getCandidatePods()
+    pods, err := getCandidatePods(m.queryKubelet, m.kubeletClient)
     if err != nil {
         log.Infof("invalid allocation requst: Failed to find candidate pods due to %v", err)
         return buildErrResponse(reqs, podReqGPU), nil
@@ -155,8 +155,12 @@ func (m *NvidiaDevicePlugin) Allocate(ctx context.Context,
         return buildErrResponse(reqs, podReqGPU), nil
     }
 
-    log.Infof("new allocated GPUs info %v", &responses)
-    log.Infoln("----Allocating GPU for gpu mem is ended----")
+    podName := ""
+    if assumePod != nil {
+        podName = assumePod.Name
+    }
+    log.Infof("pod %v, new allocated GPUs info %v", podName, &responses)
+    log.Infof("----Allocating GPU for gpu mem for %v is ended----", podName)
     // // Add this to make sure the container is created at least
     // currentTime := time.Now()
 

pkg/gpu/nvidia/gpumanager.go

Lines changed: 11 additions & 6 deletions
@@ -2,6 +2,7 @@ package nvidia
 
 import (
     "fmt"
+    "github.com/AliyunContainerService/gpushare-device-plugin/pkg/kubelet/client"
     "syscall"
     "time"
 
@@ -12,15 +13,19 @@
 )
 
 type sharedGPUManager struct {
-    enableMPS   bool
-    healthCheck bool
+    enableMPS     bool
+    healthCheck   bool
+    queryKubelet  bool
+    kubeletClient *client.KubeletClient
 }
 
-func NewSharedGPUManager(enableMPS, healthCheck bool, bp MemoryUnit) *sharedGPUManager {
+func NewSharedGPUManager(enableMPS, healthCheck, queryKubelet bool, bp MemoryUnit, client *client.KubeletClient) *sharedGPUManager {
     metric = bp
     return &sharedGPUManager{
-        enableMPS:   enableMPS,
-        healthCheck: healthCheck,
+        enableMPS:     enableMPS,
+        healthCheck:   healthCheck,
+        queryKubelet:  queryKubelet,
+        kubeletClient: client,
     }
 }
 
@@ -61,7 +66,7 @@ L:
             devicePlugin.Stop()
         }
 
-        devicePlugin, err = NewNvidiaDevicePlugin(ngm.enableMPS, ngm.healthCheck)
+        devicePlugin, err = NewNvidiaDevicePlugin(ngm.enableMPS, ngm.healthCheck, ngm.queryKubelet, ngm.kubeletClient)
         if err != nil {
            log.Warningf("Failed to get device plugin due to %v", err)
        } else if err = devicePlugin.Serve(); err != nil {

pkg/gpu/nvidia/podmanager.go

Lines changed: 89 additions & 31 deletions
@@ -1,30 +1,29 @@
 package nvidia
 
 import (
+    "encoding/json"
     "fmt"
-    "os"
-    "sort"
-    "time"
-
+    "github.com/AliyunContainerService/gpushare-device-plugin/pkg/kubelet/client"
     log "github.com/golang/glog"
-    "k8s.io/apimachinery/pkg/labels"
-
     "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/fields"
-
+    "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
     "k8s.io/client-go/tools/clientcmd"
     nodeutil "k8s.io/kubernetes/pkg/util/node"
+    "os"
+    "sort"
+    "time"
 )
 
 var (
     clientset *kubernetes.Clientset
     nodeName  string
-    retries   = 5
+    retries   = 8
 )
 
 func kubeInit() {
@@ -58,18 +57,18 @@ func kubeInit() {
 }
 
 func disableCGPUIsolationOrNot() (bool, error) {
-    disable := false
-    node, err := clientset.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
-    if err != nil {
-        return disable, err
-    }
-    labels := node.ObjectMeta.Labels
-    value, ok := labels[EnvNodeLabelForDisableCGPU]
-    if ok && value == "true" {
-        log.Infof("enable gpusharing mode and disable cgpu mode")
-        disable = true
-    }
-    return disable, nil
+    disable := false
+    node, err := clientset.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
+    if err != nil {
+        return disable, err
+    }
+    labels := node.ObjectMeta.Labels
+    value, ok := labels[EnvNodeLabelForDisableCGPU]
+    if ok && value == "true" {
+        log.Infof("enable gpusharing mode and disable cgpu mode")
+        disable = true
+    }
+    return disable, nil
 }
 
 func patchGPUCount(gpuCount int) error {
@@ -99,31 +98,90 @@ func patchGPUCount(gpuCount int) error {
     return err
 }
 
-func getPendingPodsInNode() ([]v1.Pod, error) {
-    // pods, err := m.lister.List(labels.Everything())
-    // if err != nil {
-    //     return nil, err
-    // }
-    pods := []v1.Pod{}
+func getPodList(kubeletClient *client.KubeletClient) (*v1.PodList, error) {
+    podList, err := kubeletClient.GetNodeRunningPods()
+    if err != nil {
+        return nil, err
+    }
 
-    podIDMap := map[types.UID]bool{}
+    list, _ := json.Marshal(podList)
+    log.V(8).Infof("get pods list %v", string(list))
+
+    resultPodList := &v1.PodList{}
+    for _, metaPod := range podList.Items {
+        if metaPod.Status.Phase != v1.PodPending {
+            continue
+        }
+        resultPodList.Items = append(resultPodList.Items, metaPod)
+    }
+
+    if len(resultPodList.Items) == 0 {
+        return nil, fmt.Errorf("not found pending pod")
+    }
+
+    return resultPodList, nil
+}
 
+func getPodListsByQueryKubelet(kubeletClient *client.KubeletClient) (*v1.PodList, error) {
+    podList, err := getPodList(kubeletClient)
+    for i := 0; i < retries && err != nil; i++ {
+        podList, err = getPodList(kubeletClient)
+        log.Warningf("failed to get pending pod list, retry")
+        time.Sleep(100 * time.Millisecond)
+    }
+    if err != nil {
+        log.Warningf("not found from kubelet /pods api, start to list apiserver")
+        podList, err = getPodListsByListAPIServer()
+        if err != nil {
+            return nil, err
+        }
+    }
+    return podList, nil
+}
+
+func getPodListsByListAPIServer() (*v1.PodList, error) {
     selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName, "status.phase": "Pending"})
     podList, err := clientset.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{
         FieldSelector: selector.String(),
         LabelSelector: labels.Everything().String(),
     })
-    for i := 0; i < retries && err != nil; i++ {
+    for i := 0; i < 3 && err != nil; i++ {
         podList, err = clientset.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{
             FieldSelector: selector.String(),
             LabelSelector: labels.Everything().String(),
         })
-        time.Sleep(100 * time.Millisecond)
+        time.Sleep(1 * time.Second)
     }
     if err != nil {
         return nil, fmt.Errorf("failed to get Pods assigned to node %v", nodeName)
     }
 
+    return podList, nil
+}
+
+func getPendingPodsInNode(queryKubelet bool, kubeletClient *client.KubeletClient) ([]v1.Pod, error) {
+    // pods, err := m.lister.List(labels.Everything())
+    // if err != nil {
+    //     return nil, err
+    // }
+    pods := []v1.Pod{}
+
+    podIDMap := map[types.UID]bool{}
+
+    var podList *v1.PodList
+    var err error
+    if queryKubelet {
+        podList, err = getPodListsByQueryKubelet(kubeletClient)
+        if err != nil {
+            return nil, err
+        }
+    } else {
+        podList, err = getPodListsByListAPIServer()
+        if err != nil {
+            return nil, err
+        }
+    }
+
     log.V(5).Infof("all pod list %v", podList.Items)
 
     // if log.V(5) {
@@ -154,9 +212,9 @@ func getPendingPodsInNode() ([]v1.Pod, error) {
 }
 
 // pick up the gpushare pod with assigned status is false, and
-func getCandidatePods() ([]*v1.Pod, error) {
+func getCandidatePods(queryKubelet bool, client *client.KubeletClient) ([]*v1.Pod, error) {
     candidatePods := []*v1.Pod{}
-    allPods, err := getPendingPodsInNode()
+    allPods, err := getPendingPodsInNode(queryKubelet, client)
     if err != nil {
         return candidatePods, err
     }
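The heart of this file's change is getPodList, which pulls the node's pods from the kubelet and keeps only those still in the Pending phase; getPodListsByQueryKubelet retries that call up to retries (now 8) times and, if it still fails, falls back to the old per-node field-selector List against the apiserver, so running with -query-kubelet=false simply makes the fallback path the only path. The standalone sketch below mirrors just the Pending filter; filterPending is a hypothetical helper for illustration and is not part of the commit.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// filterPending mirrors the loop in getPodList: keep only pods whose
// Status.Phase is still Pending.
func filterPending(in *v1.PodList) *v1.PodList {
    out := &v1.PodList{}
    for _, pod := range in.Items {
        if pod.Status.Phase != v1.PodPending {
            continue
        }
        out.Items = append(out.Items, pod)
    }
    return out
}

func main() {
    list := &v1.PodList{Items: []v1.Pod{
        {ObjectMeta: metav1.ObjectMeta{Name: "pending-gpu-pod"}, Status: v1.PodStatus{Phase: v1.PodPending}},
        {ObjectMeta: metav1.ObjectMeta{Name: "running-pod"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
    }}
    for _, pod := range filterPending(list).Items {
        fmt.Println(pod.Name) // only "pending-gpu-pod" survives the filter
    }
}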
