Skip to content

Commit 83343d9

Browse files
committed
cabundleinjector: Watch bundle files for changes
Add a watcher to watch all relevant files for changes so that they are automatically reloaded when modified.
1 parent 77fa780 commit 83343d9

File tree

8 files changed

+661
-42
lines changed

8 files changed

+661
-42
lines changed

pkg/controller/cabundleinjector/admissionwebhook.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cabundleinjector
33
import (
44
"bytes"
55
"context"
6+
"errors"
67

78
admissionregv1 "k8s.io/api/admissionregistration/v1"
89
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -90,10 +91,15 @@ type webhookCABundleInjector[T admissionregv1.MutatingWebhookConfiguration | adm
9091
client webhookConfigUpdater[T]
9192
lister cachedWebhookConfigGetter[T]
9293
newWebhookConfigAccessor func(*T) webhookConfigAccessor[T]
93-
caBundle []byte
94+
caBundle *bundleCache
9495
}
9596

9697
func (bi *webhookCABundleInjector[T]) Sync(ctx context.Context, syncCtx factory.SyncContext) error {
98+
caBundle := bi.caBundle.Load()
99+
if caBundle == nil {
100+
return errors.New("CA bundle is not available")
101+
}
102+
97103
webhookConfig, err := bi.lister.Get(syncCtx.QueueKey())
98104
if apierrors.IsNotFound(err) {
99105
return nil
@@ -105,7 +111,7 @@ func (bi *webhookCABundleInjector[T]) Sync(ctx context.Context, syncCtx factory.
105111
webhooksNeedingUpdate := []int{}
106112
for i := 0; i < webhookConfigAccessor.WebhooksLen(); i++ {
107113
webhookClientConfig := webhookConfigAccessor.GetWebhookClientCA(i)
108-
if !bytes.Equal(webhookClientConfig.CABundle, bi.caBundle) {
114+
if !bytes.Equal(webhookClientConfig.CABundle, caBundle) {
109115
webhooksNeedingUpdate = append(webhooksNeedingUpdate, i)
110116
}
111117
}
@@ -118,7 +124,7 @@ func (bi *webhookCABundleInjector[T]) Sync(ctx context.Context, syncCtx factory.
118124
// make a copy to avoid mutating cache state
119125
webhookConfigCopy := webhookConfigAccessor.DeepCopy()
120126
for _, i := range webhooksNeedingUpdate {
121-
webhookConfigCopy.GetWebhookClientCA(i).CABundle = bi.caBundle
127+
webhookConfigCopy.GetWebhookClientCA(i).CABundle = caBundle
122128
}
123129
_, err = bi.client.Update(ctx, webhookConfigCopy.GetObject(), metav1.UpdateOptions{})
124130
return err

pkg/controller/cabundleinjector/admissionwebhook_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,15 @@ func TestWebhookCABundleInjectorSync(t *testing.T) {
9292
waitSuccess := cache.WaitForCacheSync(testCtx.Done(), webhookInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer().HasSynced)
9393
require.True(t, waitSuccess)
9494

95+
cache := &bundleCache{}
96+
cache.Store(testCABundle)
97+
9598
injector := webhookCABundleInjector[admissionregv1.ValidatingWebhookConfiguration]{
9699
webhookConfigType: "testwebhook",
97100
newWebhookConfigAccessor: newValidatingWebhookAccessor,
98101
client: webhookClient.AdmissionregistrationV1().ValidatingWebhookConfigurations(),
99102
lister: webhookInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Lister(),
100-
caBundle: testCABundle,
103+
caBundle: cache,
101104
}
102105

103106
if gotErr := injector.Sync(testCtx, testContext{"test-webhook"}); (gotErr != nil) != tt.wantErr {

pkg/controller/cabundleinjector/apiservice.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package cabundleinjector
33
import (
44
"bytes"
55
"context"
6-
6+
"errors"
77
apierrors "k8s.io/apimachinery/pkg/api/errors"
88
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
99
"k8s.io/klog/v2"
@@ -19,7 +19,7 @@ import (
1919
type apiServiceCABundleInjector struct {
2020
client apiserviceclientv1.APIServiceInterface
2121
lister apiservicelister.APIServiceLister
22-
caBundle []byte
22+
caBundle *bundleCache
2323
}
2424

2525
func newAPIServiceInjectorConfig(config *caBundleInjectorConfig) controllerConfig {
@@ -48,22 +48,27 @@ func newAPIServiceInjectorConfig(config *caBundleInjectorConfig) controllerConfi
4848
}
4949

5050
func (bi *apiServiceCABundleInjector) Sync(ctx context.Context, syncCtx factory.SyncContext) error {
51+
caBundle := bi.caBundle.Load()
52+
if caBundle == nil {
53+
return errors.New("CA bundle is not available")
54+
}
55+
5156
apiService, err := bi.lister.Get(syncCtx.QueueKey())
5257
if apierrors.IsNotFound(err) {
5358
return nil
5459
} else if err != nil {
5560
return err
5661
}
5762

58-
if bytes.Equal(apiService.Spec.CABundle, bi.caBundle) {
63+
if bytes.Equal(apiService.Spec.CABundle, caBundle) {
5964
return nil
6065
}
6166

6267
klog.Infof("updating apiservice %s with the service signing CA bundle", apiService.Name)
6368

6469
// avoid mutating our cache
6570
apiServiceCopy := apiService.DeepCopy()
66-
apiServiceCopy.Spec.CABundle = bi.caBundle
71+
apiServiceCopy.Spec.CABundle = caBundle
6772
_, err = bi.client.Update(ctx, apiServiceCopy, v1.UpdateOptions{})
6873
return err
6974
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package cabundleinjector
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"sync/atomic"
8+
"time"
9+
10+
"k8s.io/klog/v2"
11+
12+
"github.com/openshift/library-go/pkg/controller/fileobserver"
13+
)
14+
15+
type bundleCache struct {
16+
v atomic.Pointer[[]byte]
17+
}
18+
19+
func (c *bundleCache) Load() []byte {
20+
if dataPtr := c.v.Load(); dataPtr != nil {
21+
return *dataPtr
22+
}
23+
return nil
24+
}
25+
26+
func (c *bundleCache) Store(data []byte) {
27+
if data == nil {
28+
c.v.Store(nil)
29+
} else {
30+
c.v.Store(&data)
31+
}
32+
}
33+
34+
type bundlesWatcher struct {
35+
// caBundle contains the signing CA bundle.
36+
CABundle *bundleCache
37+
// caBundleLegacy is constructed so that it matches what the old KCM used to do.
38+
// The signing CA bundle is appended to the service account ca.crt.
39+
CABundleLegacy *bundleCache
40+
41+
// OnReadFailed can be set for custom read error handling.
42+
// When the hook returns the original error, it is logged.
43+
// Logging happens automatically when no hook is set.
44+
OnReadFailed func(context.Context, error) error
45+
46+
caBundlePath string
47+
saTokenCABundlePath string
48+
pollingInterval time.Duration
49+
resyncInterval time.Duration
50+
maxReadAttempts int
51+
52+
// readFile is used to read files from disk.
53+
// Used for mocking during tests mostly.
54+
readFile func(string) ([]byte, error)
55+
}
56+
57+
func newBundlesWatcher(
58+
caBundlePath string, saTokenCABundlePath string,
59+
pollingInterval time.Duration,
60+
resyncInterval time.Duration,
61+
maxReadAttempts int,
62+
) *bundlesWatcher {
63+
return &bundlesWatcher{
64+
CABundle: &bundleCache{},
65+
CABundleLegacy: &bundleCache{},
66+
caBundlePath: caBundlePath,
67+
saTokenCABundlePath: saTokenCABundlePath,
68+
pollingInterval: pollingInterval,
69+
resyncInterval: resyncInterval,
70+
maxReadAttempts: maxReadAttempts,
71+
readFile: os.ReadFile,
72+
}
73+
}
74+
75+
func (w *bundlesWatcher) Start(ctx context.Context) error {
76+
var (
77+
caBundleData []byte
78+
saTokenCAData []byte
79+
)
80+
reload := func(onError func(context.Context, error) error) error {
81+
var (
82+
legacyCABundleData []byte
83+
err error
84+
)
85+
caBundleData, saTokenCAData, legacyCABundleData, err = w.readBundles()
86+
if err != nil {
87+
// On read errors, keep existing cached content
88+
// until the next successful read.
89+
if onError != nil {
90+
err = onError(ctx, err)
91+
}
92+
return err
93+
}
94+
95+
w.CABundle.Store(caBundleData)
96+
w.CABundleLegacy.Store(legacyCABundleData)
97+
return nil
98+
}
99+
100+
// Read the files initially. This is how the previous implementation worked, so we retain that.
101+
if err := reload(nil); err != nil {
102+
return err
103+
}
104+
105+
// Start watching.
106+
observer, err := fileobserver.NewObserver(w.pollingInterval)
107+
if err != nil {
108+
return fmt.Errorf("failed to start CA bundles observer: %w", err)
109+
}
110+
111+
observer.AddReactor(func(_ string, _ fileobserver.ActionType) error {
112+
return reload(w.OnReadFailed)
113+
}, map[string][]byte{
114+
w.caBundlePath: caBundleData,
115+
w.saTokenCABundlePath: saTokenCAData,
116+
}, w.caBundlePath, w.saTokenCABundlePath)
117+
118+
go observer.Run(ctx.Done())
119+
120+
// Periodically re-read the bundles as a safety net
121+
// in case the file observer reactor callback fails.
122+
if w.resyncInterval > 0 {
123+
go func() {
124+
ticker := time.NewTicker(w.resyncInterval)
125+
defer ticker.Stop()
126+
for {
127+
select {
128+
case <-ctx.Done():
129+
return
130+
case <-ticker.C:
131+
if err := reload(w.OnReadFailed); err != nil {
132+
klog.Errorf("Failed to resync CA bundles: %v", err)
133+
}
134+
}
135+
}
136+
}()
137+
}
138+
139+
return nil
140+
}
141+
142+
func (w *bundlesWatcher) readBundles() ([]byte, []byte, []byte, error) {
143+
caBundleContent, err := w.readFileWithRetries(w.caBundlePath)
144+
if err != nil {
145+
if os.IsNotExist(err) {
146+
return nil, nil, nil, nil
147+
}
148+
return nil, nil, nil, fmt.Errorf("failed to read %q: %w", w.caBundlePath, err)
149+
}
150+
151+
// This construction matches what the old KCM used to do.
152+
// It added the entire ca.crt to the service account ca.crt.
153+
vulnerableLegacyCABundleContent := append([]byte{}, caBundleContent...)
154+
saTokenCABundleContent, err := w.readFileWithRetries(w.saTokenCABundlePath)
155+
if err != nil && !os.IsNotExist(err) {
156+
return nil, nil, nil, fmt.Errorf("failed to read %q: %w", w.saTokenCABundlePath, err)
157+
}
158+
if len(saTokenCABundleContent) > 0 {
159+
vulnerableLegacyCABundleContent = append(vulnerableLegacyCABundleContent, saTokenCABundleContent...)
160+
vulnerableLegacyCABundleContent = append(vulnerableLegacyCABundleContent, []byte("\n")...)
161+
}
162+
163+
return caBundleContent, saTokenCABundleContent, vulnerableLegacyCABundleContent, nil
164+
}
165+
166+
func (w *bundlesWatcher) readFileWithRetries(path string) ([]byte, error) {
167+
numAttempts := w.maxReadAttempts
168+
if numAttempts <= 0 {
169+
numAttempts = 1
170+
}
171+
172+
var lastErr error
173+
for range numAttempts {
174+
content, err := w.readFile(path)
175+
if err == nil {
176+
return content, nil
177+
}
178+
lastErr = err
179+
}
180+
return nil, lastErr
181+
}

0 commit comments

Comments
 (0)