-
Notifications
You must be signed in to change notification settings - Fork 547
Expand file tree
/
Copy pathfilesystem.go
More file actions
250 lines (210 loc) · 6.9 KB
/
filesystem.go
File metadata and controls
250 lines (210 loc) · 6.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
package datastore
import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"github.com/stellar/go-stellar-sdk/support/log"
)
const (
// defaultDirPerms is the permission mode passed to os.MkdirAll when parent
// directories for a stored file have to be created.
defaultDirPerms os.FileMode = 0755
// defaultFilePerms is the permission mode passed to os.OpenFile when a data
// file is created.
defaultFilePerms os.FileMode = 0644
)
// Compile-time check that *FilesystemDataStore satisfies DataStore.
var _ DataStore = (*FilesystemDataStore)(nil)

// FilesystemDataStore is a DataStore backed by a directory on the local
// filesystem.
//
// Note: This implementation is not recommended for production use. It is
// intended for development and testing purposes only.
//
// Metadata is not supported: the metaData argument of PutFile and
// PutFileIfNotExists is ignored, and GetFileMetadata always returns an empty
// map.
//
// Concurrent writes to the same file path are not safe and may result in
// data corruption. Callers must ensure proper synchronization when writing
// to the same path from multiple goroutines.
type FilesystemDataStore struct {
	// basePath is the absolute directory under which every stored file lives.
	basePath string
}
// NewFilesystemDataStore builds a filesystem-backed DataStore from
// datastoreConfig.
//
// The base directory is taken from the "destination_path" entry of
// datastoreConfig.Params; an error is returned when that key is absent.
// ctx is accepted for signature compatibility and is not used here.
func NewFilesystemDataStore(ctx context.Context, datastoreConfig DataStoreConfig) (DataStore, error) {
	if basePath, found := datastoreConfig.Params["destination_path"]; found {
		return NewFilesystemDataStoreWithPath(basePath)
	}
	return nil, errors.New("invalid Filesystem config, no destination_path")
}
// NewFilesystemDataStoreWithPath returns a FilesystemDataStore rooted at
// basePath.
//
// basePath is converted to an absolute path with filepath.Abs; an error is
// returned if that conversion fails. The directory itself is neither created
// nor checked for existence by this constructor.
func NewFilesystemDataStoreWithPath(basePath string) (DataStore, error) {
	resolved, err := filepath.Abs(basePath)
	if err != nil {
		return nil, fmt.Errorf("failed to resolve absolute path: %w", err)
	}

	log.Debugf("Creating Filesystem datastore at: %s", resolved)

	store := &FilesystemDataStore{basePath: resolved}
	return store, nil
}
// fullPath maps a store-relative path onto the filesystem by joining it to
// the store's base directory with filepath.Join.
//
// NOTE(review): no check is made that the joined path stays inside basePath,
// so a path containing ".." elements can resolve outside of it.
func (f *FilesystemDataStore) fullPath(path string) string {
	joined := filepath.Join(f.basePath, path)
	return joined
}
// Exists reports whether something exists at path inside the store.
//
// It returns (true, nil) when os.Stat succeeds, (false, nil) when the path
// does not exist, and (false, err) for any other stat failure. ctx is unused.
func (f *FilesystemDataStore) Exists(ctx context.Context, path string) (bool, error) {
	switch _, statErr := os.Stat(f.fullPath(path)); {
	case statErr == nil:
		return true, nil
	case os.IsNotExist(statErr):
		return false, nil
	default:
		return false, statErr
	}
}
// Size returns the size in bytes of the file at path.
//
// A missing file is reported as the bare os.ErrNotExist sentinel; any other
// stat failure is returned unchanged. ctx is unused.
func (f *FilesystemDataStore) Size(ctx context.Context, path string) (int64, error) {
	info, statErr := os.Stat(f.fullPath(path))
	switch {
	case statErr == nil:
		return info.Size(), nil
	case os.IsNotExist(statErr):
		return 0, os.ErrNotExist
	default:
		return 0, statErr
	}
}
// GetFileLastModified returns the modification time of the file at path.
//
// A missing file is reported as the bare os.ErrNotExist sentinel; any other
// stat failure is returned unchanged. On error the returned time is the zero
// time.Time. ctx is unused.
func (f *FilesystemDataStore) GetFileLastModified(ctx context.Context, path string) (time.Time, error) {
	info, statErr := os.Stat(f.fullPath(path))
	if statErr == nil {
		return info.ModTime(), nil
	}

	var zero time.Time
	if os.IsNotExist(statErr) {
		return zero, os.ErrNotExist
	}
	return zero, statErr
}
// GetFile opens the file at path for reading.
//
// The caller owns the returned io.ReadCloser and must close it. A missing
// file is reported as the bare os.ErrNotExist sentinel; any other open
// failure is wrapped with the store-relative path. ctx is unused.
func (f *FilesystemDataStore) GetFile(ctx context.Context, path string) (io.ReadCloser, error) {
	reader, openErr := os.Open(f.fullPath(path))
	switch {
	case openErr == nil:
		log.Debugf("File retrieved successfully: %s", path)
		return reader, nil
	case os.IsNotExist(openErr):
		return nil, os.ErrNotExist
	default:
		return nil, fmt.Errorf("error opening file %s: %w", path, openErr)
	}
}
// GetFileMetadata returns an empty, non-nil map for an existing file, because
// filesystem storage does not support metadata.
//
// A missing file is reported as the bare os.ErrNotExist sentinel. Any other
// stat failure (for example a permission error) is returned unchanged rather
// than being reported as success, in line with Size and GetFileLastModified.
// ctx is unused.
func (f *FilesystemDataStore) GetFileMetadata(ctx context.Context, path string) (map[string]string, error) {
	if _, err := os.Stat(f.fullPath(path)); err != nil {
		if os.IsNotExist(err) {
			return nil, os.ErrNotExist
		}
		// Do not claim the file exists when its state could not be determined.
		return nil, err
	}
	return map[string]string{}, nil
}
// PutFile writes the content produced by in to path, creating missing parent
// directories and replacing any file already stored at that path.
//
// metaData is ignored. New files are created with defaultFilePerms, the same
// mode PutFileIfNotExists uses. The existing file is truncated before
// writing, so its previous content is lost even when the write fails; in
// that case the incomplete file is removed so that a partial object is never
// left at the destination. ctx is unused.
//
// It returns a wrapped error if the directory cannot be created, the file
// cannot be opened, or writing or closing the file fails.
func (f *FilesystemDataStore) PutFile(ctx context.Context, path string, in io.WriterTo, metaData map[string]string) error {
	fullPath := f.fullPath(path)

	// Create parent directories
	dir := filepath.Dir(fullPath)
	if err := os.MkdirAll(dir, defaultDirPerms); err != nil {
		return fmt.Errorf("failed to create directory %s: %w", dir, err)
	}

	// Same flags as os.Create, but with the store's file mode instead of 0666.
	file, err := os.OpenFile(fullPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, defaultFilePerms)
	if err != nil {
		return fmt.Errorf("failed to create file %s: %w", path, err)
	}

	if _, err := in.WriteTo(file); err != nil {
		// Best-effort cleanup; the write error is the one worth reporting.
		_ = file.Close()
		_ = os.Remove(fullPath)
		return fmt.Errorf("failed to write file %s: %w", path, err)
	}

	if err := file.Close(); err != nil {
		// A failed close can mean buffered data never reached the file.
		_ = os.Remove(fullPath)
		return fmt.Errorf("failed to close file %s: %w", path, err)
	}

	log.Debugf("File written successfully: %s", path)
	return nil
}
// PutFileIfNotExists writes the content produced by in to path only when no
// file exists there yet, creating missing parent directories as needed.
//
// It returns (true, nil) when the file was created and written, and
// (false, nil) when a file already exists at path. metaData is ignored and
// ctx is unused.
//
// If writing or closing fails, the newly created file is removed before the
// error is returned. Otherwise an incomplete file would stay behind and make
// every later call report "already exists".
func (f *FilesystemDataStore) PutFileIfNotExists(
	ctx context.Context, path string, in io.WriterTo, metaData map[string]string,
) (bool, error) {
	fullPath := f.fullPath(path)

	// Create parent directories
	dir := filepath.Dir(fullPath)
	if err := os.MkdirAll(dir, defaultDirPerms); err != nil {
		return false, fmt.Errorf("failed to create directory %s: %w", dir, err)
	}

	// Use O_CREATE|O_EXCL for atomic check-and-create
	file, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, defaultFilePerms)
	if err != nil {
		if os.IsExist(err) {
			log.Debugf("File already exists: %s", path)
			return false, nil
		}
		return false, fmt.Errorf("failed to create file %s: %w", path, err)
	}

	if _, err := in.WriteTo(file); err != nil {
		// Best-effort cleanup; the write error is the one worth reporting.
		_ = file.Close()
		_ = os.Remove(fullPath)
		return false, fmt.Errorf("failed to write file %s: %w", path, err)
	}

	if err := file.Close(); err != nil {
		// A failed close can mean buffered data never reached the file.
		_ = os.Remove(fullPath)
		return false, fmt.Errorf("failed to close file %s: %w", path, err)
	}

	log.Debugf("File written successfully: %s", path)
	return true, nil
}
// ListFilePaths lists the store-relative, slash-separated paths of files
// under the base directory that match options.
//
// Results are returned in lexicographical order of the full relative path
// (matching GCS/S3 behavior). Only paths starting with options.Prefix and
// strictly greater than options.StartAfter are returned, and at most
// options.Limit of them; a limit that is non-positive or above
// listFilePathsMaxLimit is replaced by listFilePathsMaxLimit.
//
// filepath.WalkDir orders names per directory, which is not the order of
// full paths: it visits "a/b" before "a-b" and "a.txt" although '-' and '.'
// sort before '/'. Stopping the walk at the limit would therefore return the
// wrong page and make StartAfter pagination skip files, so all matches are
// collected, sorted, and only then truncated.
//
// The walk is aborted with ctx.Err() once ctx is done, and any error reported
// by the walk is returned unchanged.
func (f *FilesystemDataStore) ListFilePaths(ctx context.Context, options ListFileOptions) ([]string, error) {
	limit := options.Limit
	if limit <= 0 || limit > listFilePathsMaxLimit {
		limit = listFilePathsMaxLimit
	}

	var files []string
	err := filepath.WalkDir(f.basePath, func(path string, d os.DirEntry, err error) error {
		if err != nil {
			return err
		}

		// Check for context cancellation
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		// Get path relative to basePath and normalize to forward slashes
		relPath, err := filepath.Rel(f.basePath, path)
		if err != nil {
			return err
		}
		relPath = filepath.ToSlash(relPath)

		if d.IsDir() {
			// Everything below this directory starts with "<dir>/". Skip the
			// subtree when that cannot overlap with the requested prefix.
			if relPath != "." {
				dirPrefix := relPath + "/"
				if !strings.HasPrefix(dirPrefix, options.Prefix) && !strings.HasPrefix(options.Prefix, dirPrefix) {
					return filepath.SkipDir
				}
			}
			return nil
		}

		// Apply prefix filter
		if options.Prefix != "" && !strings.HasPrefix(relPath, options.Prefix) {
			return nil
		}

		// Apply StartAfter filter
		if options.StartAfter != "" && relPath <= options.StartAfter {
			return nil
		}

		files = append(files, relPath)
		return nil
	})
	if err != nil {
		return nil, err
	}

	// Order by full relative path before applying the limit.
	sort.Strings(files)
	if len(files) > int(limit) {
		files = files[:int(limit)]
	}
	return files, nil
}
// Close is a no-op for FilesystemDataStore: the store keeps no open handles
// or connections between calls, so there is nothing to release. It always
// returns nil.
func (f *FilesystemDataStore) Close() error {
return nil
}