Skip to content

Commit 2f2ade1

Browse files
authored
Merge pull request kubernetes-sigs#720 from cheftako/release-0.31
Release v0.31 Cherry picked back missing v0.31.1 PR - KNP agent should pick the larger of the two server count
2 parents 0032c48 + 5fd25f1 commit 2f2ade1

File tree

10 files changed

+300
-13
lines changed

10 files changed

+300
-13
lines changed

cmd/agent/app/options/options.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ type GrpcProxyAgentOptions struct {
8585
// Enables updating the server count by counting the number of valid leases
8686
// matching the selector.
8787
CountServerLeases bool
88+
// Namespace where lease objects are managed.
89+
LeaseNamespace string
90+
// Labels on which lease objects are managed.
91+
LeaseLabel string
92+
// ServerCountSource describes how server counts should be combined.
93+
ServerCountSource string
8894
// Path to kubeconfig (used by kubernetes client for lease listing)
8995
KubeconfigPath string
9096
// Content type of requests sent to apiserver.
@@ -104,6 +110,7 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption)
104110
WarnOnChannelLimit: o.WarnOnChannelLimit,
105111
SyncForever: o.SyncForever,
106112
XfrChannelSize: o.XfrChannelSize,
113+
ServerCountSource: o.ServerCountSource,
107114
}
108115
}
109116

@@ -132,6 +139,9 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
132139
flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.")
133140
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.")
134141
flags.BoolVar(&o.CountServerLeases, "count-server-leases", o.CountServerLeases, "Enables lease counting system to determine the number of proxy servers to connect to.")
142+
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "Namespace where lease objects are managed.")
143+
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
144+
flags.StringVar(&o.ServerCountSource, "server-count-source", o.ServerCountSource, "Defines how the server counts from lease and from server responses are combined. Possible values: 'default' to use only one source (server or leases depending on other flags), 'max' to take the larger value.")
135145
flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "Path to the kubeconfig file")
136146
flags.StringVar(&o.APIContentType, "kube-api-content-type", o.APIContentType, "Content type of requests sent to apiserver.")
137147
return flags
@@ -159,6 +169,10 @@ func (o *GrpcProxyAgentOptions) Print() {
159169
klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.AgentIdentifiers))
160170
klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit)
161171
klog.V(1).Infof("SyncForever set to %v.\n", o.SyncForever)
172+
klog.V(1).Infof("CountServerLeases set to %v.\n", o.CountServerLeases)
173+
klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace)
174+
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
175+
klog.V(1).Infof("ServerCountSource set to %s.\n", o.ServerCountSource)
162176
klog.V(1).Infof("ChannelSize set to %d.\n", o.XfrChannelSize)
163177
klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType)
164178
}
@@ -216,6 +230,18 @@ func (o *GrpcProxyAgentOptions) Validate() error {
216230
return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err)
217231
}
218232
}
233+
// Validate labels provided.
234+
if o.CountServerLeases {
235+
_, err := util.ParseLabels(o.LeaseLabel)
236+
if err != nil {
237+
return err
238+
}
239+
}
240+
if o.ServerCountSource != "" {
241+
if o.ServerCountSource != "default" && o.ServerCountSource != "max" {
242+
return fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got %v", o.ServerCountSource)
243+
}
244+
}
219245

220246
return nil
221247
}
@@ -263,6 +289,9 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
263289
SyncForever: false,
264290
XfrChannelSize: 150,
265291
CountServerLeases: false,
292+
LeaseNamespace: "kube-system",
293+
LeaseLabel: "k8s-app=konnectivity-server",
294+
ServerCountSource: "default",
266295
KubeconfigPath: "",
267296
APIContentType: runtime.ContentTypeProtobuf,
268297
}

