@@ -12,6 +12,7 @@ import (
12
12
"fmt"
13
13
"reflect"
14
14
"strconv"
15
+ "strings"
15
16
"sync/atomic"
16
17
"testing"
17
18
@@ -27,6 +28,8 @@ import (
27
28
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
28
29
"github.com/cockroachdb/errors"
29
30
"github.com/cockroachdb/pebble"
31
+ "github.com/cockroachdb/pebble/vfs"
32
+ "github.com/cockroachdb/pebble/vfs/errorfs"
30
33
"github.com/stretchr/testify/assert"
31
34
"github.com/stretchr/testify/require"
32
35
)
@@ -1010,3 +1013,61 @@ func TestBatchReader(t *testing.T) {
1010
1013
require .False (t , r .Next ())
1011
1014
require .NoError (t , r .Error ())
1012
1015
}
1016
+
1017
+ // TestBatchCommitDoesntTouchSST tests that committing a writeBatch doesn't
1018
+ // touch SST files.
1019
+ func TestBatchCommitDoesntTouchSST (t * testing.T ) {
1020
+ defer leaktest .AfterTest (t )()
1021
+ defer log .Scope (t ).Close (t )
1022
+
1023
+ // Create an atomic variable that will cause an error when SST operations are
1024
+ // performed.
1025
+ var failSSTOps atomic.Bool
1026
+
1027
+ // Create a custom injector that blocks SST operations when failSSTOps is
1028
+ // true.
1029
+ injector := errorfs .InjectorFunc (func (op errorfs.Op ) error {
1030
+ if strings .Contains (op .Path , ".sst" ) && failSSTOps .Load () {
1031
+ return errors .Newf ("blocking SST operation: %+v" , op )
1032
+ }
1033
+ return nil
1034
+ })
1035
+
1036
+ // Create the wrapped filesystem, and create a db.
1037
+ memFS := vfs .NewMem ()
1038
+ wrappedFS := errorfs .Wrap (memFS , injector )
1039
+ env := mustInitTestEnv (t , wrappedFS , "" )
1040
+ db , err := Open (context .Background (), env , cluster .MakeClusterSettings ())
1041
+ require .NoError (t , err )
1042
+ defer db .Close ()
1043
+
1044
+ // Initialize the db with some data.
1045
+ initBatch := db .NewBatch ()
1046
+ defer initBatch .Close ()
1047
+
1048
+ // Perform some operations.
1049
+ require .NoError (t , initBatch .PutUnversioned (mvccKey ("key1" ).Key , []byte ("val1" )))
1050
+ require .NoError (t , initBatch .PutUnversioned (mvccKey ("key2" ).Key , []byte ("val2" )))
1051
+ require .NoError (t , initBatch .PutUnversioned (mvccKey ("key3" ).Key , []byte ("val3" )))
1052
+ require .NoError (t , initBatch .PutUnversioned (mvccKey ("key4" ).Key , []byte ("val4" )))
1053
+ require .NoError (t , initBatch .Commit (true /* sync */ ))
1054
+
1055
+ // Force a flush to create an SST file.
1056
+ require .NoError (t , db .Flush ())
1057
+
1058
+ // Create a new batch for testing.
1059
+ testingBatch := db .NewBatch ()
1060
+ defer testingBatch .Close ()
1061
+
1062
+ // Perform some operations.
1063
+ require .Equal (t , []byte ("val1" ), mvccGetRaw (t , testingBatch , mvccKey ("key1" )))
1064
+ require .Equal (t , []byte (nil ), mvccGetRaw (t , testingBatch , mvccKey ("non-existent-key" )))
1065
+ _ , err = Scan (context .Background (), testingBatch , localMax , roachpb .KeyMax , 0 )
1066
+ require .NoError (t , err )
1067
+ require .NoError (t , testingBatch .ClearUnversioned (mvccKey ("key4" ).Key , ClearOptions {}))
1068
+
1069
+ // Before committing, enable SST operation errors and make sure the commit
1070
+ // succeeds.
1071
+ failSSTOps .Store (true )
1072
+ require .NoError (t , testingBatch .Commit (true /* sync */ ))
1073
+ }
0 commit comments