Skip to content

Commit 2ee1ba4

Browse files
committed
Move file operations into internal/atomicfiles
1 parent c8b5cec commit 2ee1ba4

File tree

7 files changed

+103
-123
lines changed

7 files changed

+103
-123
lines changed

pkg/operator/staticpod/certsyncpod/certsync_controller.go

Lines changed: 6 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
package certsyncpod
22

33
import (
4-
"bytes"
54
"context"
6-
"fmt"
75
"os"
86
"path/filepath"
97
"reflect"
@@ -19,8 +17,8 @@ import (
1917

2018
"github.com/openshift/library-go/pkg/controller/factory"
2119
"github.com/openshift/library-go/pkg/operator/events"
22-
"github.com/openshift/library-go/pkg/operator/staticpod"
2320
"github.com/openshift/library-go/pkg/operator/staticpod/controller/installer"
21+
"github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicfiles"
2422
)
2523

2624
type CertSyncController struct {
@@ -154,7 +152,7 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
154152
continue
155153
}
156154

157-
errors = append(errors, writeFiles(&realFS, c.eventRecorder, "configmap", configMap.ObjectMeta, contentDir, data, 0644))
155+
errors = append(errors, writeFiles(c.eventRecorder, "configmap", configMap.ObjectMeta, contentDir, data, 0644))
158156
}
159157

160158
klog.Infof("Syncing secrets: %v", c.secrets)
@@ -239,104 +237,20 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
239237
continue
240238
}
241239

242-
errors = append(errors, writeFiles(&realFS, c.eventRecorder, "secret", secret.ObjectMeta, contentDir, data, 0600))
240+
errors = append(errors, writeFiles(c.eventRecorder, "secret", secret.ObjectMeta, contentDir, data, 0600))
243241
}
244242
return utilerrors.NewAggregate(errors)
245243
}
246244