cmd/agent/app/options/options_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ func TestValidate(t *testing.T) {
156156
fieldMap: map[string]interface{}{"XfrChannelSize": -10},
157157
expected: fmt.Errorf("channel size -10 must be greater than 0"),
158158
},
159+
"ServerCountSource": {
160+
fieldMap: map[string]interface{}{"ServerCountSource": "foobar"},
161+
expected: fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got foobar"),
162+
},
159163
} {
160164
t.Run(desc, func(t *testing.T) {
161165
testAgentOptions := NewGrpcProxyAgentOptions()

cmd/agent/app/server.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ import (
5353

5454
const (
5555
ReadHeaderTimeout = 60 * time.Second
56-
LeaseNamespace = "kube-system"
5756
LeaseInformerResync = time.Second * 10
5857
)
5958

@@ -163,11 +162,11 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
163162
if err != nil {
164163
return nil, fmt.Errorf("failed to create kubernetes clientset: %v", err)
165164
}
166-
leaseInformer := agent.NewLeaseInformerWithMetrics(k8sClient, LeaseNamespace, LeaseInformerResync)
165+
leaseInformer := agent.NewLeaseInformerWithMetrics(k8sClient, o.LeaseNamespace, LeaseInformerResync)
167166
go leaseInformer.Run(stopCh)
168167
cache.WaitForCacheSync(stopCh, leaseInformer.HasSynced)
169168
leaseLister := coordinationv1lister.NewLeaseLister(leaseInformer.GetIndexer())
170-
serverLeaseSelector, _ := labels.Parse("k8s-app=konnectivity-server")
169+
serverLeaseSelector, _ := labels.Parse(o.LeaseLabel)
171170
serverLeaseCounter := agent.NewServerLeaseCounter(
172171
clock.RealClock{},
173172
leaseLister,

cmd/server/app/options/options.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ type ProxyRunOptions struct {
108108

109109
// Lease controller configuration
110110
EnableLeaseController bool
111+
// Lease Namespace
112+
LeaseNamespace string
113+
// Lease Labels
114+
LeaseLabel string
111115
}
112116

113117
func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
@@ -146,6 +150,8 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
146150
flags.StringSliceVar(&o.CipherSuites, "cipher-suites", o.CipherSuites, "The comma separated list of allowed cipher suites. Has no effect on TLS1.3. Empty means allow default list.")
147151
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", o.XfrChannelSize, "The size of the two KNP server channels used in server for transferring data. One channel is for data coming from the Kubernetes API Server, and the other one is for data coming from the KNP agent.")
148152
flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.")
153+
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace where lease objects are managed by the controller.")
154+
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
149155
flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
150156
flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")
151157

@@ -184,6 +190,9 @@ func (o *ProxyRunOptions) Print() {
184190
klog.V(1).Infof("KubeconfigBurst set to %d.\n", o.KubeconfigBurst)
185191
klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType)
186192
klog.V(1).Infof("ProxyStrategies set to %q.\n", o.ProxyStrategies)
193+
klog.V(1).Infof("EnableLeaseController set to %v.\n", o.EnableLeaseController)
194+
klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace)
195+
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
187196
klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites)
188197
klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize)
189198
}
@@ -321,6 +330,13 @@ func (o *ProxyRunOptions) Validate() error {
321330
}
322331
}
323332
}
333+
// Validate labels provided.
334+
if o.EnableLeaseController {
335+
_, err := util.ParseLabels(o.LeaseLabel)
336+
if err != nil {
337+
return err
338+
}
339+
}
324340

325341
return nil
326342
}
@@ -361,6 +377,8 @@ func NewProxyRunOptions() *ProxyRunOptions {
361377
CipherSuites: make([]string, 0),
362378
XfrChannelSize: 10,
363379
EnableLeaseController: false,
380+
LeaseNamespace: "kube-system",
381+
LeaseLabel: "k8s-app=konnectivity-server",
364382
}
365383
return &o
366384
}

cmd/server/app/server.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ const (
5757
LeaseDuration = 30 * time.Second
5858
LeaseRenewalInterval = 15 * time.Second
5959
LeaseGCInterval = 15 * time.Second
60-
LeaseNamespace = "kube-system"
6160
)
6261

6362
func NewProxyCommand(p *Proxy, o *options.ProxyRunOptions) *cobra.Command {
@@ -156,6 +155,11 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
156155
}
157156
defer p.agentServer.Stop()
158157

