Skip to content

Commit 9f1386d

Browse files
committed
logstore: downgrade Engine to fs.Env
Epic: none Release note: none
1 parent 3b45e6a commit 9f1386d

File tree

4 files changed

+17
-22
lines changed

4 files changed

+17
-22
lines changed

pkg/kv/kvserver/logstore/sideload_disk.go

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
1818
"github.com/cockroachdb/cockroach/pkg/roachpb"
1919
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
20-
"github.com/cockroachdb/cockroach/pkg/storage"
2120
"github.com/cockroachdb/cockroach/pkg/storage/fs"
2221
"github.com/cockroachdb/cockroach/pkg/util/log"
2322
"github.com/cockroachdb/errors"
@@ -36,7 +35,7 @@ type DiskSideloadStorage struct {
3635
st *cluster.Settings
3736
limiter *rate.Limiter
3837
dir string
39-
eng storage.Engine
38+
fs *fs.Env
4039
}
4140

4241
func sideloadedPath(baseDir string, rangeID roachpb.RangeID) string {
@@ -57,15 +56,11 @@ func sideloadedPath(baseDir string, rangeID roachpb.RangeID) string {
5756
// NewDiskSideloadStorage creates a SideloadStorage for a given replica, stored
5857
// in the specified engine.
5958
func NewDiskSideloadStorage(
60-
st *cluster.Settings,
61-
rangeID roachpb.RangeID,
62-
baseDir string,
63-
limiter *rate.Limiter,
64-
eng storage.Engine,
59+
st *cluster.Settings, rangeID roachpb.RangeID, baseDir string, limiter *rate.Limiter, fs *fs.Env,
6560
) *DiskSideloadStorage {
6661
return &DiskSideloadStorage{
6762
dir: sideloadedPath(baseDir, rangeID),
68-
eng: eng,
63+
fs: fs,
6964
st: st,
7065
limiter: limiter,
7166
}
@@ -86,14 +81,14 @@ func (ss *DiskSideloadStorage) Put(
8681
for {
8782
// Use 0644 since that's what RocksDB uses:
8883
// https://github.com/facebook/rocksdb/blob/56656e12d67d8a63f1e4c4214da9feeec2bd442b/env/env_posix.cc#L171
89-
if err := kvserverbase.WriteFileSyncing(ctx, filename, contents, ss.eng.Env(), 0644, ss.st, ss.limiter, fs.PebbleIngestionWriteCategory); err == nil {
84+
if err := kvserverbase.WriteFileSyncing(ctx, filename, contents, ss.fs, 0644, ss.st, ss.limiter, fs.PebbleIngestionWriteCategory); err == nil {
9085
return nil
9186
} else if !oserror.IsNotExist(err) {
9287
return err
9388
}
9489
// Ensure that ss.dir exists. The filename() is placed directly in ss.dir,
9590
// so the next loop iteration should succeed.
96-
if err := mkdirAllAndSyncParents(ss.eng.Env(), ss.dir, os.ModePerm); err != nil {
91+
if err := mkdirAllAndSyncParents(ss.fs, ss.dir, os.ModePerm); err != nil {
9792
return err
9893
}
9994
continue
@@ -102,7 +97,7 @@ func (ss *DiskSideloadStorage) Put(
10297

10398
// Sync implements SideloadStorage.
10499
func (ss *DiskSideloadStorage) Sync() error {
105-
dir, err := ss.eng.Env().OpenDir(ss.dir)
100+
dir, err := ss.fs.OpenDir(ss.dir)
106101
// The directory can be missing because we did not Put() any entry to it yet,
107102
// or it has been removed by TruncateTo() or Clear().
108103
//
@@ -127,7 +122,7 @@ func (ss *DiskSideloadStorage) Get(
127122
ctx context.Context, index kvpb.RaftIndex, term kvpb.RaftTerm,
128123
) ([]byte, error) {
129124
filename := ss.filename(ctx, index, term)
130-
b, err := fs.ReadFile(ss.eng.Env(), filename)
125+
b, err := fs.ReadFile(ss.fs, filename)
131126
if oserror.IsNotExist(err) {
132127
return nil, errSideloadedFileNotFound
133128
}
@@ -155,7 +150,7 @@ func (ss *DiskSideloadStorage) Purge(
155150
}
156151

157152
func (ss *DiskSideloadStorage) fileSize(filename string) (int64, error) {
158-
info, err := ss.eng.Env().Stat(filename)
153+
info, err := ss.fs.Stat(filename)
159154
if err != nil {
160155
if oserror.IsNotExist(err) {
161156
return 0, errSideloadedFileNotFound
@@ -170,7 +165,7 @@ func (ss *DiskSideloadStorage) purgeFile(ctx context.Context, filename string) (
170165
if err != nil {
171166
return 0, err
172167
}
173-
if err := ss.eng.Env().Remove(filename); err != nil {
168+
if err := ss.fs.Remove(filename); err != nil {
174169
if oserror.IsNotExist(err) {
175170
return 0, errSideloadedFileNotFound
176171
}
@@ -181,7 +176,7 @@ func (ss *DiskSideloadStorage) purgeFile(ctx context.Context, filename string) (
181176

182177
// Clear implements SideloadStorage.
183178
func (ss *DiskSideloadStorage) Clear(_ context.Context) error {
184-
return ss.eng.Env().RemoveAll(ss.dir)
179+
return ss.fs.RemoveAll(ss.dir)
185180
}
186181

187182
// TruncateTo implements SideloadStorage.
@@ -207,7 +202,7 @@ func (ss *DiskSideloadStorage) TruncateTo(ctx context.Context, lastIndex kvpb.Ra
207202
if deletedAll {
208203
// The directory may not exist, or it may exist and have been empty.
209204
// Not worth trying to figure out which one, just try to delete.
210-
err := ss.eng.Env().Remove(ss.dir)
205+
err := ss.fs.Remove(ss.dir)
211206
if err != nil && !oserror.IsNotExist(err) {
212207
// TODO(pavelkalinnikov): this is possible because deletedAll can be left
213208
// true despite existence of files with index < from which are skipped.
@@ -246,7 +241,7 @@ func (ss *DiskSideloadStorage) forEach(
246241
ctx context.Context, visit func(index kvpb.RaftIndex, filename string) (bool, error),
247242
) error {
248243
// TODO(pavelkalinnikov): consider making the List method iterative.
249-
matches, err := ss.eng.Env().List(ss.dir)
244+
matches, err := ss.fs.List(ss.dir)
250245
if oserror.IsNotExist(err) {
251246
return nil // nothing to do
252247
} else if err != nil {

pkg/kv/kvserver/logstore/sideload_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func newTestingSideloadStorage(eng storage.Engine) *DiskSideloadStorage {
8686
return NewDiskSideloadStorage(
8787
cluster.MakeTestingClusterSettings(), 1,
8888
filepath.Join(eng.GetAuxiliaryDir(), "fake", "testing", "dir"),
89-
rate.NewLimiter(rate.Inf, math.MaxInt64), eng)
89+
rate.NewLimiter(rate.Inf, math.MaxInt64), eng.Env())
9090
}
9191

9292
// TODO(pavelkalinnikov): give these tests a good refactor.
@@ -96,7 +96,7 @@ func testSideloadingSideloadedStorage(t *testing.T, eng storage.Engine) {
9696

9797
assertExists := func(exists bool) {
9898
t.Helper()
99-
_, err := ss.eng.Env().Stat(ss.dir)
99+
_, err := ss.fs.Stat(ss.dir)
100100
if !exists {
101101
require.True(t, oserror.IsNotExist(err), err)
102102
} else {
@@ -537,7 +537,7 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) {
537537
if test.size != stats.SideloadedBytes {
538538
t.Fatalf("expected %d sideloadedSize, but found %d", test.size, stats.SideloadedBytes)
539539
}
540-
actKeys, err := sideloaded.eng.Env().List(sideloaded.Dir())
540+
actKeys, err := sideloaded.fs.List(sideloaded.Dir())
541541
if oserror.IsNotExist(err) {
542542
t.Log("swallowing IsNotExist")
543543
err = nil

pkg/kv/kvserver/replica_init.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ func newUninitializedReplicaWithoutRaftGroup(store *Store, id roachpb.FullReplic
218218
// can be ingested to the state machine locally, when being applied.
219219
store.StateEngine().GetAuxiliaryDir(),
220220
store.limiters.BulkIOWriteRate,
221-
store.StateEngine(),
221+
store.StateEngine().Env(),
222222
)
223223
r.logStorage = &replicaLogStorage{
224224
ctx: r.raftCtx,

pkg/kv/kvserver/replica_raftlog_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func newReplicaLogStorageTest(t *testing.T) *replicaLogStorageTest {
102102
st := cluster.MakeTestingClusterSettings()
103103
eng := storage.NewDefaultInMemForTesting()
104104
sideloaded := logstore.NewDiskSideloadStorage(st, rangeID,
105-
eng.GetAuxiliaryDir(), nil /* limiter: unused */, eng)
105+
eng.GetAuxiliaryDir(), nil /* limiter: unused */, eng.Env())
106106

107107
rt.ls = &replicaLogStorage{
108108
ctx: context.Background(),

0 commit comments

Comments
 (0)