
Commit 245e305

test/e2e/in-memory: improve performance by fixing locking issues
Signed-off-by: Stefan Büringer [email protected]
1 parent 3c9e154 commit 245e305

2 files changed: 98 additions, 64 deletions


test/infrastructure/inmemory/internal/server/certs.go

Lines changed: 10 additions & 4 deletions
@@ -26,13 +26,19 @@ import (
     "sigs.k8s.io/cluster-api/util/certs"
 )
 
-// newCertAndKey builds a new cert and key signed with the given CA and with the given config.
-func newCertAndKey(caCert *x509.Certificate, caKey *rsa.PrivateKey, config *certs.Config) (*x509.Certificate, *rsa.PrivateKey, error) {
-    key, err := certs.NewPrivateKey()
+var key *rsa.PrivateKey
+
+func init() {
+    // Create a private key only once, since this is a slow operation and it is ok
+    // to reuse it for all the certificates in a test provider.
+    var err error
+    key, err = certs.NewPrivateKey()
     if err != nil {
-        return nil, nil, errors.Wrap(err, "unable to create private key")
+        panic(errors.Wrap(err, "unable to create private key").Error())
     }
+}
 
+func newCertAndKey(caCert *x509.Certificate, caKey *rsa.PrivateKey, config *certs.Config) (*x509.Certificate, *rsa.PrivateKey, error) {
     cert, err := config.NewSignedCert(key, caCert, caKey)
     if err != nil {
         return nil, nil, errors.Wrap(err, "unable to create certificate")

test/infrastructure/inmemory/internal/server/mux.go

Lines changed: 88 additions & 60 deletions
@@ -32,7 +32,9 @@ import (
     "github.com/pkg/errors"
     "golang.org/x/net/http2"
     "golang.org/x/net/http2/h2c"
+    kerrors "k8s.io/apimachinery/pkg/util/errors"
     "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/klog/v2"
     "sigs.k8s.io/controller-runtime/pkg/log"
 
@@ -316,81 +318,107 @@ func (m *WorkloadClustersMux) initWorkloadClusterListenerWithPortLocked(wclName
 // When the first API server instance is added the serving certificates and the admin certificate
 // for tests are generated, and the listener is started.
 func (m *WorkloadClustersMux) AddAPIServer(wclName, podName string, caCert *x509.Certificate, caKey *rsa.PrivateKey) error {
-    m.lock.Lock()
-    defer m.lock.Unlock()
-
-    wcl, ok := m.workloadClusterListeners[wclName]
-    if !ok {
-        return errors.Errorf("workloadClusterListener with name %s must be initialized before adding an APIserver", wclName)
-    }
-    wcl.apiServers.Insert(podName)
-    m.log.Info("APIServer instance added to workloadClusterListener", "listenerName", wclName, "address", wcl.Address(), "podName", podName)
-
-    // TODO: check if cert/key are already set, they should match
-    wcl.apiServerCaCertificate = caCert
-    wcl.apiServerCaKey = caKey
+    // Start server
+    // Note: It is important that we unlock once the server is started. Because otherwise the server
+    // doesn't work yet as GetCertificate (which is required for the tls handshake) also requires the lock.
+    var startServerErr error
+    var wcl *WorkloadClusterListener
+    err := func() error {
+        m.lock.Lock()
+        defer m.lock.Unlock()
+
+        var ok bool
+        wcl, ok = m.workloadClusterListeners[wclName]
+        if !ok {
+            return errors.Errorf("workloadClusterListener with name %s must be initialized before adding an APIserver", wclName)
+        }
+        wcl.apiServers.Insert(podName)
+        m.log.Info("APIServer instance added to workloadClusterListener", "listenerName", wclName, "address", wcl.Address(), "podName", podName)
+
+        // TODO: check if cert/key are already set, they should match
+        wcl.apiServerCaCertificate = caCert
+        wcl.apiServerCaKey = caKey
+
+        // Generate Serving certificates for the API server instance
+        // NOTE: There is only one server certificate for all API server instances (kubeadm
+        // instead creates one for each API server pod). We don't need this because we are
+        // accessing all API servers via the same endpoint.
+        if wcl.apiServerServingCertificate == nil {
+            config := apiServerCertificateConfig(wcl.host)
+            cert, key, err := newCertAndKey(caCert, caKey, config)
+            if err != nil {
+                return errors.Wrapf(err, "failed to create serving certificate for API server %s", podName)
+            }
+
+            certificate, err := tls.X509KeyPair(certs.EncodeCertPEM(cert), certs.EncodePrivateKeyPEM(key))
+            if err != nil {
+                return errors.Wrapf(err, "failed to create X509KeyPair for API server %s", podName)
+            }
+            wcl.apiServerServingCertificate = &certificate
+        }
 
-    // Generate Serving certificates for the API server instance
-    // NOTE: There is only one server certificate for all API server instances (kubeadm
-    // instead creates one for each API server pod). We don't need this because we are
-    // accessing all API servers via the same endpoint.
-    if wcl.apiServerServingCertificate == nil {
-        config := apiServerCertificateConfig(wcl.host)
-        cert, key, err := newCertAndKey(caCert, caKey, config)
-        if err != nil {
-            return errors.Wrapf(err, "failed to create serving certificate for API server %s", podName)
+        // Generate admin certificates to be used for accessing the API server.
+        // NOTE: this is used for tests because CAPI creates its own.
+        if wcl.adminCertificate == nil {
+            config := adminClientCertificateConfig()
+            cert, key, err := newCertAndKey(caCert, caKey, config)
+            if err != nil {
+                return errors.Wrapf(err, "failed to create admin certificate for API server %s", podName)
+            }
+
+            wcl.adminCertificate = cert
+            wcl.adminKey = key
         }
 
-        certificate, err := tls.X509KeyPair(certs.EncodeCertPEM(cert), certs.EncodePrivateKeyPEM(key))
-        if err != nil {
-            return errors.Wrapf(err, "failed to create X509KeyPair for API server %s", podName)
+        // Start the listener for the API server.
+        // NOTE: There is only one listener for all API server instances; the same listener will act
+        // as a port forward target too.
+        if wcl.listener != nil {
+            return nil
         }
-        wcl.apiServerServingCertificate = &certificate
-    }
 
-    // Generate admin certificates to be used for accessing the API server.
-    // NOTE: this is used for tests because CAPI creates its own.
-    if wcl.adminCertificate == nil {
-        config := adminClientCertificateConfig()
-        cert, key, err := newCertAndKey(caCert, caKey, config)
+        l, err := net.Listen("tcp", wcl.HostPort())
         if err != nil {
-            return errors.Wrapf(err, "failed to create admin certificate for API server %s", podName)
+            return errors.Wrapf(err, "failed to start WorkloadClusterListener %s, %s", wclName, wcl.HostPort())
         }
+        wcl.listener = l
 
-        wcl.adminCertificate = cert
-        wcl.adminKey = key
-    }
-
-    // Start the listener for the API server.
-    // NOTE: There is only one listener for all API server instances; the same listener will act
-    // as a port forward target too.
-    if wcl.listener != nil {
+        go func() {
+            if startServerErr = m.muxServer.ServeTLS(wcl.listener, "", ""); startServerErr != nil && !errors.Is(startServerErr, http.ErrServerClosed) {
+                m.log.Error(startServerErr, "Failed to start WorkloadClusterListener", "listenerName", wclName, "address", wcl.Address())
+            }
+        }()
         return nil
-    }
-
-    l, err := net.Listen("tcp", wcl.HostPort())
+    }()
     if err != nil {
-        return errors.Wrapf(err, "failed to start WorkloadClusterListener %s, %s", wclName, wcl.HostPort())
+        return errors.Wrapf(err, "error starting server")
    }
-    wcl.listener = l
 
-    var startErr error
-    startCh := make(chan struct{})
-    go func() {
-        startCh <- struct{}{}
-        if err := m.muxServer.ServeTLS(wcl.listener, "", ""); err != nil && !errors.Is(err, http.ErrServerClosed) {
-            startErr = err
-            m.log.Error(startErr, "Failed to start WorkloadClusterListener", "listenerName", wclName, "address", wcl.Address())
+    // Wait until the sever is working.
+    var pollErr error
+    err = wait.PollUntilContextTimeout(context.TODO(), 10*time.Millisecond, 1*time.Second, true, func(ctx context.Context) (done bool, err error) {
+        d := &net.Dialer{Timeout: 50 * time.Millisecond}
+        conn, err := tls.DialWithDialer(d, "tcp", wcl.HostPort(), &tls.Config{
+            InsecureSkipVerify: true, //nolint:gosec // config is used to connect to our own port.
+        })
+        if err != nil {
+            pollErr = fmt.Errorf("server is not reachable: %w", err)
+            return false, nil
+        }
+
+        if err := conn.Close(); err != nil {
+            pollErr = fmt.Errorf("server is not reachable: closing connection: %w", err)
+            return false, nil
         }
-    }()
 
-    <-startCh
-    // TODO: Try to make this race condition free e.g. by checking the listener is answering.
-    // There is no guarantee ServeTLS was called after we received something on the startCh.
-    time.Sleep(100 * time.Millisecond)
+        return true, nil
+    })
+    if err != nil {
+        return kerrors.NewAggregate([]error{err, pollErr})
+    }
 
-    if startErr != nil {
-        return startErr
+    if startServerErr != nil {
+        return startServerErr
     }
 
     m.log.Info("WorkloadClusterListener successfully started", "listenerName", wclName, "address", wcl.Address())
