|
1 | 1 | package certsyncpod
|
2 | 2 |
|
3 | 3 | import (
|
| 4 | + "bytes" |
4 | 5 | "context"
|
| 6 | + "fmt" |
5 | 7 | "os"
|
6 | 8 | "path/filepath"
|
7 | 9 | "reflect"
|
@@ -115,7 +117,7 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
|
115 | 117 |
|
116 | 118 | contentDir := getConfigMapDir(c.destinationDir, cm.Name)
|
117 | 119 |
|
118 |
| - data := map[string]string{} |
| 120 | + data := make(map[string]string, len(configMap.Data)) |
119 | 121 | for filename := range configMap.Data {
|
120 | 122 | fullFilename := filepath.Join(contentDir, filename)
|
121 | 123 |
|
@@ -152,27 +154,7 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
|
152 | 154 | continue
|
153 | 155 | }
|
154 | 156 |
|
155 |
| - klog.Infof("Creating directory %q ...", contentDir) |
156 |
| - if err := os.MkdirAll(contentDir, 0755); err != nil && !os.IsExist(err) { |
157 |
| - c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed creating directory for configmap: %s/%s: %v", configMap.Namespace, configMap.Name, err) |
158 |
| - errors = append(errors, err) |
159 |
| - continue |
160 |
| - } |
161 |
| - for filename, content := range configMap.Data { |
162 |
| - fullFilename := filepath.Join(contentDir, filename) |
163 |
| - // if the existing is the same, do nothing |
164 |
| - if reflect.DeepEqual(data[fullFilename], content) { |
165 |
| - continue |
166 |
| - } |
167 |
| - |
168 |
| - klog.Infof("Writing configmap manifest %q ...", fullFilename) |
169 |
| - if err := staticpod.WriteFileAtomic([]byte(content), 0644, fullFilename); err != nil { |
170 |
| - c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed writing file for configmap: %s/%s: %v", configMap.Namespace, configMap.Name, err) |
171 |
| - errors = append(errors, err) |
172 |
| - continue |
173 |
| - } |
174 |
| - } |
175 |
| - c.eventRecorder.Eventf("CertificateUpdated", "Wrote updated configmap: %s/%s", configMap.Namespace, configMap.Name) |
| 157 | + errors = append(errors, writeFiles(&realFS, c.eventRecorder, "configmap", configMap.ObjectMeta, contentDir, data, 0644)) |
176 | 158 | }
|
177 | 159 |
|
178 | 160 | klog.Infof("Syncing secrets: %v", c.secrets)
|
@@ -220,7 +202,7 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
|
220 | 202 |
|
221 | 203 | contentDir := getSecretDir(c.destinationDir, s.Name)
|
222 | 204 |
|
223 |
| - data := map[string][]byte{} |
| 205 | + data := make(map[string][]byte, len(secret.Data)) |
224 | 206 | for filename := range secret.Data {
|
225 | 207 | fullFilename := filepath.Join(contentDir, filename)
|
226 | 208 |
|
@@ -257,29 +239,104 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
|
257 | 239 | continue
|
258 | 240 | }
|
259 | 241 |
|
260 |
| - klog.Infof("Creating directory %q ...", contentDir) |
261 |
| - if err := os.MkdirAll(contentDir, 0755); err != nil && !os.IsExist(err) { |
262 |
| - c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed creating directory for secret: %s/%s: %v", secret.Namespace, secret.Name, err) |
263 |
| - errors = append(errors, err) |
264 |
| - continue |
| 242 | + errors = append(errors, writeFiles(&realFS, c.eventRecorder, "secret", secret.ObjectMeta, contentDir, data, 0600)) |
| 243 | + } |
| 244 | + return utilerrors.NewAggregate(errors) |
| 245 | +} |
| 246 | + |
| 247 | +type fileSystem struct { |
| 248 | + MkdirAll func(path string, perm os.FileMode) error |
| 249 | + MkdirTemp func(dir, pattern string) (string, error) |
| 250 | + RemoveAll func(path string) error |
| 251 | + WriteFile func(name string, data []byte, perm os.FileMode) error |
| 252 | + SwapDirectoriesAtomic func(dirA, dirB string) error |
| 253 | + HashDirectory func(path string) ([]byte, error) |
| 254 | +} |
| 255 | + |
| 256 | +var realFS = fileSystem{ |
| 257 | + MkdirAll: os.MkdirAll, |
| 258 | + MkdirTemp: os.MkdirTemp, |
| 259 | + RemoveAll: os.RemoveAll, |
| 260 | + WriteFile: os.WriteFile, |
| 261 | + SwapDirectoriesAtomic: staticpod.SwapDirectoriesAtomic, |
| 262 | + HashDirectory: hashDirectory, |
| 263 | +} |
| 264 | + |
| 265 | +func writeFiles[C string | []byte]( |
| 266 | + fs *fileSystem, eventRecorder events.Recorder, |
| 267 | + typeName string, o metav1.ObjectMeta, |
| 268 | + targetDir string, files map[string]C, filePerm os.FileMode, |
| 269 | +) error { |
| 270 | + // We are doing to prepare a tmp directory and write all files into that directory. |
| 271 | + // Then we are going to atomically swap the new data directory for the old one. |
| 272 | + // This is currently implemented as really atomically exchanging directories. |
| 273 | + // |
| 274 | + // The same goal of atomic swap could be implemented using symlinks much like AtomicWriter does in |
| 275 | + // https://github.com/kubernetes/kubernetes/blob/v1.34.0/pkg/volume/util/atomic_writer.go#L58 |
| 276 | + // The reason we don't do that is that we already have a directory populated and watched that needs to we swapped, |
| 277 | + // in other words, it's for compatibility reasons. And if we were to migrate to the symlink approach, |
| 278 | + // we would anyway need to atomically turn the current data directory to a symlink. |
| 279 | + // This would all just increase complexity and require atomic swap on the OS level anyway. |
| 280 | + |
| 281 | + // In case the target directory does not exist, create it so that the directory not existing is not a special case. |
| 282 | + klog.Infof("Ensuring content directory %q exists ...", targetDir) |
| 283 | + if err := fs.MkdirAll(targetDir, 0755); err != nil && !os.IsExist(err) { |
| 284 | + eventRecorder.Warningf("CertificateUpdateFailed", "Failed creating content directory for %s: %s/%s: %v", typeName, o.Namespace, o.Name, err) |
| 285 | + return err |
| 286 | + } |
| 287 | + |
| 288 | + // We make sure the target directory is unchanged while we prepare the switch by computing the directory hash. |
| 289 | + klog.Infof("Hashing current content directory %q ...", targetDir) |
| 290 | + targetDirHashBefore, err := fs.HashDirectory(targetDir) |
| 291 | + if err != nil { |
| 292 | + eventRecorder.Warningf("CertificateUpdateFailed", "Failed to hash current content directory for %s: %s/%s: %v", typeName, o.Namespace, o.Name, err) |
| 293 | + return err |
| 294 | + } |
| 295 | + |
| 296 | + // Create a tmp source directory to be swapped. |
| 297 | + klog.Infof("Creating temporary directory to swap for %q ...", targetDir) |
| 298 | + tmpDir, err := fs.MkdirTemp(filepath.Dir(targetDir), filepath.Base(targetDir)+"-*") |
| 299 | + if err != nil { |
| 300 | + eventRecorder.Warningf("CertificateUpdateFailed", "Failed to create temporary directory for %s: %s/%s: %v", typeName, o.Namespace, o.Name, err) |
| 301 | + return err |
| 302 | + } |
| 303 | + defer func() { |
| 304 | + if err := fs.RemoveAll(tmpDir); err != nil { |
| 305 | + klog.Errorf("Failed to remove temporary directory %q during cleanup: %v", tmpDir, err) |
265 | 306 | }
|
266 |
| - for filename, content := range secret.Data { |
267 |
| - // TODO fix permissions |
268 |
| - fullFilename := filepath.Join(contentDir, filename) |
269 |
| - // if the existing is the same, do nothing |
270 |
| - if reflect.DeepEqual(data[fullFilename], content) { |
271 |
| - continue |
272 |
| - } |
| 307 | + }() |
273 | 308 |
|
274 |
| - klog.Infof("Writing secret manifest %q ...", fullFilename) |
275 |
| - if err := staticpod.WriteFileAtomic(content, 0600, fullFilename); err != nil { |
276 |
| - c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed writing file for secret: %s/%s: %v", secret.Namespace, secret.Name, err) |
277 |
| - errors = append(errors, err) |
278 |
| - continue |
279 |
| - } |
| 309 | + // Populate the tmp directory with files. |
| 310 | + for filename, content := range files { |
| 311 | + fullFilename := filepath.Join(tmpDir, filename) |
| 312 | + klog.Infof("Writing %s manifest %q ...", typeName, fullFilename) |
| 313 | + |
| 314 | + if err := fs.WriteFile(fullFilename, []byte(content), filePerm); err != nil { |
| 315 | + eventRecorder.Warningf("CertificateUpdateFailed", "Failed writing file for %s: %s/%s: %v", typeName, o.Namespace, o.Name, err) |
| 316 | + return err |
280 | 317 | }
|
281 |
| - c.eventRecorder.Eventf("CertificateUpdated", "Wrote updated secret: %s/%s", secret.Namespace, secret.Name) |
282 | 318 | }
|
283 | 319 |
|
284 |
| - return utilerrors.NewAggregate(errors) |
| 320 | + // Make sure the target directory hasn't changed in the meantime. |
| 321 | + klog.Infof("Hashing current content directory %q again and ensuring it's unchanged ...", targetDir) |
| 322 | + targetDirHashAfter, err := fs.HashDirectory(targetDir) |
| 323 | + if err != nil { |
| 324 | + eventRecorder.Warningf("CertificateUpdateFailed", "Failed to hash current content directory for %s: %s/%s: %v", typeName, o.Namespace, o.Name, err) |
| 325 | + return err |
| 326 | + } |
| 327 | + if !bytes.Equal(targetDirHashBefore, targetDirHashAfter) { |
| 328 | + eventRecorder.Warningf("CertificateUpdateFailed", "Content directory changed while preparing to apply an update for %s: %s/%s", typeName, o.Namespace, o.Name) |
| 329 | + klog.Warningf("Content directory changed while preparing to apply an update: %q", targetDir) |
| 330 | + return fmt.Errorf("content directory changed while preparing to apply an update: %q", targetDir) |
| 331 | + } |
| 332 | + |
| 333 | + // Swap directories atomically. |
| 334 | + klog.Infof("Atomically swapping target directory %q with temporary directory %q for %s: %s/%s ...", targetDir, tmpDir, typeName, o.Namespace, o.Name) |
| 335 | + if err := fs.SwapDirectoriesAtomic(targetDir, tmpDir); err != nil { |
| 336 | + eventRecorder.Warningf("CertificateUpdateFailed", "Failed to swap target directory %q with temporary directory %q for %s: %s/%s: %v", targetDir, tmpDir, typeName, o.Namespace, o.Name, err) |
| 337 | + return err |
| 338 | + } |
| 339 | + |
| 340 | + eventRecorder.Eventf("CertificateUpdated", "Wrote updated %s: %s/%s", typeName, o.Namespace, o.Name) |
| 341 | + return nil |
285 | 342 | }
|
0 commit comments