158+
labels, err := util.ParseLabels(o.LeaseLabel)
159+
if err != nil {
160+
return err
161+
}
162+
159163
if o.EnableLeaseController {
160164
leaseController := leases.NewController(
161165
k8sClient,
@@ -164,8 +168,8 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
164168
LeaseRenewalInterval,
165169
LeaseGCInterval,
166170
fmt.Sprintf("konnectivity-proxy-server-%v", o.ServerID),
167-
LeaseNamespace,
168-
map[string]string{"k8s-app": "konnectivity-server"},
171+
o.LeaseNamespace,
172+
labels,
169173
)
170174
klog.V(1).Infoln("Starting lease acquisition and garbage collection controller.")
171175
leaseController.Run(ctx)

pkg/agent/clientset.go

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ import (
3030
"sigs.k8s.io/apiserver-network-proxy/pkg/agent/metrics"
3131
)
3232

33+
const (
34+
fromResponses = "KNP server response headers"
35+
fromLeases = "KNP lease count"
36+
fromFallback = "fallback to 1"
37+
)
38+
3339
// ClientSet consists of clients connected to each instance of an HA proxy server.
3440
type ClientSet struct {
3541
mu sync.Mutex //protects the clients.
@@ -39,7 +45,7 @@ type ClientSet struct {
3945
agentID string // ID of this agent
4046
address string // proxy server address. Assuming HA proxy server
4147

42-
leaseCounter *ServerLeaseCounter // counts number of proxy server leases
48+
leaseCounter ServerCounter // counts number of proxy server leases
4349
lastReceivedServerCount int // last server count received from a proxy server
4450
lastServerCount int // last server count value from either lease system or proxy server, former takes priority
4551

@@ -68,6 +74,7 @@ type ClientSet struct {
6874
xfrChannelSize int
6975

7076
syncForever bool // Continue syncing (support dynamic server count).
77+
serverCountSource string
7178
}
7279

7380
func (cs *ClientSet) ClientsCount() int {
@@ -147,7 +154,8 @@ type ClientSetConfig struct {
147154
WarnOnChannelLimit bool
148155
SyncForever bool
149156
XfrChannelSize int
150-
ServerLeaseCounter *ServerLeaseCounter
157+
ServerLeaseCounter ServerCounter
158+
ServerCountSource string
151159
}
152160

153161
func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet {
@@ -167,6 +175,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
167175
xfrChannelSize: cc.XfrChannelSize,
168176
stopCh: stopCh,
169177
leaseCounter: cc.ServerLeaseCounter,
178+
serverCountSource: cc.ServerCountSource,
170179
}
171180
}
172181

@@ -218,15 +227,41 @@ func (cs *ClientSet) sync() {
218227
}
219228

220229
func (cs *ClientSet) ServerCount() int {
230+
221231
var serverCount int
222-
if cs.leaseCounter != nil {
223-
serverCount = cs.leaseCounter.Count()
224-
} else {
225-
serverCount = cs.lastReceivedServerCount
232+
var countSourceLabel string
233+
234+
switch cs.serverCountSource {
235+
case "", "default":
236+
if cs.leaseCounter != nil {
237+
serverCount = cs.leaseCounter.Count()
238+
countSourceLabel = fromLeases
239+
} else {
240+
serverCount = cs.lastReceivedServerCount
241+
countSourceLabel = fromResponses
242+
}
243+
case "max":
244+
countFromLeases := 0
245+
if cs.leaseCounter != nil {
246+
countFromLeases = cs.leaseCounter.Count()
247+
}
248+
countFromResponses := cs.lastReceivedServerCount
249+
250+
serverCount = countFromLeases
251+
countSourceLabel = fromLeases
252+
if countFromResponses > serverCount {
253+
serverCount = countFromResponses
254+
countSourceLabel = fromResponses
255+
}
256+
if serverCount == 0 {
257+
serverCount = 1
258+
countSourceLabel = fromFallback
259+
}
260+
226261
}
227262

228263
if serverCount != cs.lastServerCount {
229-
klog.Warningf("change detected in proxy server count (was: %d, now: %d)", cs.lastServerCount, serverCount)
264+
klog.Warningf("change detected in proxy server count (was: %d, now: %d, source: %q)", cs.lastServerCount, serverCount, countSourceLabel)
230265
cs.lastServerCount = serverCount
231266
}
232267

pkg/agent/clientset_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package agent
18+
19+
import (
20+
"testing"
21+
)
22+
23+
type FakeServerCounter struct {
24+
count int
25+
}
26+
27+
func (f *FakeServerCounter) Count() int {
28+
return f.count
29+
}
30+
31+
func TestServerCount(t *testing.T) {
32+
testCases := []struct{
33+
name string
34+
serverCountSource string
35+
leaseCounter ServerCounter
36+
responseCount int
37+
want int
38+
} {
39+
{
40+
name: "higher from response",
41+
serverCountSource: "max",
42+
responseCount: 42,
43+
leaseCounter: &FakeServerCounter{24},
44+
want: 42,
45+
},
46+
{
47+
name: "higher from leases",
48+
serverCountSource: "max",
49+
responseCount: 3,
50+
leaseCounter: &FakeServerCounter{6},
51+
want: 6,
52+
},
53+
{
54+
name: "both zero",
55+
serverCountSource: "max",
56+
responseCount: 0,
57+
leaseCounter: &FakeServerCounter{0},
58+
want: 1,
59+
},
60+
61+
{
62+
name: "response picked by default when no lease counter",
63+
serverCountSource: "default",
64+
responseCount: 3,
65+
leaseCounter: nil,
66+
want: 3,
67+
},
68+
{
69+
name: "lease counter always picked when present",
70+
serverCountSource: "default",
71+
responseCount: 6,
72+
leaseCounter: &FakeServerCounter{3},
73+
want: 3,
74+
},
75+
}
76+
77+
for _, tc := range testCases {
78+
t.Run(tc.name, func(t *testing.T) {
79+
80+
cs := &ClientSet{
81+
clients: make(map[string]*Client),
82+
leaseCounter: tc.leaseCounter,
83+
serverCountSource: tc.serverCountSource,
84+
85+
}
86+
cs.lastReceivedServerCount = tc.responseCount
87+
if got := cs.ServerCount(); got != tc.want {
88+
t.Errorf("cs.ServerCount() = %v, want: %v", got, tc.want)
89+
}
90+
})
91+
}
92+
93+
}

pkg/agent/lease_counter.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ import (
3636
coordinationv1lister "k8s.io/client-go/listers/coordination/v1"
3737
)
3838

39+
type ServerCounter interface {
40+
Count() int
41+
}
42+
3943
// A ServerLeaseCounter counts leases in the k8s apiserver to determine the
4044
// current proxy server count.
4145
type ServerLeaseCounter struct {

0 commit comments

Comments
 (0)