forked from cloudnative-pg/barman-cloud
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathspool.go
More file actions
138 lines (116 loc) · 4.59 KB
/
spool.go
File metadata and controls
138 lines (116 loc) · 4.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/*
Copyright The CloudNativePG Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package spool
import (
"fmt"
"io/fs"
"os"
"path"
"path/filepath"
"github.com/cloudnative-pg/machinery/pkg/fileutils"
"github.com/cloudnative-pg/machinery/pkg/log"
)
// ErrorNonExistentFile is returned when the spool tried to work
// on a file which doesn't exist
var ErrorNonExistentFile = fs.ErrNotExist
const (
// tempSuffix is appended to files that are being downloaded.
// Files with this suffix are not yet complete and should not be moved.
tempSuffix = ".tmp"
)
// WALSpool is a way to keep track of which WAL files were processes from the parallel
// feature and not by PostgreSQL request.
// It works using a directory, under which we create an empty file carrying the name
// of the WAL we archived
type WALSpool struct {
spoolDirectory string
}
// New create new WAL spool
func New(spoolDirectory string) (*WALSpool, error) {
if err := fileutils.EnsureDirectoryExists(spoolDirectory); err != nil {
log.Warning("Cannot create the spool directory", "spoolDirectory", spoolDirectory)
return nil, fmt.Errorf("while creating spool directory: %w", err)
}
return &WALSpool{
spoolDirectory: spoolDirectory,
}, nil
}
// Contains checks if a certain file is in the spool or not
func (spool *WALSpool) Contains(walFile string) (bool, error) {
walFile = path.Base(walFile)
return fileutils.FileExists(path.Join(spool.spoolDirectory, walFile))
}
// Remove removes a WAL file from the spool. If the WAL file doesn't
// exist an error is returned
func (spool *WALSpool) Remove(walFile string) error {
walFile = path.Base(walFile)
err := os.Remove(path.Join(spool.spoolDirectory, walFile))
if err != nil && os.IsNotExist(err) {
return ErrorNonExistentFile
}
return err
}
// Touch ensure that a certain WAL file is included into the spool as an empty file
func (spool *WALSpool) Touch(walFile string) (err error) {
var f *os.File
walFile = path.Base(walFile)
fileName := path.Join(spool.spoolDirectory, walFile)
if f, err = os.Create(filepath.Clean(fileName)); err != nil {
return err
}
if err = f.Close(); err != nil {
log.Warning("Cannot close empty file, error skipped", "fileName", fileName, "err", err)
}
return nil
}
// MoveOut moves out a file from the spool to the destination file
func (spool *WALSpool) MoveOut(walName, destination string) (err error) {
// We cannot use os.Rename here, as it will not work between different
// volumes, such as moving files from an EmptyDir volume to the data
// directory.
// Given that, we rely on the old strategy to copy stuff around.
err = fileutils.MoveFile(path.Join(spool.spoolDirectory, walName), destination)
if err != nil && os.IsNotExist(err) {
return ErrorNonExistentFile
}
return err
}
// FileName gets the name of the file for the given WAL inside the spool
func (spool *WALSpool) FileName(walName string) string {
return path.Join(spool.spoolDirectory, walName)
}
// TempFileName gets the temporary file path for a WAL being downloaded.
// Files should be written here first, then committed with Commit() to
// ensure atomic visibility to MoveOut and Contains.
func (spool *WALSpool) TempFileName(walName string) string {
return path.Join(spool.spoolDirectory, walName+tempSuffix)
}
// Commit atomically moves a completed download from its temp path to the final path.
// This should be called after a successful download to make the file visible to MoveOut.
// The rename is atomic on POSIX systems when both paths are on the same filesystem.
func (spool *WALSpool) Commit(walName string) error {
tempPath := path.Join(spool.spoolDirectory, walName+tempSuffix)
finalPath := path.Join(spool.spoolDirectory, walName)
if err := os.Rename(tempPath, finalPath); err != nil {
// Clean up the temp file on failure
_ = os.Remove(tempPath)
return fmt.Errorf("failed to commit WAL file %s: %w", walName, err)
}
return nil
}
// CleanupTemp removes a temporary file if it exists.
// This should be called when a download fails to avoid leaving partial files.
func (spool *WALSpool) CleanupTemp(walName string) {
tempPath := path.Join(spool.spoolDirectory, walName+tempSuffix)
_ = os.Remove(tempPath)
}