Skip to content

Commit de8defb

Browse files
committed
Review comments
1 parent 0345045 commit de8defb

File tree

14 files changed

+301
-188
lines changed

14 files changed

+301
-188
lines changed

cmd/agent/app/options/options.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"github.com/google/uuid"
2828
"github.com/spf13/pflag"
2929
"google.golang.org/grpc"
30-
"k8s.io/apimachinery/pkg/labels"
3130
"k8s.io/klog/v2"
3231

3332
"sigs.k8s.io/apiserver-network-proxy/pkg/agent"
@@ -82,9 +81,9 @@ type GrpcProxyAgentOptions struct {
8281
SyncForever bool
8382
XfrChannelSize int
8483

85-
// Providing a label selector enables updating the server count by counting the
86-
// number of valid leases matching the selector.
87-
ServerLeaseSelector string
84+
// Enables updating the server count by counting the number of valid leases
85+
// matching the built-in "k8s-app=konnectivity-server" label selector.
86+
CountServerLeases bool
8887
// Path to kubeconfig (used by kubernetes client for lease listing)
8988
KubeconfigPath string
9089
}
@@ -129,7 +128,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
129128
flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.")
130129
flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.")
131130
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.")
132-
flags.StringVar(&o.ServerLeaseSelector, "server-lease-selector", o.ServerLeaseSelector, "Providing a label selector enables updating the server count by counting the number of valid leases matching the selector.")
131+
flags.BoolVar(&o.CountServerLeases, "count-server-leases", o.CountServerLeases, "Enables lease counting system to determine the number of proxy servers to connect to.")
133132
flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "Path to the kubeconfig file")
134133
return flags
135134
}
@@ -207,11 +206,6 @@ func (o *GrpcProxyAgentOptions) Validate() error {
207206
if err := validateAgentIdentifiers(o.AgentIdentifiers); err != nil {
208207
return fmt.Errorf("agent address is invalid: %v", err)
209208
}
210-
if o.ServerLeaseSelector != "" {
211-
if _, err := labels.Parse(o.ServerLeaseSelector); err != nil {
212-
return fmt.Errorf("invalid server count lease selector: %w", err)
213-
}
214-
}
215209
if o.KubeconfigPath != "" {
216210
if _, err := os.Stat(o.KubeconfigPath); os.IsNotExist(err) {
217211
return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err)
@@ -263,7 +257,7 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
263257
WarnOnChannelLimit: false,
264258
SyncForever: false,
265259
XfrChannelSize: 150,
266-
ServerLeaseSelector: "",
260+
CountServerLeases: false,
267261
KubeconfigPath: "",
268262
}
269263
return &o

cmd/agent/app/server.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,16 @@ import (
4242
"k8s.io/client-go/kubernetes"
4343
"k8s.io/client-go/tools/clientcmd"
4444
"k8s.io/klog/v2"
45+
"k8s.io/utils/clock"
4546
"sigs.k8s.io/apiserver-network-proxy/cmd/agent/app/options"
4647
"sigs.k8s.io/apiserver-network-proxy/pkg/agent"
4748
"sigs.k8s.io/apiserver-network-proxy/pkg/util"
4849
)
4950

50-
const ReadHeaderTimeout = 60 * time.Second
51+
const (
52+
ReadHeaderTimeout = 60 * time.Second
53+
LeaseNamespace = "kube-system"
54+
)
5155

5256
func NewAgentCommand(a *Agent, o *options.GrpcProxyAgentOptions) *cobra.Command {
5357
cmd := &cobra.Command{
@@ -136,7 +140,7 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
136140
}
137141
cc := o.ClientSetConfig(dialOptions...)
138142

139-
if o.ServerLeaseSelector != "" {
143+
if o.CountServerLeases {
140144
var k8sClient *kubernetes.Clientset
141145
config, err := clientcmd.BuildConfigFromFlags("", o.KubeconfigPath)
142146
if err != nil {
@@ -146,18 +150,13 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
146150
if err != nil {
147151
return nil, fmt.Errorf("failed to create kubernetes clientset: %v", err)
148152
}
149-
serverLeaseSelector, err := labels.Parse(o.ServerLeaseSelector)
150-
if err != nil {
151-
return nil, fmt.Errorf("invalid server count lease selector: %w", err)
152-
}
153+
serverLeaseSelector, _ := labels.Parse("k8s-app=konnectivity-server")
153154
serverLeaseCounter := agent.NewServerLeaseCounter(
155+
clock.RealClock{},
154156
k8sClient,
155157
serverLeaseSelector,
158+
LeaseNamespace,
156159
)
157-
if err != nil {
158-
klog.Errorf("failed to create server lease counter: %v", err)
159-
return nil, fmt.Errorf("failed to create server lease counter: %w", err)
160-
}
161160
cc.ServerLeaseCounter = serverLeaseCounter
162161
}
163162

cmd/server/app/options/options.go

Lines changed: 3 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,7 @@ type ProxyRunOptions struct {
104104
XfrChannelSize int
105105

106106
// Lease controller configuration
107-
LeaseDuration time.Duration
108-
LeaseRenewalInterval time.Duration
109-
LeaseGCInterval time.Duration
110-
LeaseLabelSelector string
111-
LeaseNamespace string
107+
EnableLeaseController bool
112108
}
113109

114110
func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
@@ -145,11 +141,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
145141
flags.StringVar(&o.ProxyStrategies, "proxy-strategies", o.ProxyStrategies, "The list of proxy strategies used by the server to pick an agent/tunnel, available strategies are: default, destHost, defaultRoute.")
146142
flags.StringSliceVar(&o.CipherSuites, "cipher-suites", o.CipherSuites, "The comma separated list of allowed cipher suites. Has no effect on TLS1.3. Empty means allow default list.")
147143
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", o.XfrChannelSize, "The size of the two KNP server channels used in server for transferring data. One channel is for data coming from the Kubernetes API Server, and the other one is for data coming from the KNP agent.")
148-
flags.DurationVar(&o.LeaseDuration, "lease-duration", o.LeaseDuration, "The duration of the KNP server lease. Lease system off by default")
149-
flags.DurationVar(&o.LeaseRenewalInterval, "lease-renewal-interval", o.LeaseRenewalInterval, "The interval between KNP server lease renewal calls. Lease system off by default")
150-
flags.DurationVar(&o.LeaseGCInterval, "lease-gc-interval", o.LeaseGCInterval, "The interval between KNP server garbage collection calls. Lease system off by default")
151-
flags.StringVar(&o.LeaseLabelSelector, "lease-label-selector", o.LeaseLabelSelector, "The label selector for KNP server leases. Lease system off by default")
152-
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace to which KNP server leases will be published. Lease system off by default")
144+
flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.")
153145
flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
154146
flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")
155147

@@ -304,25 +296,6 @@ func (o *ProxyRunOptions) Validate() error {
304296
}
305297
}
306298

307-
// Validate leasing parameters. All must be empty or have a value.
308-
if o.IsLeaseCountingEnabled() {
309-
if o.LeaseLabelSelector == "" {
310-
return fmt.Errorf("LeaseLabelSelector cannot be empty when leasing system is enabled")
311-
}
312-
if o.LeaseNamespace == "" {
313-
return fmt.Errorf("LeaseNamespace cannot be empty when leasing system is enabled")
314-
}
315-
if o.LeaseDuration <= 0 {
316-
return fmt.Errorf("LeaseDuration cannot be zero or negative when leasing system is enabled")
317-
}
318-
if o.LeaseGCInterval <= 0 {
319-
return fmt.Errorf("LeaseGCInterval cannot be zero or negative when leasing system is enabled")
320-
}
321-
if o.LeaseRenewalInterval <= 0 {
322-
return fmt.Errorf("LeaseRenewalInterval cannot be zero or negative when leasing system is enabled")
323-
}
324-
}
325-
326299
// validate the proxy strategies
327300
if len(o.ProxyStrategies) == 0 {
328301
return fmt.Errorf("ProxyStrategies cannot be empty")
@@ -381,11 +354,7 @@ func NewProxyRunOptions() *ProxyRunOptions {
381354
ProxyStrategies: "default",
382355
CipherSuites: make([]string, 0),
383356
XfrChannelSize: 10,
384-
LeaseGCInterval: 0,
385-
LeaseDuration: 0,
386-
LeaseRenewalInterval: 0,
387-
LeaseLabelSelector: "",
388-
LeaseNamespace: "",
357+
EnableLeaseController: false,
389358
}
390359
return &o
391360
}
@@ -398,7 +367,3 @@ func defaultServerID() string {
398367
}
399368
return uuid.New().String()
400369
}
401-
402-
func (o *ProxyRunOptions) IsLeaseCountingEnabled() bool {
403-
return o.LeaseLabelSelector != "" || o.LeaseDuration != 0 || o.LeaseGCInterval != 0 || o.LeaseRenewalInterval != 0 || o.LeaseNamespace != ""
404-
}

cmd/server/app/server.go

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import (
3939
"google.golang.org/grpc"
4040
"google.golang.org/grpc/credentials"
4141
"google.golang.org/grpc/keepalive"
42-
"k8s.io/apimachinery/pkg/labels"
4342
"k8s.io/client-go/kubernetes"
4443
"k8s.io/client-go/tools/clientcmd"
4544
"k8s.io/klog/v2"
@@ -53,7 +52,13 @@ import (
5352

5453
var udsListenerLock sync.Mutex
5554

56-
const ReadHeaderTimeout = 60 * time.Second
55+
const (
56+
ReadHeaderTimeout = 60 * time.Second
57+
LeaseDuration = 30 * time.Second
58+
LeaseRenewalInterval = 15 * time.Second
59+
LeaseGCInterval = 15 * time.Second
60+
LeaseNamespace = "kube-system"
61+
)
5762

5863
func NewProxyCommand(p *Proxy, o *options.ProxyRunOptions) *cobra.Command {
5964
cmd := &cobra.Command{
@@ -143,23 +148,28 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
143148
defer frontendStop()
144149
}
145150

146-
if o.IsLeaseCountingEnabled() {
147-
leaseLabels, err := labels.ConvertSelectorToLabelsMap(o.LeaseLabelSelector)
148-
if err != nil {
149-
return fmt.Errorf("could not parse lease label selector (%q): %w", o.LeaseLabelSelector, err)
150-
}
151-
leaseController := leases.NewController(k8sClient, o.ServerID, int32(o.LeaseDuration.Seconds()), o.LeaseRenewalInterval, o.LeaseGCInterval, fmt.Sprintf("konnectivity-proxy-server-%v", o.ServerID), o.LeaseNamespace, leaseLabels)
152-
klog.V(1).Infoln("Starting lease acquisition and garbage collection controller.")
153-
leaseController.Run(ctx)
154-
}
155-
156151
klog.V(1).Infoln("Starting agent server for tunnel connections.")
157152
err = p.runAgentServer(o, p.server)
158153
if err != nil {
159154
return fmt.Errorf("failed to run the agent server: %v", err)
160155
}
161156
defer p.agentServer.Stop()
162157

158+
if o.EnableLeaseController {
159+
leaseController := leases.NewController(
160+
k8sClient,
161+
o.ServerID,
162+
int32(LeaseDuration.Seconds()),
163+
LeaseRenewalInterval,
164+
LeaseGCInterval,
165+
fmt.Sprintf("konnectivity-proxy-server-%v", o.ServerID),
166+
LeaseNamespace,
167+
map[string]string{"k8s-app": "konnectivity-server"},
168+
)
169+
klog.V(1).Infoln("Starting lease acquisition and garbage collection controller.")
170+
leaseController.Run(ctx)
171+
}
172+
163173
klog.V(1).Infoln("Starting admin server for debug connections.")
164174
err = p.runAdminServer(o, p.server)
165175
if err != nil {

pkg/agent/clientset.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ type ClientSet struct {
3939
agentID string // ID of this agent
4040
address string // proxy server address. Assuming HA proxy server
4141

42-
serverCounter *ServerLeaseCounter // counts number of proxy server leases
42+
leaseCounter *ServerLeaseCounter // counts number of proxy server leases
4343
lastReceivedServerCount int // last server count received from a proxy server
44+
lastServerCount int // last server count value from either lease system or proxy server, former takes priority
4445

4546
// unless it is an HA server. Initialized when the ClientSet creates
4647
// the first client. When syncForever is set, it will be the most recently seen.
@@ -165,7 +166,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
165166
drainCh: drainCh,
166167
xfrChannelSize: cc.XfrChannelSize,
167168
stopCh: stopCh,
168-
serverCounter: cc.ServerLeaseCounter,
169+
leaseCounter: cc.ServerLeaseCounter,
169170
}
170171
}
171172

@@ -189,10 +190,9 @@ func (cs *ClientSet) sync(ctx context.Context) {
189190
backoff := cs.resetBackoff()
190191
var duration time.Duration
191192
for {
192-
if err := cs.connectOnce(ctx); err != nil {
193+
if serverCount, err := cs.connectOnce(ctx); err != nil {
193194
if dse, ok := err.(*DuplicateServerError); ok {
194195
clientsCount := cs.ClientsCount()
195-
serverCount := cs.ServerCount(ctx)
196196
klog.V(4).InfoS("duplicate server", "serverID", dse.ServerID, "serverCount", serverCount, "clientsCount", clientsCount)
197197
if serverCount != 0 && clientsCount >= serverCount {
198198
duration = backoff.Step()
@@ -219,33 +219,38 @@ func (cs *ClientSet) sync(ctx context.Context) {
219219

220220
func (cs *ClientSet) ServerCount(ctx context.Context) int {
221221
var serverCount int
222-
if cs.serverCounter != nil {
223-
serverCount = cs.serverCounter.Count(ctx)
222+
if cs.leaseCounter != nil {
223+
serverCount = cs.leaseCounter.Count(ctx)
224224
if serverCount == 0 {
225225
klog.Warningf("server lease counter could not find any leases")
226226
}
227227
} else {
228228
serverCount = cs.lastReceivedServerCount
229229
}
230230

231+
if serverCount != cs.lastServerCount {
232+
klog.Warningf("change detected in proxy server count (was: %d, now: %d)", cs.lastServerCount, serverCount)
233+
cs.lastServerCount = serverCount
234+
}
235+
231236
metrics.Metrics.SetServerCount(serverCount)
232237
return serverCount
233238
}
234239

235-
func (cs *ClientSet) connectOnce(ctx context.Context) error {
240+
func (cs *ClientSet) connectOnce(ctx context.Context) (int, error) {
236241
serverCount := cs.ServerCount(ctx)
237242

238243
if !cs.syncForever && serverCount != 0 && cs.ClientsCount() >= serverCount {
239-
return nil
244+
return serverCount, nil
240245
}
241246
c, receivedServerCount, err := cs.newAgentClient()
242247
if err != nil {
243-
return err
248+
return serverCount, err
244249
}
245250
cs.lastReceivedServerCount = receivedServerCount
246251
if err := cs.AddClient(c.serverID, c); err != nil {
247252
c.Close()
248-
return err
253+
return serverCount, err
249254
}
250255
klog.V(2).InfoS("sync added client connecting to proxy server", "serverID", c.serverID)
251256

@@ -255,7 +260,7 @@ func (cs *ClientSet) connectOnce(ctx context.Context) error {
255260
"serverID", c.serverID,
256261
)
257262
go runpprof.Do(context.Background(), labels, func(context.Context) { c.Serve() })
258-
return nil
263+
return serverCount, nil
259264
}
260265

261266
func (cs *ClientSet) Serve() {

0 commit comments

Comments
 (0)