247-
type fileSystem struct {
248-
MkdirAll func(path string, perm os.FileMode) error
249-
MkdirTemp func(dir, pattern string) (string, error)
250-
RemoveAll func(path string) error
251-
WriteFile func(name string, data []byte, perm os.FileMode) error
252-
SwapDirectoriesAtomic func(dirA, dirB string) error
253-
HashDirectory func(path string) ([]byte, error)
254-
}
255-
256-
var realFS = fileSystem{
257-
MkdirAll: os.MkdirAll,
258-
MkdirTemp: os.MkdirTemp,
259-
RemoveAll: os.RemoveAll,
260-
WriteFile: os.WriteFile,
261-
SwapDirectoriesAtomic: staticpod.SwapDirectoriesAtomic,
262-
HashDirectory: hashDirectory,
263-
}
264-
265245
func writeFiles[C string | []byte](
266-
fs *fileSystem, eventRecorder events.Recorder,
246+
eventRecorder events.Recorder,
267247
typeName string, o metav1.ObjectMeta,
268248
targetDir string, files map[string]C, filePerm os.FileMode,
269249
) error {
270-
// We are doing to prepare a tmp directory and write all files into that directory.
271-
// Then we are going to atomically swap the new data directory for the old one.
272-
// This is currently implemented as really atomically exchanging directories.
273-
//
274-
// The same goal of atomic swap could be implemented using symlinks much like AtomicWriter does in
275-
// https://github.com/kubernetes/kubernetes/blob/v1.34.0/pkg/volume/util/atomic_writer.go#L58
276-
// The reason we don't do that is that we already have a directory populated and watched that needs to we swapped,
277-
// in other words, it's for compatibility reasons. And if we were to migrate to the symlink approach,
278-
// we would anyway need to atomically turn the current data directory to a symlink.
279-
// This would all just increase complexity and require atomic swap on the OS level anyway.
280-
281-
// In case the target directory does not exist, create it so that the directory not existing is not a special case.
282-
klog.Infof("Ensuring content directory %q exists ...", targetDir)
283-
if err := fs.MkdirAll(targetDir, 0755); err != nil && !os.IsExist(err) {
284-
eventRecorder.Warningf("CertificateUpdateFailed", "Failed creating content directory for %s: %s/%s: %v", typeName, o.Namespace, o.Name, err)
285-
return err
286-
}
287-
288-
// We make sure the target directory is unchanged while we prepare the switch by computing the directory hash.
289-
klog.Infof("Hashing current content directory %q ...", targetDir)
290-
targetDirHashBefore, err := fs.HashDirectory(targetDir)
291-
if err != nil {
292-
eventRecorder.Warningf("CertificateUpdateFailed", "Failed to hash current content directory for %s: %s/%s: %v", typeName, o.Namespace, o.Name, err)
293-
return err
294-
}
295-
296-
// Create a tmp source directory to be swapped.
297-
klog.Infof("Creating temporary directory to swap for %q ...", targetDir)
298-
tmpDir, err := fs.MkdirTemp(filepath.Dir(targetDir), filepath.Base(targetDir)+"-*")
299-
if err != nil {
300-
eventRecorder.Warningf("CertificateUpdateFailed", "Failed to create temporary directory for %s: %s/%s: %v", typeName, o.Namespace, o.Name, err)
250+
if err := atomicfiles.WriteFiles(typeName, o, targetDir, files, filePerm); err != nil {
251+
eventRecorder.Warning("CertificateUpdateFailed", err.Error())
301252
return err
302253
}
303-
defer func() {
304-
if err := fs.RemoveAll(tmpDir); err != nil {
305-
klog.Errorf("Failed to remove temporary directory %q during cleanup: %v", tmpDir, err)
306-
}
307-
}()
308-
309-
// Populate the tmp directory with files.
310-
for filename, content := range files {
311-
fullFilename := filepath.Join(tmpDir, filename)
312-
klog.Infof("Writing %s manifest %q ...", typeName, fullFilename)
313-
314-
if err := fs.WriteFile(fullFilename, []byte(content), filePerm); err != nil {
315-
eventRecorder.Warningf("CertificateUpdateFailed", "Failed writing file for %s: %s/%s: %v", typeName, o.Namespace, o.Name, err)
316-
return err
317-
}
318-
}
319-
320-
// Make sure the target directory hasn't changed in the meantime.
321-
klog.Infof("Hashing current content directory %q again and ensuring it's unchanged ...", targetDir)
322-
targetDirHashAfter, err := fs.HashDirectory(targetDir)
323-
if err != nil {
324-
eventRecorder.Warningf("CertificateUpdateFailed", "Failed to hash current content directory for %s: %s/%s: %v", typeName, o.Namespace, o.Name, err)
325-
return err
326-
}
327-
if !bytes.Equal(targetDirHashBefore, targetDirHashAfter) {
328-
eventRecorder.Warningf("CertificateUpdateFailed", "Content directory changed while preparing to apply an update for %s: %s/%s", typeName, o.Namespace, o.Name)
329-
klog.Warningf("Content directory changed while preparing to apply an update: %q", targetDir)
330-
return fmt.Errorf("content directory changed while preparing to apply an update: %q", targetDir)
331-
}
332-
333-
// Swap directories atomically.
334-
klog.Infof("Atomically swapping target directory %q with temporary directory %q for %s: %s/%s ...", targetDir, tmpDir, typeName, o.Namespace, o.Name)
335-
if err := fs.SwapDirectoriesAtomic(targetDir, tmpDir); err != nil {
336-
eventRecorder.Warningf("CertificateUpdateFailed", "Failed to swap target directory %q with temporary directory %q for %s: %s/%s: %v", targetDir, tmpDir, typeName, o.Namespace, o.Name, err)
337-
return err
338-
}
339-
340254
eventRecorder.Eventf("CertificateUpdated", "Wrote updated %s: %s/%s", typeName, o.Namespace, o.Name)
341255
return nil
342256
}

pkg/operator/staticpod/certsyncpod/certsync_controller_linux_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func TestDynamicCertificates(t *testing.T) {
8585
}
8686
err = wait.PollUntilContextCancel(ctx, 250*time.Millisecond, true, func(ctx context.Context) (bool, error) {
8787
// Replace the secret directory.
88-
if err := writeFiles(&realFS, recorder, typeName, om, secretDir, files, 0600); err != nil {
88+
if err := writeFiles(recorder, typeName, om, secretDir, files, 0600); err != nil {
8989
t.Errorf("Failed to write files: %v", err)
9090
return false, err
9191
}

pkg/operator/staticpod/file_utils_linux.go renamed to pkg/operator/staticpod/internal/atomicfiles/swap_directories_linux.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//go:build linux
22

3-
package staticpod
3+
package atomicfiles
44

55
import (
66
"fmt"
@@ -9,10 +9,10 @@ import (
99
"golang.org/x/sys/unix"
1010
)
1111

12-
// SwapDirectoriesAtomic can be used to swap two directories atomically.
12+
// SwapDirectories can be used to swap two directories atomically.
1313
//
1414
// This function requires absolute paths and will return an error if that's not the case.
15-
func SwapDirectoriesAtomic(dirA, dirB string) error {
15+
func SwapDirectories(dirA, dirB string) error {
1616
if !filepath.IsAbs(dirA) {
1717
return fmt.Errorf("not an absolute path: %q", dirA)
1818
}

pkg/operator/staticpod/file_utils_linux_test.go renamed to pkg/operator/staticpod/internal/atomicfiles/swap_directories_linux_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//go:build linux
22

3-
package staticpod
3+
package atomicfiles
44

55
import (
66
"bytes"
@@ -13,7 +13,7 @@ import (
1313
"k8s.io/apimachinery/pkg/util/sets"
1414
)
1515

16-
func TestSwapDirectoriesAtomic(t *testing.T) {
16+
func TestSwapDirectories(t *testing.T) {
1717
expectNoError := func(t *testing.T, err error) {
1818
t.Helper()
1919
if err != nil {
@@ -129,7 +129,7 @@ func TestSwapDirectoriesAtomic(t *testing.T) {
129129
for _, tc := range testCases {
130130
t.Run(tc.name, func(t *testing.T) {
131131
pathA, pathB := tc.setup(t, t.TempDir())
132-
tc.checkResult(t, pathA, pathB, SwapDirectoriesAtomic(pathA, pathB))
132+
tc.checkResult(t, pathA, pathB, SwapDirectories(pathA, pathB))
133133
})
134134
}
135135
}

pkg/operator/staticpod/file_utils_other.go renamed to pkg/operator/staticpod/internal/atomicfiles/swap_directories_other.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//go:build !linux
22

3-
package staticpod
3+
package atomicfiles
44

55
import (
66
"fmt"
@@ -9,11 +9,11 @@ import (
99
"path/filepath"
1010
)
1111

12-
// SwapDirectoriesAtomic swaps two directories, but NOT atomically in this case.
12+
// SwapDirectories swaps two directories, but NOT atomically in this case.
1313
// Atomic implementation is only available for Linux.
1414
// This function is essentially a mock for tests, and it simply uses os.Rename.
1515
// In case there is any error, the swapping process is left in an inconsistent state.
16-
func SwapDirectoriesAtomic(dirA, dirB string) error {
16+
func SwapDirectories(dirA, dirB string) error {
1717
// Still retain the constraints as in the Linux implementation.
1818
if !filepath.IsAbs(dirA) {
1919
return fmt.Errorf("not an absolute path: %q", dirA)
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package atomicfiles
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"path/filepath"
7+
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/klog/v2"
10+
)
11+
12+
// WriteFiles can be used to atomically write given files into the target directory.
13+
// This is done by populating a temporary directory, then atomically swapping it with the target directory.
14+
func WriteFiles[C string | []byte](
15+
typeName string, o metav1.ObjectMeta, targetDir string, files map[string]C, filePerm os.FileMode,
16+
) error {
17+
return writeFiles(&realFS, typeName, o, targetDir, files, filePerm)
18+
}
19+
20+
type fileSystem struct {
21+
MkdirAll func(path string, perm os.FileMode) error
22+
MkdirTemp func(dir, pattern string) (string, error)
23+
RemoveAll func(path string) error
24+
WriteFile func(name string, data []byte, perm os.FileMode) error
25+
SwapDirectoriesAtomic func(dirA, dirB string) error
26+
}
27+
28+
var realFS = fileSystem{
29+
MkdirAll: os.MkdirAll,
30+
MkdirTemp: os.MkdirTemp,
31+
RemoveAll: os.RemoveAll,
32+
WriteFile: os.WriteFile,
33+
SwapDirectoriesAtomic: SwapDirectories,
34+
}
35+
36+
func writeFiles[C string | []byte](
37+
fs *fileSystem, typeName string, o metav1.ObjectMeta,
38+
targetDir string, files map[string]C, filePerm os.FileMode,
39+
) error {
40+
// We are doing to prepare a tmp directory and write all files into that directory.
41+
// Then we are going to atomically swap the new data directory for the old one.
42+
// This is currently implemented as really atomically exchanging directories.
43+
//
44+
// The same goal of atomic swap could be implemented using symlinks much like AtomicWriter does in
45+
// https://github.com/kubernetes/kubernetes/blob/v1.34.0/pkg/volume/util/atomic_writer.go#L58
46+
// The reason we don't do that is that we already have a directory populated and watched that needs to we swapped,
47+
// in other words, it's for compatibility reasons. And if we were to migrate to the symlink approach,
48+
// we would anyway need to atomically turn the current data directory to a symlink.
49+
// This would all just increase complexity and require atomic swap on the OS level anyway.
50+
51+
// In case the target directory does not exist, create it so that the directory not existing is not a special case.
52+
klog.Infof("Ensuring content directory %q exists ...", targetDir)
53+
if err := fs.MkdirAll(targetDir, 0755); err != nil && !os.IsExist(err) {
54+
return fmt.Errorf("failed creating content directory for %s: %s/%s: %w", typeName, o.Namespace, o.Name, err)
55+
}
56+
57+
// Create a tmp source directory to be swapped.
58+
klog.Infof("Creating temporary directory to swap for %q ...", targetDir)
59+
tmpDir, err := fs.MkdirTemp(filepath.Dir(targetDir), filepath.Base(targetDir)+"-*")
60+
if err != nil {
61+
return fmt.Errorf("failed creating temporary directory for %s: %s/%s: %w", typeName, o.Namespace, o.Name, err)
62+
}
63+
defer func() {
64+
if err := fs.RemoveAll(tmpDir); err != nil {
65+
klog.Errorf("Failed to remove temporary directory %q during cleanup: %v", tmpDir, err)
66+
}
67+
}()
68+
69+
// Populate the tmp directory with files.
70+
for filename, content := range files {
71+
fullFilename := filepath.Join(tmpDir, filename)
72+
klog.Infof("Writing %s manifest %q ...", typeName, fullFilename)
73+
74+
if err := fs.WriteFile(fullFilename, []byte(content), filePerm); err != nil {
75+
return fmt.Errorf("failed writing file for %s: %s/%s: %w", typeName, o.Namespace, o.Name, err)
76+
}
77+
}
78+
79+
// Swap directories atomically.
80+
klog.Infof("Atomically swapping target directory %q with temporary directory %q for %s: %s/%s ...", targetDir, tmpDir, typeName, o.Namespace, o.Name)
81+
if err := fs.SwapDirectoriesAtomic(targetDir, tmpDir); err != nil {
82+
return fmt.Errorf("failed swapping target directory %q with temporary directory %q for %s: %s/%s: %w", targetDir, tmpDir, typeName, o.Namespace, o.Name, err)
83+
}
84+
return nil
85+
}

pkg/operator/staticpod/certsyncpod/certsync_controller_test.go renamed to pkg/operator/staticpod/internal/atomicfiles/write_files_test.go

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package certsyncpod
1+
package atomicfiles
22

33
import (
44
"errors"
@@ -8,8 +8,6 @@ import (
88
"testing"
99

1010
"github.com/google/go-cmp/cmp"
11-
12-
"github.com/openshift/library-go/pkg/operator/events/eventstesting"
1311
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1412
)
1513

@@ -289,27 +287,10 @@ func TestWriteFiles(t *testing.T) {
289287
}
290288
return fs
291289
}),
292-
errorTestCase("directory unchanged on content directory hash mismatch", func() *fileSystem {
293-
fs := newRealFS()
294-
fs.HashDirectory = hashDirectoryFuncWithReturnValues("hash", "mismatch")
295-
return fs
296-
}),
297-
errorTestCase("directory unchanged on content directory initial hash error", func() *fileSystem {
298-
fs := newRealFS()
299-
fs.HashDirectory = hashDirectoryFuncWithReturnValues("error: nuked")
300-
return fs
301-
}),
302-
errorTestCase("directory unchanged on content directory subsequent hash error", func() *fileSystem {
303-
fs := newRealFS()
304-
fs.HashDirectory = hashDirectoryFuncWithReturnValues("hash", "error: nuked")
305-
return fs
306-
}),
307290
}
308291

309292
for _, tc := range testCases {
310293
t.Run(tc.name, func(t *testing.T) {
311-
recorder := eventstesting.NewTestingEventRecorder(t)
312-
313294
// Write the current directory contents.
314295
contentDir := filepath.Join(t.TempDir(), "secrets", om.Name)
315296
if tc.directoryData != nil {
@@ -326,7 +307,7 @@ func TestWriteFiles(t *testing.T) {
326307
}
327308

328309
// Replace with the object data.
329-
err := writeFiles(tc.newFS(), recorder, typeName, om, contentDir, tc.objectData, 0600)
310+
err := writeFiles(tc.newFS(), typeName, om, contentDir, tc.objectData, 0600)
330311

331312
// Check the resulting state.
332313
tc.checkState(t, contentDir, tc.directoryData, tc.objectData, err)

0 commit comments

Comments
 (0)