
Commit 707e0c9

Merge pull request #643 from carreter/server-counter-e2e
Lease-based server counting logic for agent and lease controller for server
Parents: 8aac94a, 714d092

95 files changed: +15627 additions, −233 deletions


Makefile

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ DOCKER_CMD ?= docker
 DOCKER_CLI_EXPERIMENTAL ?= enabled
 PROXY_SERVER_IP ?= 127.0.0.1
 
-KIND_IMAGE ?= kindest/node
+KIND_IMAGE ?= kindest/node:v1.30.2
 CONNECTION_MODE ?= grpc
 ## --------------------------------------
 ## Testing

cmd/agent/app/options/options.go

Lines changed: 16 additions & 0 deletions
@@ -80,6 +80,12 @@ type GrpcProxyAgentOptions struct {
 
     SyncForever    bool
     XfrChannelSize int
+
+    // Enables updating the server count by counting the number of valid leases
+    // matching the selector.
+    CountServerLeases bool
+    // Path to kubeconfig (used by kubernetes client for lease listing)
+    KubeconfigPath string
 }
 
 func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig {
@@ -122,6 +128,8 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
     flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.")
     flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.")
     flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.")
+    flags.BoolVar(&o.CountServerLeases, "count-server-leases", o.CountServerLeases, "Enables lease counting system to determine the number of proxy servers to connect to.")
+    flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "Path to the kubeconfig file")
     return flags
 }
 
@@ -198,6 +206,12 @@ func (o *GrpcProxyAgentOptions) Validate() error {
     if err := validateAgentIdentifiers(o.AgentIdentifiers); err != nil {
         return fmt.Errorf("agent address is invalid: %v", err)
     }
+    if o.KubeconfigPath != "" {
+        if _, err := os.Stat(o.KubeconfigPath); os.IsNotExist(err) {
+            return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err)
+        }
+    }
+
     return nil
 }
 
@@ -243,6 +257,8 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
         WarnOnChannelLimit: false,
         SyncForever:        false,
         XfrChannelSize:     150,
+        CountServerLeases:  false,
+        KubeconfigPath:     "",
     }
     return &o
 }
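Not part of this commit: a minimal usage sketch of the two new agent options, assuming the options package import path used in cmd/agent/app/server.go. The kubeconfig path is illustrative; an empty path makes the agent fall back to the in-cluster config.

```go
package main

import (
	"log"

	"sigs.k8s.io/apiserver-network-proxy/cmd/agent/app/options"
)

func main() {
	o := options.NewGrpcProxyAgentOptions()
	// Opt in to lease-based server counting (equivalent to --count-server-leases).
	o.CountServerLeases = true
	// Illustrative path; leave empty to use the in-cluster config instead.
	o.KubeconfigPath = "/etc/kubernetes/kubeconfig"

	// Validate() now also checks that a non-empty KubeconfigPath exists on disk.
	if err := o.Validate(); err != nil {
		log.Fatalf("invalid agent options: %v", err)
	}
}
```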

cmd/agent/app/server.go

Lines changed: 44 additions & 2 deletions
@@ -38,14 +38,24 @@ import (
     "google.golang.org/grpc"
     "google.golang.org/grpc/credentials"
     "google.golang.org/grpc/keepalive"
+    "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/client-go/kubernetes"
+    coordinationv1lister "k8s.io/client-go/listers/coordination/v1"
+    "k8s.io/client-go/rest"
+    "k8s.io/client-go/tools/cache"
+    "k8s.io/client-go/tools/clientcmd"
     "k8s.io/klog/v2"
-
+    "k8s.io/utils/clock"
     "sigs.k8s.io/apiserver-network-proxy/cmd/agent/app/options"
     "sigs.k8s.io/apiserver-network-proxy/pkg/agent"
     "sigs.k8s.io/apiserver-network-proxy/pkg/util"
 )
 
-const ReadHeaderTimeout = 60 * time.Second
+const (
+    ReadHeaderTimeout   = 60 * time.Second
+    LeaseNamespace      = "kube-system"
+    LeaseInformerResync = time.Second * 10
+)
 
 func NewAgentCommand(a *Agent, o *options.GrpcProxyAgentOptions) *cobra.Command {
     cmd := &cobra.Command{
@@ -133,6 +143,38 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
         }),
     }
     cc := o.ClientSetConfig(dialOptions...)
+
+    if o.CountServerLeases {
+        var config *rest.Config
+        if o.KubeconfigPath != "" {
+            config, err = clientcmd.BuildConfigFromFlags("", o.KubeconfigPath)
+            if err != nil {
+                return nil, fmt.Errorf("failed to load kubernetes client config: %v", err)
+            }
+        } else {
+            config, err = rest.InClusterConfig()
+            if err != nil {
+                return nil, fmt.Errorf("failed to load in cluster kubernetes client config: %w", err)
+            }
+        }
+
+        k8sClient, err := kubernetes.NewForConfig(config)
+        if err != nil {
+            return nil, fmt.Errorf("failed to create kubernetes clientset: %v", err)
+        }
+        leaseInformer := agent.NewLeaseInformerWithMetrics(k8sClient, LeaseNamespace, LeaseInformerResync)
+        go leaseInformer.Run(stopCh)
+        cache.WaitForCacheSync(stopCh, leaseInformer.HasSynced)
+        leaseLister := coordinationv1lister.NewLeaseLister(leaseInformer.GetIndexer())
+        serverLeaseSelector, _ := labels.Parse("k8s-app=konnectivity-server")
+        serverLeaseCounter := agent.NewServerLeaseCounter(
+            clock.RealClock{},
+            leaseLister,
+            serverLeaseSelector,
+        )
+        cc.ServerLeaseCounter = serverLeaseCounter
+    }
+
     cs := cc.NewAgentClientSet(drainCh, stopCh)
     cs.Serve()
 
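The `agent.NewServerLeaseCounter` used above lives in `pkg/agent` and its implementation is not shown in this diff. As a rough sketch of the idea it is built around — count the leases matching the selector whose renew time plus lease duration has not yet passed, and use that count instead of a static server count — something like the following (the names `countValidLeases` and `leaseIsValid` are illustrative, not the package's actual API):

```go
package agent

import (
	"time"

	coordinationv1 "k8s.io/api/coordination/v1"
	"k8s.io/apimachinery/pkg/labels"
	coordinationv1lister "k8s.io/client-go/listers/coordination/v1"
	"k8s.io/utils/clock"
)

// countValidLeases returns how many leases matching selector are still valid,
// i.e. renewTime + leaseDurationSeconds is in the future according to the clock.
func countValidLeases(c clock.Clock, lister coordinationv1lister.LeaseLister, selector labels.Selector) (int, error) {
	leases, err := lister.List(selector)
	if err != nil {
		return 0, err
	}
	count := 0
	for _, l := range leases {
		if leaseIsValid(c, l) {
			count++
		}
	}
	return count, nil
}

// leaseIsValid treats a lease with no renew time or duration as expired.
func leaseIsValid(c clock.Clock, l *coordinationv1.Lease) bool {
	if l.Spec.RenewTime == nil || l.Spec.LeaseDurationSeconds == nil {
		return false
	}
	expiry := l.Spec.RenewTime.Add(time.Duration(*l.Spec.LeaseDurationSeconds) * time.Second)
	return c.Now().Before(expiry)
}
```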

cmd/server/app/options/options.go

Lines changed: 5 additions & 1 deletion
@@ -102,6 +102,9 @@ type ProxyRunOptions struct {
     // see: https://pkg.go.dev/crypto/tls#Config, so in that case, this option won't have any effect.
     CipherSuites []string
     XfrChannelSize int
+
+    // Lease controller configuration
+    EnableLeaseController bool
 }
 
 func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
@@ -138,7 +141,7 @@
     flags.StringVar(&o.ProxyStrategies, "proxy-strategies", o.ProxyStrategies, "The list of proxy strategies used by the server to pick an agent/tunnel, available strategies are: default, destHost, defaultRoute.")
     flags.StringSliceVar(&o.CipherSuites, "cipher-suites", o.CipherSuites, "The comma separated list of allowed cipher suites. Has no effect on TLS1.3. Empty means allow default list.")
     flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", o.XfrChannelSize, "The size of the two KNP server channels used in server for transferring data. One channel is for data coming from the Kubernetes API Server, and the other one is for data coming from the KNP agent.")
-
+    flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.")
     flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
     flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")
 
@@ -351,6 +354,7 @@ func NewProxyRunOptions() *ProxyRunOptions {
     ProxyStrategies: "default",
     CipherSuites:    make([]string, 0),
     XfrChannelSize:  10,
+    EnableLeaseController: false,
     }
     return &o
 }

cmd/server/app/server.go

Lines changed: 24 additions & 2 deletions
@@ -42,17 +42,23 @@ import (
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/clientcmd"
     "k8s.io/klog/v2"
-
     "sigs.k8s.io/apiserver-network-proxy/cmd/server/app/options"
     "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
     "sigs.k8s.io/apiserver-network-proxy/pkg/server"
+    "sigs.k8s.io/apiserver-network-proxy/pkg/server/leases"
     "sigs.k8s.io/apiserver-network-proxy/pkg/util"
     "sigs.k8s.io/apiserver-network-proxy/proto/agent"
 )
 
 var udsListenerLock sync.Mutex
 
-const ReadHeaderTimeout = 60 * time.Second
+const (
+    ReadHeaderTimeout    = 60 * time.Second
+    LeaseDuration        = 30 * time.Second
+    LeaseRenewalInterval = 15 * time.Second
+    LeaseGCInterval      = 15 * time.Second
+    LeaseNamespace       = "kube-system"
+)
 
 func NewProxyCommand(p *Proxy, o *options.ProxyRunOptions) *cobra.Command {
     cmd := &cobra.Command{
@@ -149,6 +155,22 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
     }
     defer p.agentServer.Stop()
 
+    if o.EnableLeaseController {
+        leaseController := leases.NewController(
+            k8sClient,
+            o.ServerID,
+            int32(LeaseDuration.Seconds()),
+            LeaseRenewalInterval,
+            LeaseGCInterval,
+            fmt.Sprintf("konnectivity-proxy-server-%v", o.ServerID),
+            LeaseNamespace,
+            map[string]string{"k8s-app": "konnectivity-server"},
+        )
+        klog.V(1).Infoln("Starting lease acquisition and garbage collection controller.")
+        leaseController.Run(ctx)
+        defer leaseController.Stop()
+    }
+
     klog.V(1).Infoln("Starting admin server for debug connections.")
     err = p.runAdminServer(o, p.server)
     if err != nil {
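The `leases.NewController` implementation in `pkg/server/leases` is not included in this diff. Assuming standard `coordination.k8s.io/v1` Lease semantics and the arguments passed above, the lease each proxy server publishes and renews would look roughly like this sketch (the helper name `exampleServerLease` is illustrative, not the controller's actual code):

```go
package leases

import (
	"fmt"
	"time"

	coordinationv1 "k8s.io/api/coordination/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// exampleServerLease builds the kind of Lease object the controller manages
// for a single proxy server identified by serverID.
func exampleServerLease(serverID string) *coordinationv1.Lease {
	leaseDurationSeconds := int32(30) // LeaseDuration from cmd/server/app/server.go
	renewTime := metav1.NewMicroTime(time.Now())
	return &coordinationv1.Lease{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("konnectivity-proxy-server-%v", serverID),
			Namespace: "kube-system",
			// The agent-side selector "k8s-app=konnectivity-server" matches this label.
			Labels: map[string]string{"k8s-app": "konnectivity-server"},
		},
		Spec: coordinationv1.LeaseSpec{
			HolderIdentity:       &serverID,
			LeaseDurationSeconds: &leaseDurationSeconds,
			// Refreshed every LeaseRenewalInterval (15s); stale leases are
			// garbage collected every LeaseGCInterval (15s).
			RenewTime: &renewTime,
		},
	}
}
```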

e2e/README.md

Lines changed: 7 additions & 1 deletion
@@ -10,7 +10,7 @@ These can be run automatically using `make e2e-test`.
 Before any of the actual tests are run, the `TestMain()` function
 in `main_test.go` performs the following set up steps:
 
-- Spin up a new kind cluster with the node image provided by the `-kind-image` flag.
+- Spin up a new kind cluster (4 control plane and 4 worker nodes) with the node image provided by the `-kind-image` flag.
 - Sideload the KNP agent and server images provided with `-agent-image` and `-server-image` into the cluster.
 - Deploy the necessary RBAC and service templates for both the KNP agent and server (see `renderAndApplyManifests`).
 
@@ -21,3 +21,9 @@ in `main_test.go` performs the following set up steps:
 These tests deploy the KNP servers and agents to the previously created kind cluster.
 After the deployments are up, the tests check that both the agent and server report
 the correct number of connections on their metrics endpoints.
+
+### `lease_count_test.go`
+
+Similar to `static_count_test.go`, except using the new lease-based server counting
+system rather than passing the server count to the KNP server deployment as a CLI
+flag.

e2e/lease_count_test.go

Lines changed: 123 additions & 0 deletions
@@ -0,0 +1,123 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
	"fmt"
	"testing"

	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/e2e-framework/pkg/features"
)

func renderLeaseCountDeployments(serverReplicas, agentReplicas int) (serverDeployment client.Object, agentDeployment client.Object, err error) {
	serverDeploymentConfig := DeploymentConfig{
		Replicas: serverReplicas,
		Image:    *serverImage,
		Args: []CLIFlag{
			{Flag: "log-file", Value: "/var/log/konnectivity-server.log"},
			{Flag: "logtostderr", Value: "true"},
			{Flag: "log-file-max-size", Value: "0"},
			{Flag: "uds-name", Value: "/etc/kubernetes/konnectivity-server/konnectivity-server.socket"},
			{Flag: "delete-existing-uds-file"},
			{Flag: "cluster-cert", Value: "/etc/kubernetes/pki/apiserver.crt"},
			{Flag: "cluster-key", Value: "/etc/kubernetes/pki/apiserver.key"},
			{Flag: "server-port", Value: "0"},
			{Flag: "kubeconfig", Value: "/etc/kubernetes/admin.conf"},
			{Flag: "keepalive-time", Value: "1h"},
			{Flag: "mode", Value: "grpc"},
			{Flag: "agent-namespace", Value: "kube-system"},
			{Flag: "agent-service-account", Value: "konnectivity-agent"},
			{Flag: "authentication-audience", Value: "system:konnectivity-server"},
			{Flag: "enable-lease-controller"},
			{Flag: "admin-bind-address", EmptyValue: true},
			{Flag: "mode", Value: *connectionMode},
		},
	}
	serverDeployment, _, err = renderTemplate("server/deployment.yaml", serverDeploymentConfig)
	if err != nil {
		return nil, nil, fmt.Errorf("could not render server deployment: %w", err)
	}

	agentDeploymentConfig := DeploymentConfig{
		Replicas: agentReplicas,
		Image:    *agentImage,
		Args: []CLIFlag{
			{Flag: "logtostderr", Value: "true"},
			{Flag: "ca-cert", Value: "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"},
			{Flag: "proxy-server-host", Value: "konnectivity-server.kube-system.svc.cluster.local"},
			{Flag: "proxy-server-port", Value: "8091"},
			{Flag: "sync-interval", Value: "1s"},
			{Flag: "sync-interval-cap", Value: "10s"},
			{Flag: "sync-forever"},
			{Flag: "probe-interval", Value: "1s"},
			{Flag: "service-account-token-path", Value: "/var/run/secrets/tokens/konnectivity-agent-token"},
			{Flag: "count-server-leases"},
			{Flag: "agent-identifiers", Value: "ipv4=${HOST_IP}"},
			{Flag: "admin-bind-address", EmptyValue: true},
		},
	}
	agentDeployment, _, err = renderTemplate("agent/deployment.yaml", agentDeploymentConfig)
	if err != nil {
		return nil, nil, fmt.Errorf("could not render agent deployment: %w", err)
	}

	return serverDeployment, agentDeployment, nil
}

func TestSingleServer_SingleAgent_LeaseCount(t *testing.T) {
	serverDeployment, agentDeployment, err := renderLeaseCountDeployments(1, 1)
	if err != nil {
		t.Fatalf("could not render lease count deployments: %v", err)
	}

	feature := features.New("konnectivity server and agent deployment with single replica for each")
	feature = feature.Setup(createDeployment(agentDeployment))
	feature = feature.Setup(createDeployment(serverDeployment))
	feature = feature.Setup(waitForDeployment(serverDeployment))
	feature = feature.Setup(waitForDeployment(agentDeployment))
	feature = feature.Assess("konnectivity server has a connected client", assertServersAreConnected(1))
	feature = feature.Assess("konnectivity agent is connected to a server", assertAgentsAreConnected(1))
	feature = feature.Assess("agent correctly counts 1 lease", assertAgentKnownServerCount(1))
	feature = feature.Teardown(deleteDeployment(agentDeployment))
	feature = feature.Teardown(deleteDeployment(serverDeployment))

	testenv.Test(t, feature.Feature())
}

func TestMultiServer_MultiAgent_LeaseCount(t *testing.T) {
	serverDeployment, agentDeployment, err := renderLeaseCountDeployments(2, 2)
	if err != nil {
		t.Fatalf("could not render lease count deployments: %v", err)
	}

	feature := features.New("konnectivity server and agent deployment with multiple replicas")
	feature = feature.Setup(createDeployment(serverDeployment))
	feature = feature.Setup(createDeployment(agentDeployment))
	feature = feature.Setup(waitForDeployment(serverDeployment))
	feature = feature.Setup(waitForDeployment(agentDeployment))
	feature = feature.Setup(scaleDeployment(serverDeployment, 4))
	feature = feature.Setup(scaleDeployment(agentDeployment, 4))
	feature = feature.Setup(waitForDeployment(agentDeployment))
	feature = feature.Setup(waitForDeployment(serverDeployment))
	feature = feature.Assess("all servers connected to all clients after scale up", assertServersAreConnected(4))
	feature = feature.Assess("all agents connected to all servers after scale up", assertAgentsAreConnected(4))
	feature = feature.Assess("agents correctly count 4 leases after scale up", assertAgentKnownServerCount(4))
	feature = feature.Teardown(deleteDeployment(agentDeployment))
	feature = feature.Teardown(deleteDeployment(serverDeployment))

	testenv.Test(t, feature.Feature())
}
