Skip to content

Commit 18b7b79

Browse files
committed
Add lease controller and e2e tests
1 parent a1328d1 commit 18b7b79

File tree

439 files changed

+1009
-41180
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

439 files changed

+1009
-41180
lines changed

cmd/server/app/options/options.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,13 @@ type ProxyRunOptions struct {
102102
// see: https://pkg.go.dev/crypto/tls#Config, so in that case, this option won't have any effect.
103103
CipherSuites []string
104104
XfrChannelSize int
105+
106+
// Lease controller configuration
107+
LeaseDuration time.Duration
108+
LeaseRenewalInterval time.Duration
109+
LeaseGCInterval time.Duration
110+
LeaseLabelSelector string
111+
LeaseNamespace string
105112
}
106113

107114
func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
@@ -138,7 +145,11 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
138145
flags.StringVar(&o.ProxyStrategies, "proxy-strategies", o.ProxyStrategies, "The list of proxy strategies used by the server to pick an agent/tunnel, available strategies are: default, destHost, defaultRoute.")
139146
flags.StringSliceVar(&o.CipherSuites, "cipher-suites", o.CipherSuites, "The comma separated list of allowed cipher suites. Has no effect on TLS1.3. Empty means allow default list.")
140147
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", o.XfrChannelSize, "The size of the two KNP server channels used in server for transferring data. One channel is for data coming from the Kubernetes API Server, and the other one is for data coming from the KNP agent.")
141-
148+
flags.DurationVar(&o.LeaseDuration, "lease-duration", o.LeaseDuration, "The duration of the KNP server lease. Lease system off by default")
149+
flags.DurationVar(&o.LeaseRenewalInterval, "lease-renewal-interval", o.LeaseRenewalInterval, "The interval between KNP server lease renewal calls. Lease system off by default")
150+
flags.DurationVar(&o.LeaseGCInterval, "lease-gc-interval", o.LeaseGCInterval, "The interval between KNP server garbage collection calls. Lease system off by default")
151+
flags.StringVar(&o.LeaseLabelSelector, "lease-label-selector", o.LeaseLabelSelector, "The label selector for KNP server leases. Lease system off by default")
152+
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace to which KNP server leases will be published. Lease system off by default")
142153
flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
143154
flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")
144155

@@ -293,6 +304,25 @@ func (o *ProxyRunOptions) Validate() error {
293304
}
294305
}
295306

307+
// Validate leasing parameters. All must be empty or have a value.
308+
if o.LeaseLabelSelector != "" || o.LeaseDuration != 0 || o.LeaseGCInterval != 0 || o.LeaseRenewalInterval != 0 || o.LeaseNamespace != "" {
309+
if o.LeaseLabelSelector == "" {
310+
return fmt.Errorf("LeaseLabelSelector cannot be empty when leasing system is enabled")
311+
}
312+
if o.LeaseNamespace == "" {
313+
return fmt.Errorf("LeaseNamespace cannot be empty when leasing system is enabled")
314+
}
315+
if o.LeaseDuration == 0 {
316+
return fmt.Errorf("LeaseDuration cannot be zero when leasing system is enabled")
317+
}
318+
if o.LeaseGCInterval == 0 {
319+
return fmt.Errorf("LeaseGCInterval cannot be zero when leasing system is enabled")
320+
}
321+
if o.LeaseRenewalInterval == 0 {
322+
return fmt.Errorf("LeaseRenewalInterval cannot be zero when leasing system is enabled")
323+
}
324+
}
325+
296326
// validate the proxy strategies
297327
if len(o.ProxyStrategies) == 0 {
298328
return fmt.Errorf("ProxyStrategies cannot be empty")
@@ -351,6 +381,11 @@ func NewProxyRunOptions() *ProxyRunOptions {
351381
ProxyStrategies: "default",
352382
CipherSuites: make([]string, 0),
353383
XfrChannelSize: 10,
384+
LeaseGCInterval: 0,
385+
LeaseDuration: 0,
386+
LeaseRenewalInterval: 0,
387+
LeaseLabelSelector: "",
388+
LeaseNamespace: "",
354389
}
355390
return &o
356391
}

