Skip to content

Commit 9727f4c

Browse files
lambcode-unifiedclaude
authored andcommitted
fix: use atomic writes for parallel WAL restore spool
Fixes a race condition where MoveOut could read partially-written WAL files during parallel restore, causing "invalid checkpoint record" errors. Downloads now write to .tmp files and atomically rename on completion. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 53b8620 commit 9727f4c

File tree

3 files changed

+161
-2
lines changed

3 files changed

+161
-2
lines changed

pkg/restorer/restorer.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,18 +161,38 @@ func (restorer *WALRestorer) RestoreList(
161161
go func(walIndex int) {
162162
result := &resultList[walIndex]
163163
result.WalName = fetchList[walIndex]
164+
165+
// Determine where to download the file
166+
var downloadPath string
164167
if walIndex == 0 {
165168
// The WAL that PostgreSQL requested will go directly
166-
// to the destination path
169+
// to the destination path (no staging needed)
170+
downloadPath = destinationPath
167171
result.DestinationPath = destinationPath
168172
} else {
173+
// Prefetched WALs go to a temp file first to avoid race conditions
174+
// where MoveOut could read a partially-written file
175+
downloadPath = restorer.spool.TempFileName(result.WalName)
169176
result.DestinationPath = restorer.spool.FileName(result.WalName)
170177
}
171178

172179
result.StartTime = time.Now()
173-
result.Err = restorer.Restore(fetchList[walIndex], result.DestinationPath, options)
180+
result.Err = restorer.Restore(fetchList[walIndex], downloadPath, options)
174181
result.EndTime = time.Now()
175182

183+
// For prefetched WALs, commit the temp file to make it visible,
184+
// or clean up on failure
185+
if walIndex != 0 {
186+
if result.Err == nil {
187+
if commitErr := restorer.spool.Commit(result.WalName); commitErr != nil {
188+
result.Err = commitErr
189+
}
190+
} else {
191+
// Clean up failed temp file
192+
restorer.spool.CleanupTemp(result.WalName)
193+
}
194+
}
195+
176196
elapsedWalTime := result.EndTime.Sub(result.StartTime)
177197
if result.Err == nil {
178198
contextLog.Info(

pkg/spool/spool.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ import (
3131
// on a file which doesn't exist
3232
var ErrorNonExistentFile = fs.ErrNotExist
3333

34+
const (
35+
// tempSuffix is appended to files that are being downloaded.
36+
// Files with this suffix are not yet complete and should not be moved.
37+
tempSuffix = ".tmp"
38+
)
39+
3440
// WALSpool is a way to keep track of which WAL files were processes from the parallel
3541
// feature and not by PostgreSQL request.
3642
// It works using a directory, under which we create an empty file carrying the name
@@ -101,3 +107,32 @@ func (spool *WALSpool) MoveOut(walName, destination string) (err error) {
101107
func (spool *WALSpool) FileName(walName string) string {
102108
return path.Join(spool.spoolDirectory, walName)
103109
}
110+
111+
// TempFileName gets the temporary file path for a WAL being downloaded.
112+
// Files should be written here first, then committed with Commit() to
113+
// ensure atomic visibility to MoveOut and Contains.
114+
func (spool *WALSpool) TempFileName(walName string) string {
115+
return path.Join(spool.spoolDirectory, walName+tempSuffix)
116+
}
117+
118+
// Commit atomically moves a completed download from its temp path to the final path.
119+
// This should be called after a successful download to make the file visible to MoveOut.
120+
// The rename is atomic on POSIX systems when both paths are on the same filesystem.
121+
func (spool *WALSpool) Commit(walName string) error {
122+
tempPath := path.Join(spool.spoolDirectory, walName+tempSuffix)
123+
finalPath := path.Join(spool.spoolDirectory, walName)
124+
125+
if err := os.Rename(tempPath, finalPath); err != nil {
126+
// Clean up the temp file on failure
127+
_ = os.Remove(tempPath)
128+
return fmt.Errorf("failed to commit WAL file %s: %w", walName, err)
129+
}
130+
return nil
131+
}
132+
133+
// CleanupTemp removes a temporary file if it exists.
134+
// This should be called when a download fails to avoid leaving partial files.
135+
func (spool *WALSpool) CleanupTemp(walName string) {
136+
tempPath := path.Join(spool.spoolDirectory, walName+tempSuffix)
137+
_ = os.Remove(tempPath)
138+
}

pkg/spool/spool_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,108 @@ var _ = Describe("Spool", func() {
9393
const walFile = "000000020000068A00000004"
9494
Expect(spool.FileName(walFile)).To(Equal(path.Join(tmpDir, walFile)))
9595
})
96+
97+
It("returns temp file path with .tmp suffix", func() {
98+
const walFile = "000000020000068A00000005"
99+
tempPath := spool.TempFileName(walFile)
100+
Expect(tempPath).To(Equal(path.Join(tmpDir, walFile+".tmp")))
101+
})
102+
103+
It("commits a temp file to its final location", func() {
104+
const walFile = "000000020000068A00000006"
105+
106+
// Create a temp file with some content
107+
tempPath := spool.TempFileName(walFile)
108+
err := os.WriteFile(tempPath, []byte("test content"), 0o600)
109+
Expect(err).ToNot(HaveOccurred())
110+
111+
// Temp file exists, final file does not
112+
Expect(fileutils.FileExists(tempPath)).To(BeTrue())
113+
Expect(spool.Contains(walFile)).To(BeFalse())
114+
115+
// Commit the file
116+
err = spool.Commit(walFile)
117+
Expect(err).ToNot(HaveOccurred())
118+
119+
// Now final file exists, temp file does not
120+
Expect(spool.Contains(walFile)).To(BeTrue())
121+
tempExists, _ := fileutils.FileExists(tempPath)
122+
Expect(tempExists).To(BeFalse())
123+
124+
// Verify content was preserved
125+
content, err := os.ReadFile(spool.FileName(walFile))
126+
Expect(err).ToNot(HaveOccurred())
127+
Expect(string(content)).To(Equal("test content"))
128+
})
129+
130+
It("returns error when committing non-existent temp file", func() {
131+
const walFile = "000000020000068A00000007"
132+
133+
err := spool.Commit(walFile)
134+
Expect(err).To(HaveOccurred())
135+
Expect(err.Error()).To(ContainSubstring("failed to commit WAL file"))
136+
})
137+
138+
It("cleans up temp files", func() {
139+
const walFile = "000000020000068A00000008"
140+
141+
// Create a temp file
142+
tempPath := spool.TempFileName(walFile)
143+
err := os.WriteFile(tempPath, []byte("test content"), 0o600)
144+
Expect(err).ToNot(HaveOccurred())
145+
Expect(fileutils.FileExists(tempPath)).To(BeTrue())
146+
147+
// Clean it up
148+
spool.CleanupTemp(walFile)
149+
150+
// Temp file should be gone
151+
tempExists, _ := fileutils.FileExists(tempPath)
152+
Expect(tempExists).To(BeFalse())
153+
})
154+
155+
It("cleanup is safe on non-existent temp file", func() {
156+
const walFile = "000000020000068A00000009"
157+
158+
// This should not panic or error
159+
spool.CleanupTemp(walFile)
160+
})
161+
162+
// These two tests verify the race condition fix:
163+
// temp files (.tmp) must be invisible to Contains and MoveOut
164+
165+
It("Contains does NOT see temp files", func() {
166+
const walFile = "000000020000068A0000000A"
167+
168+
// Create a temp file (simulating an in-progress download)
169+
tempPath := spool.TempFileName(walFile)
170+
err := os.WriteFile(tempPath, []byte("partial content"), 0o600)
171+
Expect(err).ToNot(HaveOccurred())
172+
173+
// Contains should return false - temp files are invisible
174+
Expect(spool.Contains(walFile)).To(BeFalse())
175+
176+
// Clean up
177+
spool.CleanupTemp(walFile)
178+
})
179+
180+
It("MoveOut does NOT see temp files", func() {
181+
const walFile = "000000020000068A0000000B"
182+
183+
// Create a temp file (simulating an in-progress download)
184+
tempPath := spool.TempFileName(walFile)
185+
err := os.WriteFile(tempPath, []byte("partial content"), 0o600)
186+
Expect(err).ToNot(HaveOccurred())
187+
188+
// MoveOut should fail with ErrorNonExistentFile - temp files are invisible
189+
destinationPath := path.Join(tmpDir2, "testFile")
190+
err = spool.MoveOut(walFile, destinationPath)
191+
Expect(err).To(Equal(ErrorNonExistentFile))
192+
193+
// Destination should not exist
194+
destExists, _ := fileutils.FileExists(destinationPath)
195+
Expect(destExists).To(BeFalse())
196+
197+
// Clean up
198+
spool.CleanupTemp(walFile)
199+
})
96200
})

0 commit comments

Comments
 (0)