Skip to content

Commit c1e63f9

Browse files
committed
cabundleinjector: Watch bundle files for changes
Add a watcher to watch all relevant CA bundle files for changes so that they are automatically reloaded when modified. There is also a periodic resync implemented and set to 10 minutes. The current implementation does not update all affected ConfigMaps. This means that only the ConfigMaps created after the bundles are reloaded are affected and up-to-date with respect to the files present on the disk.
1 parent 77fa780 commit c1e63f9

File tree

8 files changed

+657
-41
lines changed

8 files changed

+657
-41
lines changed

pkg/controller/cabundleinjector/admissionwebhook.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cabundleinjector
33
import (
44
"bytes"
55
"context"
6+
"errors"
67

78
admissionregv1 "k8s.io/api/admissionregistration/v1"
89
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -90,10 +91,15 @@ type webhookCABundleInjector[T admissionregv1.MutatingWebhookConfiguration | adm
9091
client webhookConfigUpdater[T]
9192
lister cachedWebhookConfigGetter[T]
9293
newWebhookConfigAccessor func(*T) webhookConfigAccessor[T]
93-
caBundle []byte
94+
caBundle *bundleCache
9495
}
9596

9697
func (bi *webhookCABundleInjector[T]) Sync(ctx context.Context, syncCtx factory.SyncContext) error {
98+
caBundle := bi.caBundle.Load()
99+
if caBundle == nil {
100+
return errors.New("CA bundle is not available")
101+
}
102+
97103
webhookConfig, err := bi.lister.Get(syncCtx.QueueKey())
98104
if apierrors.IsNotFound(err) {
99105
return nil
@@ -105,7 +111,7 @@ func (bi *webhookCABundleInjector[T]) Sync(ctx context.Context, syncCtx factory.
105111
webhooksNeedingUpdate := []int{}
106112
for i := 0; i < webhookConfigAccessor.WebhooksLen(); i++ {
107113
webhookClientConfig := webhookConfigAccessor.GetWebhookClientCA(i)
108-
if !bytes.Equal(webhookClientConfig.CABundle, bi.caBundle) {
114+
if !bytes.Equal(webhookClientConfig.CABundle, caBundle) {
109115
webhooksNeedingUpdate = append(webhooksNeedingUpdate, i)
110116
}
111117
}
@@ -118,7 +124,7 @@ func (bi *webhookCABundleInjector[T]) Sync(ctx context.Context, syncCtx factory.
118124
// make a copy to avoid mutating cache state
119125
webhookConfigCopy := webhookConfigAccessor.DeepCopy()
120126
for _, i := range webhooksNeedingUpdate {
121-
webhookConfigCopy.GetWebhookClientCA(i).CABundle = bi.caBundle
127+
webhookConfigCopy.GetWebhookClientCA(i).CABundle = caBundle
122128
}
123129
_, err = bi.client.Update(ctx, webhookConfigCopy.GetObject(), metav1.UpdateOptions{})
124130
return err

pkg/controller/cabundleinjector/admissionwebhook_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,15 @@ func TestWebhookCABundleInjectorSync(t *testing.T) {
9292
waitSuccess := cache.WaitForCacheSync(testCtx.Done(), webhookInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer().HasSynced)
9393
require.True(t, waitSuccess)
9494

95+
cache := &bundleCache{}
96+
cache.Store(testCABundle)
97+
9598
injector := webhookCABundleInjector[admissionregv1.ValidatingWebhookConfiguration]{
9699
webhookConfigType: "testwebhook",
97100
newWebhookConfigAccessor: newValidatingWebhookAccessor,
98101
client: webhookClient.AdmissionregistrationV1().ValidatingWebhookConfigurations(),
99102
lister: webhookInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Lister(),
100-
caBundle: testCABundle,
103+
caBundle: cache,
101104
}
102105

103106
if gotErr := injector.Sync(testCtx, testContext{"test-webhook"}); (gotErr != nil) != tt.wantErr {

pkg/controller/cabundleinjector/apiservice.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cabundleinjector
33
import (
44
"bytes"
55
"context"
6+
"errors"
67

78
apierrors "k8s.io/apimachinery/pkg/api/errors"
89
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -19,7 +20,7 @@ import (
1920
type apiServiceCABundleInjector struct {
2021
client apiserviceclientv1.APIServiceInterface
2122
lister apiservicelister.APIServiceLister
22-
caBundle []byte
23+
caBundle *bundleCache
2324
}
2425

2526
func newAPIServiceInjectorConfig(config *caBundleInjectorConfig) controllerConfig {
@@ -48,22 +49,27 @@ func newAPIServiceInjectorConfig(config *caBundleInjectorConfig) controllerConfi
4849
}
4950

5051
func (bi *apiServiceCABundleInjector) Sync(ctx context.Context, syncCtx factory.SyncContext) error {
52+
caBundle := bi.caBundle.Load()
53+
if caBundle == nil {
54+
return errors.New("CA bundle is not available")
55+
}
56+
5157
apiService, err := bi.lister.Get(syncCtx.QueueKey())
5258
if apierrors.IsNotFound(err) {
5359
return nil
5460
} else if err != nil {
5561
return err
5662
}
5763

58-
if bytes.Equal(apiService.Spec.CABundle, bi.caBundle) {
64+
if bytes.Equal(apiService.Spec.CABundle, caBundle) {
5965
return nil
6066
}
6167

6268
klog.Infof("updating apiservice %s with the service signing CA bundle", apiService.Name)
6369

6470
// avoid mutating our cache
6571
apiServiceCopy := apiService.DeepCopy()
66-
apiServiceCopy.Spec.CABundle = bi.caBundle
72+
apiServiceCopy.Spec.CABundle = caBundle
6773
_, err = bi.client.Update(ctx, apiServiceCopy, v1.UpdateOptions{})
6874
return err
6975
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package cabundleinjector
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"sync/atomic"
8+
"time"
9+
10+
"k8s.io/klog/v2"
11+
12+
"github.com/openshift/library-go/pkg/controller/fileobserver"
13+
)
14+
15+
type bundleCache struct {
16+
v atomic.Pointer[[]byte]
17+
}
18+
19+
func (c *bundleCache) Load() []byte {
20+
if dataPtr := c.v.Load(); dataPtr != nil {
21+
return *dataPtr
22+
}
23+
return nil
24+
}
25+
26+
func (c *bundleCache) Store(data []byte) {
27+
if data == nil {
28+
c.v.Store(nil)
29+
} else {
30+
c.v.Store(&data)
31+
}
32+
}
33+
34+
type bundlesWatcher struct {
35+
// caBundle contains the signing CA bundle.
36+
CABundle *bundleCache
37+
// caBundleLegacy is constructed so that it matches what the old KCM used to do.
38+
// The signing CA bundle is appended to the service account ca.crt.
39+
CABundleLegacy *bundleCache
40+
41+
// OnReadFailed can be set for custom read error handling.
42+
// When the hook returns the original error, it is logged.
43+
// Logging happens automatically when no hook is set.
44+
OnReadFailed func(context.Context, error) error
45+
46+
caBundlePath string
47+
saTokenCABundlePath string
48+
pollingInterval time.Duration
49+
resyncInterval time.Duration
50+
maxReadAttempts int
51+
52+
// readFile is used to read files from disk.
53+
// Used for mocking during tests mostly.
54+
readFile func(string) ([]byte, error)
55+
}
56+
57+
func newBundlesWatcher(
58+
caBundlePath string, saTokenCABundlePath string,
59+
pollingInterval time.Duration,
60+
resyncInterval time.Duration,
61+
maxReadAttempts int,
62+
) *bundlesWatcher {
63+
return &bundlesWatcher{
64+
CABundle: &bundleCache{},
65+
CABundleLegacy: &bundleCache{},
66+
caBundlePath: caBundlePath,
67+
saTokenCABundlePath: saTokenCABundlePath,
68+
pollingInterval: pollingInterval,
69+
resyncInterval: resyncInterval,
70+
maxReadAttempts: maxReadAttempts,
71+
readFile: os.ReadFile,
72+
}
73+
}
74+
75+
func (w *bundlesWatcher) Start(ctx context.Context) error {
76+
// Read the files initially. This is how the previous implementation worked, so we retain that.
77+
caBundleData, saTokenCAData, legacyCABundleData, err := w.readBundles()
78+
if err != nil {
79+
return fmt.Errorf("failed to read bundles on startup: %w", err)
80+
}
81+
82+
w.CABundle.Store(caBundleData)
83+
w.CABundleLegacy.Store(legacyCABundleData)
84+
85+
// Start watching.
86+
observer, err := fileobserver.NewObserver(w.pollingInterval)
87+
if err != nil {
88+
return fmt.Errorf("failed to start CA bundles observer: %w", err)
89+
}
90+
91+
observer.AddReactor(func(_ string, _ fileobserver.ActionType) error {
92+
return w.reloadBundles(ctx)
93+
}, map[string][]byte{
94+
w.caBundlePath: caBundleData,
95+
w.saTokenCABundlePath: saTokenCAData,
96+
}, w.caBundlePath, w.saTokenCABundlePath)
97+
98+
go observer.Run(ctx.Done())
99+
100+
// Periodically re-read the bundles as a safety net
101+
// in case the file observer reactor callback fails.
102+
if w.resyncInterval > 0 {
103+
go func() {
104+
ticker := time.NewTicker(w.resyncInterval)
105+
defer ticker.Stop()
106+
for {
107+
select {
108+
case <-ctx.Done():
109+
return
110+
case <-ticker.C:
111+
if err := w.reloadBundles(ctx); err != nil {
112+
klog.Errorf("Failed to resync CA bundles: %v", err)
113+
}
114+
}
115+
}
116+
}()
117+
}
118+
119+
return nil
120+
}
121+
122+
func (w *bundlesWatcher) reloadBundles(ctx context.Context) error {
123+
caBundleData, _, legacyCABundleData, err := w.readBundles()
124+
if err != nil {
125+
// On read errors, keep existing cached content
126+
// until the next successful read.
127+
if w.OnReadFailed != nil {
128+
err = w.OnReadFailed(ctx, err)
129+
}
130+
return err
131+
}
132+
133+
w.CABundle.Store(caBundleData)
134+
w.CABundleLegacy.Store(legacyCABundleData)
135+
return nil
136+
}
137+
138+
func (w *bundlesWatcher) readBundles() ([]byte, []byte, []byte, error) {
139+
caBundleContent, err := w.readFileWithRetries(w.caBundlePath)
140+
if err != nil {
141+
if os.IsNotExist(err) {
142+
return nil, nil, nil, nil
143+
}
144+
return nil, nil, nil, fmt.Errorf("failed to read %q: %w", w.caBundlePath, err)
145+
}
146+
147+
// This construction matches what the old KCM used to do.
148+
// It added the entire ca.crt to the service account ca.crt.
149+
vulnerableLegacyCABundleContent := append([]byte{}, caBundleContent...)
150+
saTokenCABundleContent, err := w.readFileWithRetries(w.saTokenCABundlePath)
151+
if err != nil && !os.IsNotExist(err) {
152+
return nil, nil, nil, fmt.Errorf("failed to read %q: %w", w.saTokenCABundlePath, err)
153+
}
154+
if len(saTokenCABundleContent) > 0 {
155+
vulnerableLegacyCABundleContent = append(vulnerableLegacyCABundleContent, saTokenCABundleContent...)
156+
vulnerableLegacyCABundleContent = append(vulnerableLegacyCABundleContent, []byte("\n")...)
157+
}
158+
159+
return caBundleContent, saTokenCABundleContent, vulnerableLegacyCABundleContent, nil
160+
}
161+
162+
func (w *bundlesWatcher) readFileWithRetries(path string) ([]byte, error) {
163+
numAttempts := w.maxReadAttempts
164+
if numAttempts <= 0 {
165+
numAttempts = 1
166+
}
167+
168+
var lastErr error
169+
for range numAttempts {
170+
content, err := w.readFile(path)
171+
if err == nil {
172+
return content, nil
173+
}
174+
lastErr = err
175+
}
176+
return nil, lastErr
177+
}

0 commit comments

Comments
 (0)