Skip to content

Commit de20781

Browse files
yangshiqi and yangshiqi authored
Separate options from client to make the responsibility more clear. (#938)
* Separate options from client to make the responsibility more clear. Remove the magic number in the main function and define it as a constant.

  Signed-off-by: yangshiqi <[email protected]>

* fix merge bugs and add testcase. remove some comments to try e2e

  Signed-off-by: yangshiqi <[email protected]>

* debug for e2e

  Signed-off-by: yangshiqi <[email protected]>

* fix e2e error

  Signed-off-by: yangshiqi <[email protected]>

---------

Signed-off-by: yangshiqi <[email protected]>
Co-authored-by: yangshiqi <[email protected]>
1 parent 9052c08 commit de20781

File tree

8 files changed

+289
-82
lines changed

8 files changed

+289
-82
lines changed

cmd/scheduler/main.go

Lines changed: 11 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -67,12 +67,12 @@ func init() {
6767
rootCmd.Flags().StringVar(&config.GPUSchedulerPolicy, "gpu-scheduler-policy", util.GPUSchedulerPolicySpread.String(), "GPU scheduler policy")
6868
rootCmd.Flags().StringVar(&config.MetricsBindAddress, "metrics-bind-address", ":9395", "The TCP address that the scheduler should bind to for serving prometheus metrics(e.g. 127.0.0.1:9395, :9395)")
6969
rootCmd.Flags().StringToStringVar(&config.NodeLabelSelector, "node-label-selector", nil, "key=value pairs separated by commas")
70-
// add QPS and Burst to the global flagset
71-
// qps and burst settings for the client-go client
72-
rootCmd.Flags().Float32Var(&config.QPS, "kube-qps", 5.0, "QPS to use while talking with kube-apiserver.")
73-
rootCmd.Flags().IntVar(&config.Burst, "kube-burst", 10, "Burst to use while talking with kube-apiserver.")
74-
// Add profiling related flags
70+
71+
rootCmd.Flags().Float32Var(&config.QPS, "kube-qps", client.DefaultQPS, "QPS to use while talking with kube-apiserver.")
72+
rootCmd.Flags().IntVar(&config.Burst, "kube-burst", client.DefaultBurst, "Burst to use while talking with kube-apiserver.")
73+
rootCmd.Flags().IntVar(&config.Timeout, "kube-timeout", client.DefaultTimeout, "Timeout to use while talking with kube-apiserver.")
7574
rootCmd.Flags().BoolVar(&enableProfiling, "profiling", false, "Enable pprof profiling via HTTP server")
75+
7676
rootCmd.PersistentFlags().AddGoFlagSet(device.GlobalFlagSet())
7777
rootCmd.AddCommand(version.VersionCmd)
7878
rootCmd.Flags().AddGoFlagSet(util.InitKlogFlags())
@@ -99,7 +99,12 @@ func injectProfilingRoute(router *httprouter.Router) {
9999
}
100100

101101
func start() error {
102-
client.InitGlobalClient(client.WithBurst(config.Burst), client.WithQPS(config.QPS))
102+
client.InitGlobalClient(
103+
client.WithBurst(config.Burst),
104+
client.WithQPS(config.QPS),
105+
client.WithTimeout(config.Timeout),
106+
)
107+
103108
device.InitDevices()
104109
sher = scheduler.NewScheduler()
105110
sher.Start()

pkg/device/cambricon/device_test.go

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -453,7 +453,11 @@ func Test_PatchAnnotations(t *testing.T) {
453453
}
454454

455455
func Test_setNodeLock(t *testing.T) {
456-
client.InitGlobalClient(client.WithBurst(10), client.WithQPS(5.0))
456+
client.InitGlobalClient(
457+
client.WithBurst(10),
458+
client.WithQPS(5.0),
459+
client.WithTimeout(60),
460+
)
457461
tests := []struct {
458462
name string
459463
node corev1.Node

pkg/scheduler/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ import "github.com/Project-HAMi/HAMi/pkg/util"
2121
var (
2222
QPS float32
2323
Burst int
24+
Timeout int
2425
HTTPBind string
2526
SchedulerName string
2627
MetricsBindAddress string

pkg/util/client/client.go

Lines changed: 41 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,12 @@ import (
2828
"k8s.io/klog/v2"
2929
)
3030

31+
type Client struct {
32+
// Embedded kubernetes.Interface to avoid name conflicts.
33+
kubernetes.Interface
34+
config *rest.Config
35+
}
36+
3137
var (
3238
KubeClient kubernetes.Interface
3339
once sync.Once
@@ -37,71 +43,59 @@ func init() {
3743
KubeClient = nil
3844
}
3945

46+
// GetClient returns the global Kubernetes client.
4047
func GetClient() kubernetes.Interface {
4148
return KubeClient
4249
}
4350

44-
// Client is a kubernetes client.
45-
type Client struct {
46-
Client kubernetes.Interface
47-
QPS float32
48-
Burst int
49-
}
50-
51-
// WithQPS sets the QPS of the client.
52-
func WithQPS(qps float32) func(*Client) {
53-
return func(c *Client) {
54-
c.QPS = qps
51+
// NewClient creates a new Kubernetes client with the given options.
52+
func NewClient(opts ...Option) (*Client, error) {
53+
restConfig, err := loadKubeConfig()
54+
if err != nil {
55+
return nil, fmt.Errorf("failed to load kubeconfig: %w", err)
5556
}
56-
}
5757

58-
func WithBurst(burst int) func(*Client) {
59-
return func(c *Client) {
60-
c.Burst = burst
58+
// Apply WithDefaults option first to set default values.
59+
WithDefaults()(restConfig)
60+
61+
// Then apply user-provided options that will override defaults if specified.
62+
for _, opt := range opts {
63+
opt(restConfig)
6164
}
62-
}
6365

64-
// NewClientWithConfig creates a new client with a given config.
65-
func NewClientWithConfig(config *rest.Config, opts ...func(*Client)) (*Client, error) {
66-
client, err := kubernetes.NewForConfig(config)
66+
clientset, err := kubernetes.NewForConfig(restConfig)
6767
if err != nil {
68-
return nil, err
69-
}
70-
c := &Client{
71-
Client: client,
72-
}
73-
for _, opt := range opts {
74-
opt(c)
68+
return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
7569
}
76-
return c, nil
70+
71+
return &Client{
72+
Interface: clientset,
73+
config: restConfig,
74+
}, nil
75+
}
76+
77+
// InitGlobalClient initializes the global Kubernetes client with the given options.
78+
func InitGlobalClient(opts ...Option) {
79+
once.Do(func() {
80+
client, err := NewClient(opts...)
81+
if err != nil {
82+
klog.Fatalf("Failed to initialize global client: %v", err)
83+
}
84+
KubeClient = client.Interface
85+
})
7786
}
7887

79-
// NewClient creates a new client.
80-
func NewClient(ops ...func(*Client)) (*Client, error) {
88+
// loadKubeConfig loads Kubernetes configuration from the environment or in-cluster.
89+
func loadKubeConfig() (*rest.Config, error) {
8190
kubeConfigPath := os.Getenv("KUBECONFIG")
8291
if kubeConfigPath == "" {
8392
kubeConfigPath = filepath.Join(os.Getenv("HOME"), ".kube", "config")
8493
}
94+
8595
config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
8696
if err != nil {
8797
klog.Infof("BuildConfigFromFlags failed for file %s: %v. Using in-cluster config.", kubeConfigPath, err)
88-
config, err = rest.InClusterConfig()
89-
if err != nil {
90-
return nil, fmt.Errorf("failed to get in-cluster config: %w", err)
91-
}
92-
}
93-
c, err := NewClientWithConfig(config, ops...)
94-
if err != nil {
95-
return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
96-
}
97-
return c, err
98-
}
99-
100-
// InitGlobalClient creates a new global client.
101-
func InitGlobalClient(ops ...func(*Client)) {
102-
c, err := NewClient(ops...)
103-
if err != nil {
104-
klog.Fatalf("new client error %s", err.Error())
98+
return rest.InClusterConfig()
10599
}
106-
KubeClient = c.Client
100+
return config, nil
107101
}

pkg/util/client/client_test.go

Lines changed: 160 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -17,37 +17,26 @@ limitations under the License.
1717
package client
1818

1919
import (
20+
"context"
2021
"errors"
22+
"fmt"
2123
"os"
2224
"path/filepath"
25+
"sync"
2326
"testing"
27+
"time"
2428

29+
"gotest.tools/v3/assert"
30+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2531
"k8s.io/client-go/rest"
2632
"k8s.io/client-go/tools/clientcmd"
27-
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
2833
)
2934

30-
// MockClientConfig is a mock implementation of clientcmd.ClientConfig.
31-
type MockClientConfig struct {
32-
config *rest.Config
33-
err error
34-
}
35-
36-
func (m *MockClientConfig) RawConfig() (clientcmdapi.Config, error) {
37-
return clientcmdapi.Config{}, nil
38-
}
39-
40-
func (m *MockClientConfig) ClientConfig() (*rest.Config, error) {
41-
return m.config, m.err
42-
}
43-
44-
func (m *MockClientConfig) Namespace() (string, bool, error) {
45-
return "", false, nil
46-
}
47-
48-
func (m *MockClientConfig) ConfigAccess() clientcmd.ConfigAccess {
49-
return nil
50-
}
35+
// Mock functions for testing.
36+
var (
37+
buildConfigFromFlags = clientcmd.BuildConfigFromFlags
38+
inClusterConfig = rest.InClusterConfig
39+
)
5140

5241
// TestGetClient tests the GetClient function.
5342
func TestGetClient(t *testing.T) {
@@ -117,8 +106,152 @@ func TestGetClient(t *testing.T) {
117106
}
118107
}
119108

120-
// Mock functions for testing.
121-
var (
122-
buildConfigFromFlags = clientcmd.BuildConfigFromFlags
123-
inClusterConfig = rest.InClusterConfig
124-
)
109+
// TestClientWithOptions tests client initialization with options.
110+
func TestClientWithOptions(t *testing.T) {
111+
KubeClient = nil
112+
once = sync.Once{}
113+
114+
timeout := 1
115+
client, _ := NewClient(WithTimeout(timeout))
116+
117+
assert.Equal(t, client.config.Timeout, time.Duration(timeout)*time.Second)
118+
assert.Equal(t, client.config.QPS, DefaultQPS)
119+
assert.Equal(t, client.config.Burst, DefaultBurst)
120+
121+
KubeClient = nil
122+
once = sync.Once{}
123+
124+
qps := float32(50.0)
125+
client, _ = NewClient(WithQPS(qps))
126+
127+
assert.Equal(t, client.config.Timeout, time.Duration(DefaultTimeout)*time.Second)
128+
assert.Equal(t, client.config.QPS, qps)
129+
assert.Equal(t, client.config.Burst, DefaultBurst)
130+
131+
KubeClient = nil
132+
once = sync.Once{}
133+
burst := 100
134+
client, _ = NewClient(WithBurst(burst))
135+
136+
assert.Equal(t, client.config.Timeout, time.Duration(DefaultTimeout)*time.Second)
137+
assert.Equal(t, client.config.QPS, DefaultQPS)
138+
assert.Equal(t, client.config.Burst, burst)
139+
140+
KubeClient = nil
141+
once = sync.Once{}
142+
timeout = 2
143+
qps = 0.5
144+
burst = 100
145+
client, _ = NewClient(WithTimeout(timeout), WithQPS(qps), WithBurst(burst))
146+
147+
assert.Equal(t, client.config.Timeout, time.Duration(timeout)*time.Second)
148+
assert.Equal(t, client.config.QPS, qps)
149+
assert.Equal(t, client.config.Burst, burst)
150+
}
151+
152+
// TestClientRealNodePerformance tests the performance with a real Kubernetes cluster if available.
153+
func TestClientRealNodePerformance(t *testing.T) {
154+
155+
skipRealClusterTest := true
156+
// Skip this test by default as it requires a real Kubernetes cluster.
157+
if skipRealClusterTest == true {
158+
t.Skip("Skipping real cluster test. Set TEST_WITH_REAL_CLUSTER=true to run this test.")
159+
}
160+
161+
tests := []struct {
162+
name string
163+
qps float32
164+
burst int
165+
updates int
166+
timeout int
167+
}{
168+
{
169+
name: "Real Cluster - Low QPS and Burst",
170+
qps: 1,
171+
burst: 1,
172+
updates: 10,
173+
timeout: 1,
174+
},
175+
{
176+
name: "Real Cluster - Standard Timeout",
177+
qps: 5,
178+
burst: 10,
179+
updates: 10,
180+
timeout: 5,
181+
},
182+
{
183+
name: "Real Cluster - High Timeout",
184+
qps: 10,
185+
burst: 20,
186+
updates: 15,
187+
timeout: 10,
188+
},
189+
{
190+
name: "Real Cluster - Very Short Timeout",
191+
qps: 5,
192+
burst: 5,
193+
updates: 5,
194+
timeout: 1,
195+
},
196+
}
197+
198+
labelKey := "test-performance-label"
199+
var nodeName string
200+
201+
for _, tt := range tests {
202+
t.Run(tt.name, func(t *testing.T) {
203+
client, err := NewClient(WithQPS(tt.qps), WithBurst(tt.burst), WithTimeout(tt.timeout))
204+
if err != nil {
205+
t.Fatalf("Failed to create client: %v", err)
206+
}
207+
208+
if nodeName == "" {
209+
nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
210+
if err != nil {
211+
t.Fatalf("Failed to list nodes: %v", err)
212+
}
213+
if len(nodes.Items) == 0 {
214+
t.Fatal("No nodes found in the cluster")
215+
}
216+
nodeName = nodes.Items[0].Name
217+
t.Logf("Using node %s for testing", nodeName)
218+
}
219+
start := time.Now()
220+
for i := 0; i < tt.updates; i++ {
221+
labelValue := fmt.Sprintf("perf-test-value-%d", i)
222+
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
223+
if err != nil {
224+
t.Fatalf("Failed to get node: %v", err)
225+
}
226+
if node.Labels == nil {
227+
node.Labels = make(map[string]string)
228+
}
229+
node.Labels[labelKey] = labelValue
230+
_, err = client.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
231+
if err != nil {
232+
t.Fatalf("Failed to update node: %v", err)
233+
}
234+
}
235+
236+
elapsed := time.Since(start)
237+
238+
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
239+
if err != nil {
240+
t.Fatalf("Failed to get node during cleanup: %v", err)
241+
}
242+
delete(node.Labels, labelKey)
243+
_, err = client.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
244+
if err != nil {
245+
t.Fatalf("Failed to cleanup test label: %v", err)
246+
}
247+
248+
opsPerSecond := float64(tt.updates) / elapsed.Seconds()
249+
250+
t.Logf("Real cluster performance test results for %s:", tt.name)
251+
t.Logf(" - QPS: %.1f, Burst: %d", tt.qps, tt.burst)
252+
t.Logf(" - Updates performed: %d", tt.updates)
253+
t.Logf(" - Total time: %v", elapsed)
254+
t.Logf(" - Operations per second: %.2f", opsPerSecond)
255+
})
256+
}
257+
}

0 commit comments

Comments
 (0)