cmd/server/app/server.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,14 @@ import (
3939
"google.golang.org/grpc"
4040
"google.golang.org/grpc/credentials"
4141
"google.golang.org/grpc/keepalive"
42+
"k8s.io/apimachinery/pkg/labels"
4243
"k8s.io/client-go/kubernetes"
4344
"k8s.io/client-go/tools/clientcmd"
4445
"k8s.io/klog/v2"
45-
4646
"sigs.k8s.io/apiserver-network-proxy/cmd/server/app/options"
4747
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
4848
"sigs.k8s.io/apiserver-network-proxy/pkg/server"
49+
"sigs.k8s.io/apiserver-network-proxy/pkg/server/leases"
4950
"sigs.k8s.io/apiserver-network-proxy/pkg/util"
5051
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
5152
)
@@ -142,6 +143,16 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
142143
defer frontendStop()
143144
}
144145

146+
if o.LeaseLabelSelector != "" || o.LeaseDuration != 0 || o.LeaseGCInterval != 0 || o.LeaseRenewalInterval != 0 {
147+
leaseLabels, err := labels.ConvertSelectorToLabelsMap(o.LeaseLabelSelector)
148+
if err != nil {
149+
return fmt.Errorf("could not parse lease label selector: %w", err)
150+
}
151+
leaseController := leases.NewController(k8sClient, o.ServerID, int32(o.LeaseDuration.Seconds()), o.LeaseRenewalInterval, o.LeaseGCInterval, fmt.Sprintf("konnectivity-proxy-server-%v", o.ServerID), o.LeaseNamespace, leaseLabels)
152+
klog.V(1).Infoln("Starting lease acquisition and garbage collection controller.")
153+
leaseController.Run(ctx)
154+
}
155+
145156
klog.V(1).Infoln("Starting agent server for tunnel connections.")
146157
err = p.runAgentServer(o, p.server)
147158
if err != nil {

e2e/lease_count_test.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,3 +188,140 @@ func TestLeaseCount(t *testing.T) {
188188
feature.Setup(lc.DeleteValidLease())
189189
feature.Assess("agents correctly count 3 leases (3 valid, 3 expired)", assertAgentKnownServerCount(3, adminPort))
190190
}
191+
192+
func TestSingleServer_SingleAgent_LeaseCount(t *testing.T) {
193+
adminPort := 8093
194+
195+
serverDeploymentConfig := DeploymentConfig{
196+
Replicas: 1,
197+
Image: *serverImage,
198+
Args: []KeyValue{
199+
{"log-file", "/var/log/konnectivity-server.log"},
200+
{"logtostderr", "true"},
201+
{"log-file-max-size", "0"},
202+
{"uds-name", "/etc/kubernetes/konnectivity-server/konnectivity-server.socket"},
203+
{Key: "delete-existing-uds-file"},
204+
{"cluster-cert", "/etc/kubernetes/pki/apiserver.crt"},
205+
{"cluster-key", "/etc/kubernetes/pki/apiserver.key"},
206+
{"server-port", "8090"},
207+
{"agent-port", "8091"},
208+
{"health-port", "8092"},
209+
{"admin-port", strconv.Itoa(adminPort)},
210+
{"keepalive-time", "1h"},
211+
{"mode", "grpc"},
212+
{"agent-namespace", "kube-system"},
213+
{"agent-service-account", "konnectivity-agent"},
214+
{"kubeconfig", "/etc/kubernetes/admin.conf"},
215+
{"authentication-audience", "system:konnectivity-server"},
216+
{"lease-gc-interval", "10s"},
217+
{"lease-renewal-interval", "5s"},
218+
{"lease-label-selector", "k8s-app=konnectivity-server"},
219+
{"lease-namespace", "kube-system"},
220+
},
221+
}
222+
serverDeployment, _, err := renderTemplate("server/deployment.yaml", serverDeploymentConfig)
223+
if err != nil {
224+
t.Fatalf("could not render server deployment: %v", err)
225+
}
226+
227+
agentDeploymentConfig := DeploymentConfig{
228+
Replicas: 1,
229+
Image: *agentImage,
230+
Args: []KeyValue{
231+
{"logtostderr", "true"},
232+
{"ca-cert", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"},
233+
{"proxy-server-host", "konnectivity-server.kube-system.svc.cluster.local"},
234+
{"proxy-server-port", "8091"},
235+
{"sync-interval", "1s"},
236+
{"sync-interval-cap", "10s"},
237+
{Key: "sync-forever"},
238+
{"probe-interval", "1s"},
239+
{"service-account-token-path", "/var/run/secrets/tokens/konnectivity-agent-token"},
240+
{"server-lease-selector", "k8s-app=konnectivity-server"},
241+
},
242+
}
243+
agentDeployment, _, err := renderTemplate("agent/deployment.yaml", agentDeploymentConfig)
244+
if err != nil {
245+
t.Fatalf("could not render agent deployment: %v", err)
246+
}
247+
248+
feature := features.New("konnectivity server and agent deployment with single replica for each")
249+
feature.Setup(deployAndWaitForDeployment(serverDeployment))
250+
feature.Setup(deployAndWaitForDeployment(agentDeployment))
251+
feature.Assess("konnectivity server has a connected client", assertServersAreConnected(1, adminPort))
252+
feature.Assess("konnectivity agent is connected to a server", assertAgentsAreConnected(1, adminPort))
253+
}
254+
255+
func TestMultiServer_MultiAgent_LeaseCount(t *testing.T) {
256+
adminPort := 8093
257+
initialAgentReplicas := 2
258+
initialServerReplicas := 2
259+
260+
serverDeploymentConfig := DeploymentConfig{
261+
Replicas: initialServerReplicas,
262+
Image: *serverImage,
263+
Args: []KeyValue{
264+
{"log-file", "/var/log/konnectivity-server.log"},
265+
{"logtostderr", "true"},
266+
{"log-file-max-size", "0"},
267+
{"uds-name", "/etc/kubernetes/konnectivity-server/konnectivity-server.socket"},
268+
{Key: "delete-existing-uds-file"},
269+
{"cluster-cert", "/etc/kubernetes/pki/apiserver.crt"},
270+
{"cluster-key", "/etc/kubernetes/pki/apiserver.key"},
271+
{"server-port", "8090"},
272+
{"agent-port", "8091"},
273+
{"health-port", "8092"},
274+
{"admin-port", strconv.Itoa(adminPort)},
275+
{"keepalive-time", "1h"},
276+
{"mode", *connectionMode},
277+
{"agent-namespace", "kube-system"},
278+
{"agent-service-account", "konnectivity-agent"},
279+
{"kubeconfig", "/etc/kubernetes/admin.conf"},
280+
{"authentication-audience", "system:konnectivity-server"},
281+
{"server-count", strconv.Itoa(initialServerReplicas)},
282+
{"lease-gc-interval", "10s"},
283+
{"lease-renewal-interval", "5s"},
284+
{"lease-label-selector", "k8s-app=konnectivity-server"},
285+
{"lease-namespace", "kube-system"},
286+
},
287+
}
288+
serverDeployment, _, err := renderTemplate("server/deployment.yaml", serverDeploymentConfig)
289+
if err != nil {
290+
t.Fatalf("could not render server deployment: %v", err)
291+
}
292+
293+
agentDeploymentConfig := DeploymentConfig{
294+
Replicas: initialAgentReplicas,
295+
Image: *agentImage,
296+
Args: []KeyValue{
297+
{"logtostderr", "true"},
298+
{"ca-cert", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"},
299+
{"proxy-server-host", "konnectivity-server.kube-system.svc.cluster.local"},
300+
{"proxy-server-port", "8091"},
301+
{"sync-interval", "1s"},
302+
{"sync-interval-cap", "10s"},
303+
{Key: "sync-forever"},
304+
{"probe-interval", "1s"},
305+
{"service-account-token-path", "/var/run/secrets/tokens/konnectivity-agent-token"},
306+
{"server-lease-selector", "k8s-app=konnectivity-server"},
307+
},
308+
}
309+
agentDeployment, _, err := renderTemplate("agent/deployment.yaml", agentDeploymentConfig)
310+
if err != nil {
311+
t.Fatalf("could not render agent deployment: %v", err)
312+
}
313+
314+
feature := features.New("konnectivity server and agent deployment with single replica for each")
315+
feature.Setup(deployAndWaitForDeployment(serverDeployment))
316+
feature.Setup(deployAndWaitForDeployment(agentDeployment))
317+
feature.Assess("all servers connected to all clients", assertServersAreConnected(initialAgentReplicas, adminPort))
318+
feature.Assess("all agents connected to all servers", assertAgentsAreConnected(initialServerReplicas, adminPort))
319+
feature.Setup(scaleDeployment(serverDeployment, 4))
320+
feature.Setup(scaleDeployment(agentDeployment, 4))
321+
feature.Assess("all servers connected to all clients after scale up", assertServersAreConnected(4, adminPort))
322+
feature.Assess("all agents connected to all servers after scale up", assertAgentsAreConnected(4, adminPort))
323+
feature.Setup(scaleDeployment(serverDeployment, 3))
324+
feature.Setup(scaleDeployment(agentDeployment, 3))
325+
feature.Assess("all servers connected to all clients after scale down", assertServersAreConnected(3, adminPort))
326+
feature.Assess("all agents connected to all servers after scale down", assertAgentsAreConnected(3, adminPort))
327+
}

e2e/main_test.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"text/template"
1313
"time"
1414

15+
appsv1api "k8s.io/api/apps/v1"
1516
"k8s.io/apimachinery/pkg/runtime/schema"
1617
"k8s.io/client-go/kubernetes/scheme"
1718
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -156,14 +157,43 @@ func renderAndApplyManifests(ctx context.Context, cfg *envconf.Config) (context.
156157
return ctx, nil
157158
}
158159

159-
func deployAndWaitForDeployment(deployment client.Object) func(context.Context, *testing.T, *envconf.Config) context.Context {
160+
func deployAndWaitForDeployment(obj client.Object) func(context.Context, *testing.T, *envconf.Config) context.Context {
160161
return func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
161162
client := cfg.Client()
162-
err := client.Resources().Create(ctx, deployment)
163+
err := client.Resources().Create(ctx, obj)
163164
if err != nil {
164165
t.Fatalf("could not create Deployment: %v", err)
165166
}
166167

168+
err = wait.For(
169+
conditions.New(client.Resources()).DeploymentAvailable(obj.GetName(), obj.GetNamespace()),
170+
wait.WithTimeout(1*time.Minute),
171+
wait.WithInterval(10*time.Second),
172+
)
173+
if err != nil {
174+
t.Fatalf("waiting for Deployment failed: %v", err)
175+
}
176+
177+
return ctx
178+
}
179+
}
180+
181+
func scaleDeployment(obj client.Object, replicas int) func(context.Context, *testing.T, *envconf.Config) context.Context {
182+
return func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
183+
deployment, ok := obj.(*appsv1api.Deployment)
184+
if !ok {
185+
t.Fatalf("provided object is not a deployment")
186+
}
187+
188+
newReplicas := int32(replicas)
189+
deployment.Spec.Replicas = &newReplicas
190+
191+
client := cfg.Client()
192+
err := client.Resources().Update(ctx, deployment)
193+
if err != nil {
194+
t.Fatalf("could not update Deployment replicas: %v", err)
195+
}
196+
167197
err = wait.For(
168198
conditions.New(client.Resources()).DeploymentAvailable(deployment.GetName(), deployment.GetNamespace()),
169199
wait.WithTimeout(1*time.Minute),

go.mod

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@ require (
1616
golang.org/x/net v0.26.0
1717
google.golang.org/grpc v1.64.0
1818
google.golang.org/protobuf v1.34.2
19-
k8s.io/api v0.30.2
20-
k8s.io/apimachinery v0.30.2
21-
k8s.io/client-go v0.30.2
19+
k8s.io/api v0.30.3
20+
k8s.io/apimachinery v0.30.3
21+
k8s.io/client-go v0.30.3
2222
k8s.io/component-base v0.30.2
23+
k8s.io/component-helpers v0.30.3
2324
k8s.io/klog/v2 v2.130.0
24-
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.29.0
25+
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.0
2526
sigs.k8s.io/controller-runtime v0.18.2
2627
sigs.k8s.io/e2e-framework v0.4.0
2728
)
@@ -41,7 +42,6 @@ require (
4142
github.com/gogo/protobuf v1.3.2 // indirect
4243
github.com/golang/protobuf v1.5.4 // indirect
4344
github.com/google/gnostic-models v0.6.8 // indirect
44-
github.com/google/go-cmp v0.6.0 // indirect
4545
github.com/google/gofuzz v1.2.0 // indirect
4646
github.com/gorilla/websocket v1.5.0 // indirect
4747
github.com/imdario/mergo v0.3.15 // indirect

go.sum

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,16 +180,18 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
180180
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
181181
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
182182
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
183-
k8s.io/api v0.30.2 h1:+ZhRj+28QT4UOH+BKznu4CBgPWgkXO7XAvMcMl0qKvI=
184-
k8s.io/api v0.30.2/go.mod h1:ULg5g9JvOev2dG0u2hig4Z7tQ2hHIuS+m8MNZ+X6EmI=
183+
k8s.io/api v0.30.3 h1:ImHwK9DCsPA9uoU3rVh4QHAHHK5dTSv1nxJUapx8hoQ=
184+
k8s.io/api v0.30.3/go.mod h1:GPc8jlzoe5JG3pb0KJCSLX5oAFIW3/qNJITlDj8BH04=
185185
k8s.io/apiextensions-apiserver v0.30.0 h1:jcZFKMqnICJfRxTgnC4E+Hpcq8UEhT8B2lhBcQ+6uAs=
186186
k8s.io/apiextensions-apiserver v0.30.0/go.mod h1:N9ogQFGcrbWqAY9p2mUAL5mGxsLqwgtUce127VtRX5Y=
187-
k8s.io/apimachinery v0.30.2 h1:fEMcnBj6qkzzPGSVsAZtQThU62SmQ4ZymlXRC5yFSCg=
188-
k8s.io/apimachinery v0.30.2/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc=
189-
k8s.io/client-go v0.30.2 h1:sBIVJdojUNPDU/jObC+18tXWcTJVcwyqS9diGdWHk50=
190-
k8s.io/client-go v0.30.2/go.mod h1:JglKSWULm9xlJLx4KCkfLLQ7XwtlbflV6uFFSHTMgVs=
187+
k8s.io/apimachinery v0.30.3 h1:q1laaWCmrszyQuSQCfNB8cFgCuDAoPszKY4ucAjDwHc=
188+
k8s.io/apimachinery v0.30.3/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc=
189+
k8s.io/client-go v0.30.3 h1:bHrJu3xQZNXIi8/MoxYtZBBWQQXwy16zqJwloXXfD3k=
190+
k8s.io/client-go v0.30.3/go.mod h1:8d4pf8vYu665/kUbsxWAQ/JDBNWqfFeZnvFiVdmx89U=
191191
k8s.io/component-base v0.30.2 h1:pqGBczYoW1sno8q9ObExUqrYSKhtE5rW3y6gX88GZII=
192192
k8s.io/component-base v0.30.2/go.mod h1:yQLkQDrkK8J6NtP+MGJOws+/PPeEXNpwFixsUI7h/OE=
193+
k8s.io/component-helpers v0.30.3 h1:KPc8l0eGx9Wg2OcKc58k9ozNcVcOInAi3NGiuS2xJ/c=
194+
k8s.io/component-helpers v0.30.3/go.mod h1:VOQ7g3q+YbKWwKeACG2BwPv4ftaN8jXYJ5U3xpzuYAE=
193195
k8s.io/klog/v2 v2.130.0 h1:5nB3+3HpqKqXJIXNtJdtxcDCfaa9KL8StJgMzGJkUkM=
194196
k8s.io/klog/v2 v2.130.0/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
195197
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag=

0 commit comments

Comments
 (0)