Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions pkg/controller/cabundleinjector/admissionwebhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cabundleinjector
import (
"bytes"
"context"
"errors"

admissionregv1 "k8s.io/api/admissionregistration/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -90,10 +91,15 @@ type webhookCABundleInjector[T admissionregv1.MutatingWebhookConfiguration | adm
client webhookConfigUpdater[T]
lister cachedWebhookConfigGetter[T]
newWebhookConfigAccessor func(*T) webhookConfigAccessor[T]
caBundle []byte
caBundle *bundleCache
}

func (bi *webhookCABundleInjector[T]) Sync(ctx context.Context, syncCtx factory.SyncContext) error {
caBundle := bi.caBundle.Load()
if caBundle == nil {
return errors.New("CA bundle is not available")
}

webhookConfig, err := bi.lister.Get(syncCtx.QueueKey())
if apierrors.IsNotFound(err) {
return nil
Expand All @@ -105,7 +111,7 @@ func (bi *webhookCABundleInjector[T]) Sync(ctx context.Context, syncCtx factory.
webhooksNeedingUpdate := []int{}
for i := 0; i < webhookConfigAccessor.WebhooksLen(); i++ {
webhookClientConfig := webhookConfigAccessor.GetWebhookClientCA(i)
if !bytes.Equal(webhookClientConfig.CABundle, bi.caBundle) {
if !bytes.Equal(webhookClientConfig.CABundle, caBundle) {
webhooksNeedingUpdate = append(webhooksNeedingUpdate, i)
}
}
Expand All @@ -118,7 +124,7 @@ func (bi *webhookCABundleInjector[T]) Sync(ctx context.Context, syncCtx factory.
// make a copy to avoid mutating cache state
webhookConfigCopy := webhookConfigAccessor.DeepCopy()
for _, i := range webhooksNeedingUpdate {
webhookConfigCopy.GetWebhookClientCA(i).CABundle = bi.caBundle
webhookConfigCopy.GetWebhookClientCA(i).CABundle = caBundle
}
_, err = bi.client.Update(ctx, webhookConfigCopy.GetObject(), metav1.UpdateOptions{})
return err
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/cabundleinjector/admissionwebhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,15 @@ func TestWebhookCABundleInjectorSync(t *testing.T) {
waitSuccess := cache.WaitForCacheSync(testCtx.Done(), webhookInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer().HasSynced)
require.True(t, waitSuccess)

cache := &bundleCache{}
cache.Store(testCABundle)

injector := webhookCABundleInjector[admissionregv1.ValidatingWebhookConfiguration]{
webhookConfigType: "testwebhook",
newWebhookConfigAccessor: newValidatingWebhookAccessor,
client: webhookClient.AdmissionregistrationV1().ValidatingWebhookConfigurations(),
lister: webhookInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Lister(),
caBundle: testCABundle,
caBundle: cache,
}

if gotErr := injector.Sync(testCtx, testContext{"test-webhook"}); (gotErr != nil) != tt.wantErr {
Expand Down
12 changes: 9 additions & 3 deletions pkg/controller/cabundleinjector/apiservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cabundleinjector
import (
"bytes"
"context"
"errors"

apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -19,7 +20,7 @@ import (
type apiServiceCABundleInjector struct {
client apiserviceclientv1.APIServiceInterface
lister apiservicelister.APIServiceLister
caBundle []byte
caBundle *bundleCache
}

func newAPIServiceInjectorConfig(config *caBundleInjectorConfig) controllerConfig {
Expand Down Expand Up @@ -48,22 +49,27 @@ func newAPIServiceInjectorConfig(config *caBundleInjectorConfig) controllerConfi
}

func (bi *apiServiceCABundleInjector) Sync(ctx context.Context, syncCtx factory.SyncContext) error {
caBundle := bi.caBundle.Load()
if caBundle == nil {
return errors.New("CA bundle is not available")
}

apiService, err := bi.lister.Get(syncCtx.QueueKey())
if apierrors.IsNotFound(err) {
return nil
} else if err != nil {
return err
}

if bytes.Equal(apiService.Spec.CABundle, bi.caBundle) {
if bytes.Equal(apiService.Spec.CABundle, caBundle) {
return nil
}

klog.Infof("updating apiservice %s with the service signing CA bundle", apiService.Name)

// avoid mutating our cache
apiServiceCopy := apiService.DeepCopy()
apiServiceCopy.Spec.CABundle = bi.caBundle
apiServiceCopy.Spec.CABundle = caBundle
_, err = bi.client.Update(ctx, apiServiceCopy, v1.UpdateOptions{})
return err
}
177 changes: 177 additions & 0 deletions pkg/controller/cabundleinjector/bundles_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package cabundleinjector

import (
"context"
"fmt"
"os"
"sync/atomic"
"time"

"k8s.io/klog/v2"

"github.com/openshift/library-go/pkg/controller/fileobserver"
)

type bundleCache struct {
v atomic.Pointer[[]byte]
}

func (c *bundleCache) Load() []byte {
if dataPtr := c.v.Load(); dataPtr != nil {
return *dataPtr
}
return nil
}

func (c *bundleCache) Store(data []byte) {
if data == nil {
c.v.Store(nil)
} else {
c.v.Store(&data)
}
}

type bundlesWatcher struct {
// caBundle contains the signing CA bundle.
CABundle *bundleCache
// caBundleLegacy is constructed so that it matches what the old KCM used to do.
// The signing CA bundle is appended to the service account ca.crt.
CABundleLegacy *bundleCache

// OnReadFailed can be set for custom read error handling.
// When the hook returns the original error, it is logged.
// Logging happens automatically when no hook is set.
OnReadFailed func(context.Context, error) error

caBundlePath string
saTokenCABundlePath string
pollingInterval time.Duration
resyncInterval time.Duration
maxReadAttempts int

// readFile is used to read files from disk.
// Used for mocking during tests mostly.
readFile func(string) ([]byte, error)
}

func newBundlesWatcher(
caBundlePath string, saTokenCABundlePath string,
pollingInterval time.Duration,
resyncInterval time.Duration,
maxReadAttempts int,
) *bundlesWatcher {
return &bundlesWatcher{
CABundle: &bundleCache{},
CABundleLegacy: &bundleCache{},
caBundlePath: caBundlePath,
saTokenCABundlePath: saTokenCABundlePath,
pollingInterval: pollingInterval,
resyncInterval: resyncInterval,
maxReadAttempts: maxReadAttempts,
readFile: os.ReadFile,
}
}

func (w *bundlesWatcher) Start(ctx context.Context) error {
// Read the files initially. This is how the previous implementation worked, so we retain that.
caBundleData, saTokenCAData, legacyCABundleData, err := w.readBundles()
if err != nil {
return fmt.Errorf("failed to read bundles on startup: %w", err)
}

w.CABundle.Store(caBundleData)
w.CABundleLegacy.Store(legacyCABundleData)

// Start watching.
observer, err := fileobserver.NewObserver(w.pollingInterval)
if err != nil {
return fmt.Errorf("failed to start CA bundles observer: %w", err)
}

observer.AddReactor(func(_ string, _ fileobserver.ActionType) error {
return w.reloadBundles(ctx)
}, map[string][]byte{
w.caBundlePath: caBundleData,
w.saTokenCABundlePath: saTokenCAData,
}, w.caBundlePath, w.saTokenCABundlePath)

go observer.Run(ctx.Done())

// Periodically re-read the bundles as a safety net
// in case the file observer reactor callback fails.
if w.resyncInterval > 0 {
go func() {
ticker := time.NewTicker(w.resyncInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := w.reloadBundles(ctx); err != nil {
klog.Errorf("Failed to resync CA bundles: %v", err)
}
}
}
}()
}

return nil
}

func (w *bundlesWatcher) reloadBundles(ctx context.Context) error {
caBundleData, _, legacyCABundleData, err := w.readBundles()
if err != nil {
// On read errors, keep existing cached content
// until the next successful read.
if w.OnReadFailed != nil {
err = w.OnReadFailed(ctx, err)
}
return err
}

w.CABundle.Store(caBundleData)
w.CABundleLegacy.Store(legacyCABundleData)
return nil
}

func (w *bundlesWatcher) readBundles() ([]byte, []byte, []byte, error) {
caBundleContent, err := w.readFileWithRetries(w.caBundlePath)
if err != nil {
if os.IsNotExist(err) {
return nil, nil, nil, nil
}
return nil, nil, nil, fmt.Errorf("failed to read %q: %w", w.caBundlePath, err)
}

// This construction matches what the old KCM used to do.
// It added the entire ca.crt to the service account ca.crt.
vulnerableLegacyCABundleContent := append([]byte{}, caBundleContent...)
saTokenCABundleContent, err := w.readFileWithRetries(w.saTokenCABundlePath)
if err != nil && !os.IsNotExist(err) {
return nil, nil, nil, fmt.Errorf("failed to read %q: %w", w.saTokenCABundlePath, err)
}
if len(saTokenCABundleContent) > 0 {
vulnerableLegacyCABundleContent = append(vulnerableLegacyCABundleContent, saTokenCABundleContent...)
vulnerableLegacyCABundleContent = append(vulnerableLegacyCABundleContent, []byte("\n")...)
}

return caBundleContent, saTokenCABundleContent, vulnerableLegacyCABundleContent, nil
}

func (w *bundlesWatcher) readFileWithRetries(path string) ([]byte, error) {
numAttempts := w.maxReadAttempts
if numAttempts <= 0 {
numAttempts = 1
}

var lastErr error
for range numAttempts {
content, err := w.readFile(path)
if err == nil {
return content, nil
}
lastErr = err
}
return nil, lastErr
}
Loading