Skip to content

Commit 1ab1286

Browse files
committed
Make storage file writes atomic
1 parent d4830bc commit 1ab1286

File tree

14 files changed

+1200
-43
lines changed

14 files changed

+1200
-43
lines changed

controllers/helmchart_controller.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ limitations under the License.
1717
package controllers
1818

1919
import (
20+
"bytes"
2021
"context"
2122
"fmt"
23+
"io"
2224
"io/ioutil"
2325
"net/url"
2426
"os"
@@ -255,12 +257,8 @@ func (r *HelmChartReconciler) reconcileFromHelmRepository(ctx context.Context,
255257
return sourcev1.HelmChartNotReady(chart, sourcev1.ChartPullFailedReason, err.Error()), err
256258
}
257259

258-
chartBytes, err := ioutil.ReadAll(res)
259-
if err != nil {
260-
return sourcev1.HelmChartNotReady(chart, sourcev1.ChartPullFailedReason, err.Error()), err
261-
}
262-
263-
sum := r.Storage.Checksum(chartBytes)
260+
var buf bytes.Buffer
261+
sum := r.Storage.Checksum(io.TeeReader(res, &buf))
264262
artifact := r.Storage.ArtifactFor(chart.Kind, chart.GetObjectMeta(),
265263
fmt.Sprintf("%s-%s-%s.tgz", cv.Name, cv.Version, sum), cv.Version, sum)
266264

@@ -280,7 +278,7 @@ func (r *HelmChartReconciler) reconcileFromHelmRepository(ctx context.Context,
280278
defer unlock()
281279

282280
// save artifact to storage
283-
err = r.Storage.WriteFile(artifact, chartBytes)
281+
err = r.Storage.AtomicWriteFile(artifact, &buf, 0644)
284282
if err != nil {
285283
err = fmt.Errorf("unable to write chart file: %w", err)
286284
return sourcev1.HelmChartNotReady(chart, sourcev1.StorageOperationFailedReason, err.Error()), err

controllers/helmrepository_controller.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package controllers
1818

1919
import (
20+
"bytes"
2021
"context"
2122
"fmt"
2223
"io/ioutil"
@@ -212,23 +213,23 @@ func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, repository sou
212213
return sourcev1.HelmRepositoryNotReady(repository, sourcev1.IndexationFailedReason, err.Error()), err
213214
}
214215

215-
data, err := ioutil.ReadAll(res)
216+
b, err := ioutil.ReadAll(res)
216217
if err != nil {
217218
return sourcev1.HelmRepositoryNotReady(repository, sourcev1.IndexationFailedReason, err.Error()), err
218219
}
219220

220-
i := &repo.IndexFile{}
221-
if err := yaml.Unmarshal(data, i); err != nil {
221+
i := repo.IndexFile{}
222+
if err := yaml.Unmarshal(b, &i); err != nil {
222223
return sourcev1.HelmRepositoryNotReady(repository, sourcev1.IndexationFailedReason, err.Error()), err
223224
}
224225
i.SortEntries()
225226

226-
index, err := yaml.Marshal(i)
227+
b, err = yaml.Marshal(&i)
227228
if err != nil {
228229
return sourcev1.HelmRepositoryNotReady(repository, sourcev1.IndexationFailedReason, err.Error()), err
229230
}
230231

231-
sum := r.Storage.Checksum(index)
232+
sum := r.Storage.Checksum(bytes.NewReader(b))
232233
artifact := r.Storage.ArtifactFor(repository.Kind, repository.ObjectMeta.GetObjectMeta(),
233234
fmt.Sprintf("index-%s.yaml", sum), i.Generated.Format(time.RFC3339Nano), sum)
234235

@@ -248,7 +249,7 @@ func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, repository sou
248249
defer unlock()
249250

250251
// save artifact to storage
251-
err = r.Storage.WriteFile(artifact, index)
252+
err = r.Storage.AtomicWriteFile(artifact, bytes.NewReader(b), 0644)
252253
if err != nil {
253254
err = fmt.Errorf("unable to write repository index file: %w", err)
254255
return sourcev1.HelmRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err

controllers/helmrepository_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ var _ = Describe("HelmRepositoryReconciler", func() {
140140
Eventually(func() error {
141141
r := &sourcev1.HelmRepository{}
142142
return k8sClient.Get(context.Background(), key, r)
143-
}).ShouldNot(Succeed())
143+
}, timeout, interval).ShouldNot(Succeed())
144144

145145
exists := func(path string) bool {
146146
// wait for tmp sync on macOS

controllers/storage.go

Lines changed: 82 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/fluxcd/pkg/lockedfile"
3737

3838
sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
39+
"github.com/fluxcd/source-controller/internal/fs"
3940
)
4041

4142
const (
@@ -95,7 +96,8 @@ func (s *Storage) RemoveAll(artifact sourcev1.Artifact) error {
9596
return os.RemoveAll(dir)
9697
}
9798

98-
// RemoveAllButCurrent removes all files for the given artifact base dir excluding the current one
99+
// RemoveAllButCurrent removes all files for the given v1alpha1.Artifact base dir,
100+
// excluding the current one.
99101
func (s *Storage) RemoveAllButCurrent(artifact sourcev1.Artifact) error {
100102
localPath := s.LocalPath(artifact)
101103
dir := filepath.Dir(localPath)
@@ -120,8 +122,8 @@ func (s *Storage) RemoveAllButCurrent(artifact sourcev1.Artifact) error {
120122
return nil
121123
}
122124

123-
// ArtifactExist returns a boolean indicating whether the artifact exists in storage and is a
124-
// regular file.
125+
// ArtifactExist returns a boolean indicating whether the v1alpha1.Artifact exists in storage
126+
// and is a regular file.
125127
func (s *Storage) ArtifactExist(artifact sourcev1.Artifact) bool {
126128
fi, err := os.Lstat(s.LocalPath(artifact))
127129
if err != nil {
@@ -130,9 +132,10 @@ func (s *Storage) ArtifactExist(artifact sourcev1.Artifact) bool {
130132
return fi.Mode().IsRegular()
131133
}
132134

133-
// Archive creates a tar.gz to the artifact path from the given dir excluding any VCS specific
134-
// files and directories, or any of the excludes defined in the excludeFiles.
135-
func (s *Storage) Archive(artifact sourcev1.Artifact, dir string, spec sourcev1.GitRepositorySpec) error {
135+
// Archive atomically creates a tar.gz to the v1alpha1.Artifact path from the given dir,
136+
// excluding any VCS specific files and directories, or any of the excludes defined in
137+
// the excludeFiles.
138+
func (s *Storage) Archive(artifact sourcev1.Artifact, dir string, spec sourcev1.GitRepositorySpec) (err error) {
136139
if _, err := os.Stat(dir); err != nil {
137140
return err
138141
}
@@ -141,22 +144,53 @@ func (s *Storage) Archive(artifact sourcev1.Artifact, dir string, spec sourcev1.
141144
if err != nil {
142145
return err
143146
}
144-
145147
matcher := gitignore.NewMatcher(ps)
146148

147-
gzFile, err := os.Create(s.LocalPath(artifact))
149+
localPath := s.LocalPath(artifact)
150+
tmpGzFile, err := ioutil.TempFile(filepath.Split(localPath))
148151
if err != nil {
149152
return err
150153
}
151-
defer gzFile.Close()
152-
153-
gw := gzip.NewWriter(gzFile)
154-
defer gw.Close()
154+
tmpName := tmpGzFile.Name()
155+
defer func() {
156+
if err != nil {
157+
os.Remove(tmpName)
158+
}
159+
}()
155160

161+
gw := gzip.NewWriter(tmpGzFile)
156162
tw := tar.NewWriter(gw)
157-
defer tw.Close()
163+
if err := writeToArchiveExcludeMatches(dir, matcher, tw); err != nil {
164+
tw.Close()
165+
gw.Close()
166+
tmpGzFile.Close()
167+
return err
168+
}
169+
170+
if err := tw.Close(); err != nil {
171+
gw.Close()
172+
tmpGzFile.Close()
173+
return err
174+
}
175+
if err := gw.Close(); err != nil {
176+
tmpGzFile.Close()
177+
return err
178+
}
179+
if err := tmpGzFile.Close(); err != nil {
180+
return err
181+
}
182+
183+
if err := os.Chmod(tmpName, 0644); err != nil {
184+
return err
185+
}
158186

159-
return filepath.Walk(dir, func(p string, fi os.FileInfo, err error) error {
187+
return fs.RenameWithFallback(tmpName, localPath)
188+
}
189+
190+
// writeToArchiveExcludeMatches walks over the given dir and writes any regular file that does
191+
// not match the given gitignore.Matcher.
192+
func writeToArchiveExcludeMatches(dir string, matcher gitignore.Matcher, writer *tar.Writer) error {
193+
fn := func(p string, fi os.FileInfo, err error) error {
160194
if err != nil {
161195
return err
162196
}
@@ -187,33 +221,48 @@ func (s *Storage) Archive(artifact sourcev1.Artifact, dir string, spec sourcev1.
187221
}
188222
header.Name = relFilePath
189223

190-
if err := tw.WriteHeader(header); err != nil {
224+
if err := writer.WriteHeader(header); err != nil {
191225
return err
192226
}
193227

194228
f, err := os.Open(p)
195229
if err != nil {
230+
f.Close()
196231
return err
197232
}
198-
if _, err := io.Copy(tw, f); err != nil {
233+
if _, err := io.Copy(writer, f); err != nil {
199234
f.Close()
200235
return err
201236
}
202237
return f.Close()
203-
})
238+
}
239+
return filepath.Walk(dir, fn)
204240
}
205241

206-
// WriteFile writes the given bytes to the artifact path if the checksum differs
207-
func (s *Storage) WriteFile(artifact sourcev1.Artifact, data []byte) error {
242+
// AtomicWriteFile atomically writes a file to the v1alpha1.Artifact Path.
243+
func (s *Storage) AtomicWriteFile(artifact sourcev1.Artifact, reader io.Reader, mode os.FileMode) (err error) {
208244
localPath := s.LocalPath(artifact)
209-
sum := s.Checksum(data)
210-
if file, err := os.Stat(localPath); !os.IsNotExist(err) && !file.IsDir() {
211-
if fb, err := ioutil.ReadFile(localPath); err == nil && sum == s.Checksum(fb) {
212-
return nil
245+
tmpFile, err := ioutil.TempFile(filepath.Split(localPath))
246+
if err != nil {
247+
return err
248+
}
249+
tmpName := tmpFile.Name()
250+
defer func() {
251+
if err != nil {
252+
os.Remove(tmpName)
213253
}
254+
}()
255+
if _, err := io.Copy(tmpFile, reader); err != nil {
256+
tmpFile.Close()
257+
return err
214258
}
215-
216-
return ioutil.WriteFile(localPath, data, 0644)
259+
if err := tmpFile.Close(); err != nil {
260+
return err
261+
}
262+
if err := os.Chmod(tmpName, mode); err != nil {
263+
return err
264+
}
265+
return fs.RenameWithFallback(tmpName, localPath)
217266
}
218267

219268
// Symlink creates or updates a symbolic link for the given artifact
@@ -241,12 +290,14 @@ func (s *Storage) Symlink(artifact sourcev1.Artifact, linkName string) (string,
241290
return url, nil
242291
}
243292

244-
// Checksum returns the SHA1 checksum for the given bytes as a string
245-
func (s *Storage) Checksum(b []byte) string {
246-
return fmt.Sprintf("%x", sha1.Sum(b))
293+
// Checksum returns the SHA1 checksum for the data of the given io.Reader as a string.
294+
func (s *Storage) Checksum(reader io.Reader) string {
295+
h := sha1.New()
296+
_, _ = io.Copy(h, reader)
297+
return fmt.Sprintf("%x", h.Sum(nil))
247298
}
248299

249-
// Lock creates a file lock for the given artifact
300+
// Lock creates a file lock for the given v1alpha1.Artifact.
250301
func (s *Storage) Lock(artifact sourcev1.Artifact) (unlock func(), err error) {
251302
lockFile := s.LocalPath(artifact) + ".lock"
252303
mutex := lockedfile.MutexAt(lockFile)
@@ -262,6 +313,8 @@ func (s *Storage) LocalPath(artifact sourcev1.Artifact) string {
262313
return filepath.Join(s.BasePath, artifact.Path)
263314
}
264315

316+
// getPatterns collects ignore patterns from the given reader and returns them
317+
// as a gitignore.Pattern slice.
265318
func getPatterns(reader io.Reader, path []string) []gitignore.Pattern {
266319
var ps []gitignore.Pattern
267320
scanner := bufio.NewScanner(reader)

internal/fs/LICENSE

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
Copyright (c) 2014 The Go Authors. All rights reserved.
2+
3+
Redistribution and use in source and binary forms, with or without
4+
modification, are permitted provided that the following conditions are
5+
met:
6+
7+
* Redistributions of source code must retain the above copyright
8+
notice, this list of conditions and the following disclaimer.
9+
* Redistributions in binary form must reproduce the above
10+
copyright notice, this list of conditions and the following disclaimer
11+
in the documentation and/or other materials provided with the
12+
distribution.
13+
* Neither the name of Google Inc. nor the names of its
14+
contributors may be used to endorse or promote products derived from
15+
this software without specific prior written permission.
16+
17+
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18+
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19+
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20+
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21+
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22+
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23+
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24+
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25+
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26+
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27+
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

0 commit comments

Comments
 (0)