Skip to content

Commit 778254d

Browse files
committed
cabundleinjector: Watch bundle files for changes
Add a watcher to watch all relevant files for changes so that they are automatically reloaded when modified.
1 parent 77fa780 commit 778254d

File tree

8 files changed

+657
-42
lines changed

8 files changed

+657
-42
lines changed

pkg/controller/cabundleinjector/admissionwebhook.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cabundleinjector
33
import (
44
"bytes"
55
"context"
6+
"errors"
67

78
admissionregv1 "k8s.io/api/admissionregistration/v1"
89
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -90,10 +91,15 @@ type webhookCABundleInjector[T admissionregv1.MutatingWebhookConfiguration | adm
9091
client webhookConfigUpdater[T]
9192
lister cachedWebhookConfigGetter[T]
9293
newWebhookConfigAccessor func(*T) webhookConfigAccessor[T]
93-
caBundle []byte
94+
caBundle *bundleCache
9495
}
9596

9697
func (bi *webhookCABundleInjector[T]) Sync(ctx context.Context, syncCtx factory.SyncContext) error {
98+
caBundle := bi.caBundle.Load()
99+
if caBundle == nil {
100+
return errors.New("CA bundle is not available")
101+
}
102+
97103
webhookConfig, err := bi.lister.Get(syncCtx.QueueKey())
98104
if apierrors.IsNotFound(err) {
99105
return nil
@@ -105,7 +111,7 @@ func (bi *webhookCABundleInjector[T]) Sync(ctx context.Context, syncCtx factory.
105111
webhooksNeedingUpdate := []int{}
106112
for i := 0; i < webhookConfigAccessor.WebhooksLen(); i++ {
107113
webhookClientConfig := webhookConfigAccessor.GetWebhookClientCA(i)
108-
if !bytes.Equal(webhookClientConfig.CABundle, bi.caBundle) {
114+
if !bytes.Equal(webhookClientConfig.CABundle, caBundle) {
109115
webhooksNeedingUpdate = append(webhooksNeedingUpdate, i)
110116
}
111117
}
@@ -118,7 +124,7 @@ func (bi *webhookCABundleInjector[T]) Sync(ctx context.Context, syncCtx factory.
118124
// make a copy to avoid mutating cache state
119125
webhookConfigCopy := webhookConfigAccessor.DeepCopy()
120126
for _, i := range webhooksNeedingUpdate {
121-
webhookConfigCopy.GetWebhookClientCA(i).CABundle = bi.caBundle
127+
webhookConfigCopy.GetWebhookClientCA(i).CABundle = caBundle
122128
}
123129
_, err = bi.client.Update(ctx, webhookConfigCopy.GetObject(), metav1.UpdateOptions{})
124130
return err

pkg/controller/cabundleinjector/admissionwebhook_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,15 @@ func TestWebhookCABundleInjectorSync(t *testing.T) {
9292
waitSuccess := cache.WaitForCacheSync(testCtx.Done(), webhookInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer().HasSynced)
9393
require.True(t, waitSuccess)
9494

95+
cache := &bundleCache{}
96+
cache.Store(testCABundle)
97+
9598
injector := webhookCABundleInjector[admissionregv1.ValidatingWebhookConfiguration]{
9699
webhookConfigType: "testwebhook",
97100
newWebhookConfigAccessor: newValidatingWebhookAccessor,
98101
client: webhookClient.AdmissionregistrationV1().ValidatingWebhookConfigurations(),
99102
lister: webhookInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Lister(),
100-
caBundle: testCABundle,
103+
caBundle: cache,
101104
}
102105

103106
if gotErr := injector.Sync(testCtx, testContext{"test-webhook"}); (gotErr != nil) != tt.wantErr {

pkg/controller/cabundleinjector/apiservice.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package cabundleinjector
33
import (
44
"bytes"
55
"context"
6-
6+
"errors"
77
apierrors "k8s.io/apimachinery/pkg/api/errors"
88
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
99
"k8s.io/klog/v2"
@@ -19,7 +19,7 @@ import (
1919
type apiServiceCABundleInjector struct {
2020
client apiserviceclientv1.APIServiceInterface
2121
lister apiservicelister.APIServiceLister
22-
caBundle []byte
22+
caBundle *bundleCache
2323
}
2424

2525
func newAPIServiceInjectorConfig(config *caBundleInjectorConfig) controllerConfig {
@@ -48,22 +48,27 @@ func newAPIServiceInjectorConfig(config *caBundleInjectorConfig) controllerConfi
4848
}
4949

5050
func (bi *apiServiceCABundleInjector) Sync(ctx context.Context, syncCtx factory.SyncContext) error {
51+
caBundle := bi.caBundle.Load()
52+
if caBundle == nil {
53+
return errors.New("CA bundle is not available")
54+
}
55+
5156
apiService, err := bi.lister.Get(syncCtx.QueueKey())
5257
if apierrors.IsNotFound(err) {
5358
return nil
5459
} else if err != nil {
5560
return err
5661
}
5762

58-
if bytes.Equal(apiService.Spec.CABundle, bi.caBundle) {
63+
if bytes.Equal(apiService.Spec.CABundle, caBundle) {
5964
return nil
6065
}
6166

6267
klog.Infof("updating apiservice %s with the service signing CA bundle", apiService.Name)
6368

6469
// avoid mutating our cache
6570
apiServiceCopy := apiService.DeepCopy()
66-
apiServiceCopy.Spec.CABundle = bi.caBundle
71+
apiServiceCopy.Spec.CABundle = caBundle
6772
_, err = bi.client.Update(ctx, apiServiceCopy, v1.UpdateOptions{})
6873
return err
6974
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package cabundleinjector
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"sync/atomic"
8+
"time"
9+
10+
"k8s.io/klog/v2"
11+
12+
"github.com/openshift/library-go/pkg/controller/fileobserver"
13+
)
14+
15+
type bundleCache struct {
16+
v atomic.Pointer[[]byte]
17+
}
18+
19+
func (c *bundleCache) Load() []byte {
20+
if dataPtr := c.v.Load(); dataPtr != nil {
21+
return *dataPtr
22+
}
23+
return nil
24+
}
25+
26+
func (c *bundleCache) Store(data []byte) {
27+
if data == nil {
28+
c.v.Store(nil)
29+
} else {
30+
c.v.Store(&data)
31+
}
32+
}
33+
34+
type bundlesWatcher struct {
35+
// caBundle contains the signing CA bundle.
36+
CABundle *bundleCache
37+
// caBundleLegacy is constructed so that it matches what the old KCM used to do.
38+
// The signing CA bundle is appended to the service account ca.crt.
39+
CABundleLegacy *bundleCache
40+
41+
// OnReadFailed can be set for custom read error handling.
42+
// When the hook returns the original error, it is logged.
43+
// Logging happens automatically when no hook is set.
44+
OnReadFailed func(context.Context, error) error
45+
46+
caBundlePath string
47+
saTokenCABundlePath string
48+
pollingInterval time.Duration
49+
resyncInterval time.Duration
50+
maxReadAttempts int
51+
52+
// readFile is used to read files from disk.
53+
// Used for mocking during tests mostly.
54+
readFile func(string) ([]byte, error)
55+
}
56+
57+
func newBundlesWatcher(
58+
caBundlePath string, saTokenCABundlePath string,
59+
pollingInterval time.Duration,
60+
resyncInterval time.Duration,
61+
maxReadAttempts int,
62+
) *bundlesWatcher {
63+
return &bundlesWatcher{
64+
CABundle: &bundleCache{},
65+
CABundleLegacy: &bundleCache{},
66+
caBundlePath: caBundlePath,
67+
saTokenCABundlePath: saTokenCABundlePath,
68+
pollingInterval: pollingInterval,
69+
resyncInterval: resyncInterval,
70+
maxReadAttempts: maxReadAttempts,
71+
readFile: os.ReadFile,
72+
}
73+
}
74+
75+
func (w *bundlesWatcher) Start(ctx context.Context) error {
76+
// Read the files initially. This is how the previous implementation worked, so we retain that.
77+
caBundleData, saTokenCAData, legacyCABundleData, err := w.readBundles()
78+
if err != nil {
79+
return fmt.Errorf("failed to read bundles on startup: %w", err)
80+
}
81+
82+
w.CABundle.Store(caBundleData)
83+
w.CABundleLegacy.Store(legacyCABundleData)
84+
85+
// Start watching.
86+
observer, err := fileobserver.NewObserver(w.pollingInterval)
87+
if err != nil {
88+
return fmt.Errorf("failed to start CA bundles observer: %w", err)
89+
}
90+
91+
observer.AddReactor(func(_ string, _ fileobserver.ActionType) error {
92+
return w.reloadBundles(ctx)
93+
}, map[string][]byte{
94+
w.caBundlePath: caBundleData,
95+
w.saTokenCABundlePath: saTokenCAData,
96+
}, w.caBundlePath, w.saTokenCABundlePath)
97+
98+
go observer.Run(ctx.Done())
99+
100+
// Periodically re-read the bundles as a safety net
101+
// in case the file observer reactor callback fails.
102+
if w.resyncInterval > 0 {
103+
go func() {
104+
ticker := time.NewTicker(w.resyncInterval)
105+
defer ticker.Stop()
106+
for {
107+
select {
108+
case <-ctx.Done():
109+
return
110+
case <-ticker.C:
111+
if err := w.reloadBundles(ctx); err != nil {
112+
klog.Errorf("Failed to resync CA bundles: %v", err)
113+
}
114+
}
115+
}
116+
}()
117+
}
118+
119+
return nil
120+
}
121+
122+
func (w *bundlesWatcher) reloadBundles(ctx context.Context) error {
123+
caBundleData, _, legacyCABundleData, err := w.readBundles()
124+
if err != nil {
125+
// On read errors, keep existing cached content
126+
// until the next successful read.
127+
if w.OnReadFailed != nil {
128+
err = w.OnReadFailed(ctx, err)
129+
}
130+
return err
131+
}
132+
133+
w.CABundle.Store(caBundleData)
134+
w.CABundleLegacy.Store(legacyCABundleData)
135+
return nil
136+
}
137+
138+
func (w *bundlesWatcher) readBundles() ([]byte, []byte, []byte, error) {
139+
caBundleContent, err := w.readFileWithRetries(w.caBundlePath)
140+
if err != nil {
141+
if os.IsNotExist(err) {
142+
return nil, nil, nil, nil
143+
}
144+
return nil, nil, nil, fmt.Errorf("failed to read %q: %w", w.caBundlePath, err)
145+
}
146+
147+
// This construction matches what the old KCM used to do.
148+
// It added the entire ca.crt to the service account ca.crt.
149+
vulnerableLegacyCABundleContent := append([]byte{}, caBundleContent...)
150+
saTokenCABundleContent, err := w.readFileWithRetries(w.saTokenCABundlePath)
151+
if err != nil && !os.IsNotExist(err) {
152+
return nil, nil, nil, fmt.Errorf("failed to read %q: %w", w.saTokenCABundlePath, err)
153+
}
154+
if len(saTokenCABundleContent) > 0 {
155+
vulnerableLegacyCABundleContent = append(vulnerableLegacyCABundleContent, saTokenCABundleContent...)
156+
vulnerableLegacyCABundleContent = append(vulnerableLegacyCABundleContent, []byte("\n")...)
157+
}
158+
159+
return caBundleContent, saTokenCABundleContent, vulnerableLegacyCABundleContent, nil
160+
}
161+
162+
func (w *bundlesWatcher) readFileWithRetries(path string) ([]byte, error) {
163+
numAttempts := w.maxReadAttempts
164+
if numAttempts <= 0 {
165+
numAttempts = 1
166+
}
167+
168+
var lastErr error
169+
for range numAttempts {
170+
content, err := w.readFile(path)
171+
if err == nil {
172+
return content, nil
173+
}
174+
lastErr = err
175+
}
176+
return nil, lastErr
177+
}

0 commit comments

Comments
 (0)