Skip to content

Commit 7e2a08a

Browse files
committed
Fix SSH connection leak
Adds Close() methods to NodeConfig and Windows interfaces to allow callers to ensure that SSH connections to the VM are closed.
1 parent 9b2b0d0 commit 7e2a08a

File tree

10 files changed

+86
-30
lines changed

10 files changed

+86
-30
lines changed

controllers/configmap_controller.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,12 @@ func (r *ConfigMapReconciler) ensureTrustedCABundleInNode(ctx context.Context, n
545545
if err != nil {
546546
return fmt.Errorf("failed to create new nodeconfig: %w", err)
547547
}
548+
defer func() {
549+
err := nc.Close()
550+
if err != nil {
551+
r.log.Info("WARNING: error closing nodeconfig", "error", err.Error())
552+
}
553+
}()
548554
return nc.SyncTrustedCABundle(ctx)
549555
}
550556

controllers/controllers.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ func (r *instanceReconciler) ensureInstanceIsUpToDate(ctx context.Context, insta
7777
if err != nil {
7878
return fmt.Errorf("failed to create new nodeconfig: %w", err)
7979
}
80+
defer r.nodeConfigCleanup(nc)
8081

8182
// Check if the instance was configured by a previous version of WMCO and must be deconfigured before being
8283
// configured again.
@@ -132,6 +133,7 @@ func (r *instanceReconciler) updateKubeletCA(ctx context.Context, node core.Node
132133
if err != nil {
133134
return fmt.Errorf("error creating nodeConfig for instance %s: %w", winInstance.Address, err)
134135
}
136+
defer r.nodeConfigCleanup(nodeConfig)
135137
r.log.Info("updating kubelet CA client certificates in", "node", node.Name)
136138
return nodeConfig.UpdateKubeletClientCA(contents)
137139
}
@@ -163,6 +165,7 @@ func (r *instanceReconciler) deconfigureInstance(ctx context.Context, node *core
163165
if err != nil {
164166
return fmt.Errorf("failed to create new nodeconfig: %w", err)
165167
}
168+
defer r.nodeConfigCleanup(nc)
166169

167170
if err = nc.Deconfigure(ctx); err != nil {
168171
return err
@@ -173,6 +176,13 @@ func (r *instanceReconciler) deconfigureInstance(ctx context.Context, node *core
173176
return nil
174177
}
175178

179+
func (r *instanceReconciler) nodeConfigCleanup(nc *nodeconfig.NodeConfig) {
180+
err := nc.Close()
181+
if err != nil {
182+
r.log.Info("WARNING: error closing nodeconfig", "error", err.Error())
183+
}
184+
}
185+
176186
// windowsNodeVersionChangePredicate returns a predicate whose filter catches Windows nodes that indicate a version
177187
// change either through deletion away from an old version or creation/update to the latest WMCO version
178188
func windowsNodeVersionChangePredicate() predicate.Funcs {

controllers/node_controller.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,12 @@ func (r *nodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (resul
109109
if err != nil {
110110
return ctrl.Result{}, fmt.Errorf("failed to create new nodeconfig: %w", err)
111111
}
112+
defer func() {
113+
err := nc.Close()
114+
if err != nil {
115+
r.log.Info("WARNING: error closing nodeconfig", "error", err.Error())
116+
}
117+
}()
112118

113119
if err := nc.SafeReboot(ctx); err != nil {
114120
return ctrl.Result{}, fmt.Errorf("full instance reboot failed: %w", err)

controllers/registry_controller.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ func (r *registryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r
115115
if err != nil {
116116
return ctrl.Result{}, fmt.Errorf("failed to create new nodeconfig: %w", err)
117117
}
118+
defer func() {
119+
err := nc.Close()
120+
if err != nil {
121+
r.log.Info("WARNING: error closing nodeconfig", "error", err.Error())
122+
}
123+
}()
118124

119125
r.log.Info("updating containerd config", "directory", windows.ContainerdConfigDir, "node", node.Name)
120126
// TODO: If this flakes for any one node, we have to loop over all nodes again and re-transfer the directory to

controllers/secret_controller.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,12 @@ func (r *SecretReconciler) reconcileTLSSecret(ctx context.Context) error {
255255
if err != nil {
256256
return fmt.Errorf("failed to create new nodeconfig: %w", err)
257257
}
258+
defer func() {
259+
err := nc.Close()
260+
if err != nil {
261+
r.log.Info("WARNING: error closing nodeconfig", "error", err.Error())
262+
}
263+
}()
258264
if err = nc.Windows.ReplaceDir(certFiles, windows.TLSCertsPath); err != nil {
259265
return fmt.Errorf("unable to transfer TLS certs: %w", err)
260266
}

pkg/csr/csr.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ func findHostName(instanceInfo *instance.Info, instanceSigner ssh.Signer) (strin
323323
if err != nil {
324324
return "", fmt.Errorf("error instantiating Windows instance: %w", err)
325325
}
326+
defer win.Close()
326327
// get the instance host name by running hostname command on remote VM
327328
return win.GetHostname()
328329
}

pkg/nodeconfig/init.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,20 @@ import (
1010
crclientcfg "sigs.k8s.io/controller-runtime/pkg/client/config"
1111
)
1212

13-
// cache holds the information of the nodeConfig that is invariant for multiple reconciliation cycles. We'll use this
13+
// cache holds the information of the NodeConfig that is invariant for multiple reconciliation cycles. We'll use this
1414
// information when we don't want to get the information from the global context coming from reconciler
15-
// but to have something at nodeConfig package locally which will be passed onto other structs. There is no need to
15+
// but to have something at NodeConfig package locally which will be passed onto other structs. There is no need to
1616
// invalidate this cache as of now, since if someone wants to change any of the fields, they've to restart the operator
1717
// which will invalidate the cache automatically.
1818
type cache struct {
1919
// apiServerEndpoint is the address which clients can interact with the API server through
2020
apiServerEndpoint string
2121
}
2222

23-
// cache has the information related to nodeConfig that should not be changed.
23+
// cache has the information related to NodeConfig that should not be changed.
2424
var nodeConfigCache = cache{}
2525

26-
// init populates the cache that we need for nodeConfig
26+
// init populates the cache that we need for NodeConfig
2727
func init() {
2828
var kubeAPIServerEndpoint string
2929
log := ctrl.Log.WithName("nodeconfig").WithName("init")

pkg/nodeconfig/nodeconfig.go

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ const (
6666
MccName = "machine-config-controller"
6767
)
6868

69-
// nodeConfig holds the information to make the given VM a kubernetes node. As of now, it holds the information
69+
// NodeConfig holds the information to make the given VM a kubernetes node. As of now, it holds the information
7070
// related to kubeclient and the windowsVM.
71-
type nodeConfig struct {
71+
type NodeConfig struct {
7272
client client.Client
7373
// k8sclientset holds the information related to kubernetes clientset
7474
k8sclientset *kubernetes.Clientset
@@ -113,11 +113,11 @@ func (ow OutWriter) Write(p []byte) (n int, err error) {
113113
return len(p), nil
114114
}
115115

116-
// NewNodeConfig creates a new instance of nodeConfig to be used by the caller.
116+
// NewNodeConfig creates a new instance of NodeConfig to be used by the caller.
117117
// hostName having a value will result in the VM's hostname being changed to the given value.
118118
func NewNodeConfig(c client.Client, clientset *kubernetes.Clientset, clusterServiceCIDR, wmcoNamespace string,
119119
instanceInfo *instance.Info, signer ssh.Signer, additionalLabels,
120-
additionalAnnotations map[string]string, platformType configv1.PlatformType) (*nodeConfig, error) {
120+
additionalAnnotations map[string]string, platformType configv1.PlatformType) (*NodeConfig, error) {
121121

122122
if err := cluster.ValidateCIDR(clusterServiceCIDR); err != nil {
123123
return nil, fmt.Errorf("error receiving valid CIDR value for "+
@@ -135,14 +135,14 @@ func NewNodeConfig(c client.Client, clientset *kubernetes.Clientset, clusterServ
135135
return nil, fmt.Errorf("error instantiating Windows instance from VM: %w", err)
136136
}
137137

138-
return &nodeConfig{client: c, k8sclientset: clientset, Windows: win, node: instanceInfo.Node,
138+
return &NodeConfig{client: c, k8sclientset: clientset, Windows: win, node: instanceInfo.Node,
139139
platformType: platformType, wmcoNamespace: wmcoNamespace, clusterServiceCIDR: clusterServiceCIDR,
140140
publicKeyHash: CreatePubKeyHashAnnotation(signer.PublicKey()), log: log, additionalLabels: additionalLabels,
141141
additionalAnnotations: additionalAnnotations}, nil
142142
}
143143

144144
// Configure configures the Windows VM to make it a Windows worker node
145-
func (nc *nodeConfig) Configure(ctx context.Context) error {
145+
func (nc *NodeConfig) Configure(ctx context.Context) error {
146146
drainHelper := nc.newDrainHelper(ctx)
147147
// If a Node object exists already, it implies that we are reconfiguring and we should cordon the node
148148
if nc.node != nil {
@@ -178,7 +178,7 @@ func (nc *nodeConfig) Configure(ctx context.Context) error {
178178
// Perform rest of the configuration with the kubelet running
179179
err = func() error {
180180
if nc.node == nil {
181-
// populate node object in nodeConfig in the case of a new Windows instance
181+
// populate node object in NodeConfig in the case of a new Windows instance
182182
if err := nc.setNode(ctx, false); err != nil {
183183
return fmt.Errorf("error setting node object: %w", err)
184184
}
@@ -215,7 +215,7 @@ func (nc *nodeConfig) Configure(ctx context.Context) error {
215215
nc.node.GetName(), err)
216216
}
217217

218-
// Now that the node has been fully configured, update the node object in nodeConfig once more
218+
// Now that the node has been fully configured, update the node object in NodeConfig once more
219219
if err := nc.setNode(ctx, false); err != nil {
220220
return fmt.Errorf("error getting node object: %w", err)
221221
}
@@ -246,7 +246,7 @@ func (nc *nodeConfig) Configure(ctx context.Context) error {
246246

247247
// safeReboot safely restarts the underlying instance, first cordoning and draining the associated node.
248248
// Waits for reboot to take effect before uncordoning the node.
249-
func (nc *nodeConfig) SafeReboot(ctx context.Context) error {
249+
func (nc *NodeConfig) SafeReboot(ctx context.Context) error {
250250
if nc.node == nil {
251251
return fmt.Errorf("safe reboot of the instance requires an associated node")
252252
}
@@ -275,7 +275,7 @@ func (nc *nodeConfig) SafeReboot(ctx context.Context) error {
275275

276276
// getWICDServiceAccountSecret returns the secret which holds the credentials for the WICD ServiceAccount, creating one
277277
// if necessary
278-
func (nc *nodeConfig) getWICDServiceAccountSecret(ctx context.Context) (*core.Secret, error) {
278+
func (nc *NodeConfig) getWICDServiceAccountSecret(ctx context.Context) (*core.Secret, error) {
279279
var tokenSecret core.Secret
280280
err := nc.client.Get(ctx,
281281
types.NamespacedName{Namespace: nc.wmcoNamespace, Name: windows.WicdServiceName}, &tokenSecret)
@@ -298,7 +298,7 @@ func (nc *nodeConfig) getWICDServiceAccountSecret(ctx context.Context) (*core.Se
298298

299299
// createWICDServiceAccountTokenSecret creates a secret with a long-lived API token for the WICD ServiceAccount and
300300
// waits for the secret data to be populated
301-
func (nc *nodeConfig) createWICDServiceAccountTokenSecret(ctx context.Context) (*core.Secret, error) {
301+
func (nc *NodeConfig) createWICDServiceAccountTokenSecret(ctx context.Context) (*core.Secret, error) {
302302
err := nc.client.Create(ctx, secrets.GenerateServiceAccountTokenSecret(nc.wmcoNamespace, windows.WicdServiceName))
303303
if err != nil {
304304
return nil, fmt.Errorf("error creating secret for WICD ServiceAccount: %w", err)
@@ -326,7 +326,7 @@ func (nc *nodeConfig) createWICDServiceAccountTokenSecret(ctx context.Context) (
326326
}
327327

328328
// createBootstrapFiles creates all prerequisite files on the node required to start kubelet using latest ignition spec
329-
func (nc *nodeConfig) createBootstrapFiles(ctx context.Context) error {
329+
func (nc *NodeConfig) createBootstrapFiles(ctx context.Context) error {
330330
filePathsToContents := make(map[string]string)
331331
filePathsToContents, err := nc.createFilesFromIgnition(ctx)
332332
if err != nil {
@@ -344,7 +344,7 @@ func (nc *nodeConfig) createBootstrapFiles(ctx context.Context) error {
344344
}
345345

346346
// write outputs the data to the path on the underlying Windows instance for each given pair. Creates files if needed.
347-
func (nc *nodeConfig) write(pathToData map[string]string) error {
347+
func (nc *NodeConfig) write(pathToData map[string]string) error {
348348
for path, data := range pathToData {
349349
dir, fileName := windows.SplitPath(path)
350350
if err := nc.Windows.EnsureFileContent([]byte(data), fileName, dir); err != nil {
@@ -355,7 +355,7 @@ func (nc *nodeConfig) write(pathToData map[string]string) error {
355355
}
356356

357357
// createRegistryConfigFiles creates all files on the node required for containerd to mirror images
358-
func (nc *nodeConfig) createRegistryConfigFiles(ctx context.Context) error {
358+
func (nc *NodeConfig) createRegistryConfigFiles(ctx context.Context) error {
359359
configFiles, err := registries.GenerateConfigFiles(ctx, nc.client)
360360
if err != nil {
361361
return err
@@ -365,7 +365,7 @@ func (nc *nodeConfig) createRegistryConfigFiles(ctx context.Context) error {
365365

366366
// createFilesFromIgnition returns the contents and write locations on the instance for any file it can create from
367367
// ignition spec: kubelet CA cert, cloud-config file
368-
func (nc *nodeConfig) createFilesFromIgnition(ctx context.Context) (map[string]string, error) {
368+
func (nc *NodeConfig) createFilesFromIgnition(ctx context.Context) (map[string]string, error) {
369369
ign, err := ignition.New(ctx, nc.client)
370370
if err != nil {
371371
return nil, err
@@ -392,7 +392,7 @@ func (nc *nodeConfig) createFilesFromIgnition(ctx context.Context) (map[string]s
392392
}
393393

394394
// generateBootstrapKubeconfig returns contents of a kubeconfig for kubelet to initially communicate with the API server
395-
func (nc *nodeConfig) generateBootstrapKubeconfig(ctx context.Context) (string, error) {
395+
func (nc *NodeConfig) generateBootstrapKubeconfig(ctx context.Context) (string, error) {
396396
bootstrapSecret, err := nc.k8sclientset.CoreV1().Secrets(mcoNamespace).Get(ctx, mcoBootstrapSecret,
397397
meta.GetOptions{})
398398
if err != nil {
@@ -402,7 +402,7 @@ func (nc *nodeConfig) generateBootstrapKubeconfig(ctx context.Context) (string,
402402
}
403403

404404
// generateWICDKubeconfig returns the contents of a kubeconfig created from the WICD ServiceAccount
405-
func (nc *nodeConfig) generateWICDKubeconfig(ctx context.Context) (string, error) {
405+
func (nc *NodeConfig) generateWICDKubeconfig(ctx context.Context) (string, error) {
406406
wicdSASecret, err := nc.getWICDServiceAccountSecret(ctx)
407407
if err != nil {
408408
return "", err
@@ -447,9 +447,9 @@ func createKubeletConf(clusterServiceCIDR string) (string, error) {
447447
}
448448

449449
// setNode finds the Node associated with the VM that has been configured, and sets the node field of the
450-
// nodeConfig object. If quickCheck is set, the function does a quicker check for the node which is useful in the node
450+
// NodeConfig object. If quickCheck is set, the function does a quicker check for the node which is useful in the node
451451
// reconfiguration case.
452-
func (nc *nodeConfig) setNode(ctx context.Context, quickCheck bool) error {
452+
func (nc *NodeConfig) setNode(ctx context.Context, quickCheck bool) error {
453453
retryInterval := retry.Interval
454454
retryTimeout := retry.Timeout
455455
if quickCheck {
@@ -482,7 +482,7 @@ func (nc *nodeConfig) setNode(ctx context.Context, quickCheck bool) error {
482482
}
483483

484484
// newDrainHelper returns new drain.Helper instance
485-
func (nc *nodeConfig) newDrainHelper(ctx context.Context) *drain.Helper {
485+
func (nc *NodeConfig) newDrainHelper(ctx context.Context) *drain.Helper {
486486
return &drain.Helper{
487487
Ctx: ctx,
488488
Client: nc.k8sclientset,
@@ -498,7 +498,7 @@ func (nc *nodeConfig) newDrainHelper(ctx context.Context) *drain.Helper {
498498
}
499499

500500
// Deconfigure removes the node from the cluster, reverting changes made by the Configure function
501-
func (nc *nodeConfig) Deconfigure(ctx context.Context) error {
501+
func (nc *NodeConfig) Deconfigure(ctx context.Context) error {
502502
if nc.node == nil {
503503
return fmt.Errorf("instance does not a have an associated node to deconfigure")
504504
}
@@ -525,7 +525,7 @@ func (nc *nodeConfig) Deconfigure(ctx context.Context) error {
525525
}
526526

527527
// cleanupWithWICD runs WICD cleanup and waits until the cleanup effects are fully complete
528-
func (nc *nodeConfig) cleanupWithWICD(ctx context.Context) error {
528+
func (nc *NodeConfig) cleanupWithWICD(ctx context.Context) error {
529529
wicdKC, err := nc.generateWICDKubeconfig(ctx)
530530
if err != nil {
531531
return err
@@ -540,7 +540,7 @@ func (nc *nodeConfig) cleanupWithWICD(ctx context.Context) error {
540540
// UpdateKubeletClientCA updates the kubelet client CA certificate file in the Windows node. No service restart or
541541
// reboot required, kubelet detects the changes in the file system and use the new CA certificate. The file is replaced
542542
// if and only if it does not exist or there is a checksum mismatch.
543-
func (nc *nodeConfig) UpdateKubeletClientCA(contents []byte) error {
543+
func (nc *NodeConfig) UpdateKubeletClientCA(contents []byte) error {
544544
// check CA bundle contents
545545
if len(contents) == 0 {
546546
// nothing do to, return
@@ -555,7 +555,7 @@ func (nc *nodeConfig) UpdateKubeletClientCA(contents []byte) error {
555555

556556
// SyncTrustedCABundle builds the trusted CA ConfigMap from image registry certificates and the proxy trust bundle
557557
// and ensures the cert bundle on the instance has up-to-date data
558-
func (nc *nodeConfig) SyncTrustedCABundle(ctx context.Context) error {
558+
func (nc *NodeConfig) SyncTrustedCABundle(ctx context.Context) error {
559559
caBundle := ""
560560
var cc mcfg.ControllerConfig
561561
if err := nc.client.Get(ctx, types.NamespacedName{Namespace: nc.wmcoNamespace,
@@ -580,13 +580,13 @@ func (nc *nodeConfig) SyncTrustedCABundle(ctx context.Context) error {
580580
}
581581

582582
// UpdateTrustedCABundleFile updates the file containing the trusted CA bundle in the Windows node, if needed
583-
func (nc *nodeConfig) UpdateTrustedCABundleFile(data string) error {
583+
func (nc *NodeConfig) UpdateTrustedCABundleFile(data string) error {
584584
dir, fileName := windows.SplitPath(windows.TrustedCABundlePath)
585585
return nc.Windows.EnsureFileContent([]byte(data), fileName, dir)
586586
}
587587

588588
// createTLSCerts creates cert files containing the TLS cert and the key on the Windows node
589-
func (nc *nodeConfig) createTLSCerts(ctx context.Context) error {
589+
func (nc *NodeConfig) createTLSCerts(ctx context.Context) error {
590590
tlsSecret := &core.Secret{}
591591
if err := nc.client.Get(ctx, types.NamespacedName{Name: secrets.TLSSecret,
592592
Namespace: nc.wmcoNamespace}, tlsSecret); err != nil {
@@ -602,6 +602,10 @@ func (nc *nodeConfig) createTLSCerts(ctx context.Context) error {
602602
return nc.Windows.ReplaceDir(certFiles, windows.TLSCertsPath)
603603
}
604604

605+
func (nc *NodeConfig) Close() error {
606+
return nc.Windows.Close()
607+
}
608+
605609
// generateKubeconfig creates a kubeconfig spec with the certificate and token data from the given secret
606610
func generateKubeconfig(caCert []byte, token, apiServerURL, username string) clientcmdv1.Config {
607611
kubeconfig := clientcmdv1.Config{

pkg/windows/connectivity.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"io"
88
"strings"
9+
"syscall"
910
"time"
1011

1112
"github.com/go-logr/logr"
@@ -44,6 +45,8 @@ type connectivity interface {
4445
transfer(*sftp.Client, io.Reader, string, string) error
4546
// transferFiles transfers the given files to a given remote directory
4647
transferFiles(*sftp.Client, map[string][]byte, string) error
48+
// close closes all network connections
49+
close() error
4750
}
4851

4952
// sshConnectivity encapsulates the information needed to connect to the Windows VM over ssh
@@ -189,3 +192,12 @@ func (c *sshConnectivity) transferFiles(sftpClient *sftp.Client, files map[strin
189192
}
190193
return nil
191194
}
195+
196+
func (c *sshConnectivity) close() error {
197+
err := c.sshClient.Close()
198+
// If the error is due to the underlying connection being already closed, don't return an error
199+
if errors.Is(syscall.EINVAL, err) {
200+
return nil
201+
}
202+
return err
203+
}

pkg/windows/windows.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ type Windows interface {
273273
// RunWICDCleanup ensures the WICD service is stopped and runs the cleanup command that ensures all WICD-managed
274274
// services are also stopped
275275
RunWICDCleanup(string, string) error
276+
Close() error
276277
}
277278

278279
// windows implements the Windows interface
@@ -293,6 +294,10 @@ type windows struct {
293294
filesToTransfer map[*payload.FileInfo]string
294295
}
295296

297+
func (vm *windows) Close() error {
298+
return vm.interact.close()
299+
}
300+
296301
// New returns a new Windows instance constructed from the given WindowsVM
297302
func New(clusterDNS string, instanceInfo *instance.Info, signer ssh.Signer, platform *config.PlatformType) (Windows, error) {
298303
log := ctrl.Log.WithName(fmt.Sprintf("wc %s", instanceInfo.Address))

0 commit comments

Comments
 (0)