Skip to content

Commit b3688c4

Browse files
craig[bot]iskettaneh
andcommitted
Merge #144053
144053: kvserver: create a new PebbleCorruptionError r=iskettaneh a=iskettaneh This commit introduces a new KV error called PebbleCorruptionError. This error is returned when Pebble reports a DataCorruptionError, indicating data corruption (for example, a file that an SSTable points to has been deleted). References: #143135 Release note: None Co-authored-by: Ibrahim Kettaneh <[email protected]>
2 parents b3fa0b4 + ce59acb commit b3688c4

File tree

5 files changed

+86
-18
lines changed

5 files changed

+86
-18
lines changed

pkg/kv/kvpb/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ go_library(
4545
"//pkg/util/uuid",
4646
"@com_github_cockroachdb_errors//:errors",
4747
"@com_github_cockroachdb_errors//extgrpc",
48+
"@com_github_cockroachdb_pebble//:pebble",
4849
"@com_github_cockroachdb_redact//:redact",
4950
"@com_github_dustin_go_humanize//:go-humanize",
5051
"@com_github_gogo_protobuf//proto",

pkg/kv/kvpb/errors.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/cockroachdb/cockroach/pkg/util/uuid"
2323
"github.com/cockroachdb/errors"
2424
_ "github.com/cockroachdb/errors/extgrpc" // register EncodeError support for gRPC Status
25+
"github.com/cockroachdb/pebble"
2526
"github.com/cockroachdb/redact"
2627
"github.com/gogo/protobuf/proto"
2728
)
@@ -347,6 +348,7 @@ func init() {
347348
errors.RegisterTypeMigration(roachpbPath, "*roachpb.RefreshFailedError", &RefreshFailedError{})
348349
errors.RegisterTypeMigration(roachpbPath, "*roachpb.MVCCHistoryMutationError", &MVCCHistoryMutationError{})
349350
errors.RegisterTypeMigration(roachpbPath, "*roachpb.InsufficientSpaceError", &InsufficientSpaceError{})
351+
errors.RegisterTypeMigration(roachpbPath, "*roachpb.PebbleCorruptionError", &PebbleCorruptionError{})
350352
}
351353

352354
// GoError returns a Go error converted from Error. If the error is a transaction
@@ -1624,6 +1626,35 @@ func (e *InsufficientSpaceError) SafeFormatError(p errors.Printer) (next error)
16241626
return nil
16251627
}
16261628

1629+
// NewPebbleCorruptionError creates a new PebbleCorruptionError.
1630+
func NewPebbleCorruptionError(
1631+
storeID roachpb.StoreID, info *pebble.DataCorruptionInfo,
1632+
) *PebbleCorruptionError {
1633+
err := &PebbleCorruptionError{
1634+
StoreID: storeID,
1635+
Path: info.Path,
1636+
IsRemote: info.IsRemote,
1637+
ExtraMsg: info.Details.Error(),
1638+
}
1639+
return err
1640+
}
1641+
1642+
func (e *PebbleCorruptionError) Error() string {
1643+
return fmt.Sprint(e)
1644+
}
1645+
1646+
// Format implements fmt.Formatter.
1647+
func (e *PebbleCorruptionError) Format(s fmt.State, verb rune) {
1648+
errors.FormatError(e, s, verb)
1649+
}
1650+
1651+
// SafeFormatError implements errors.SafeFormatter.
1652+
func (e *PebbleCorruptionError) SafeFormatError(p errors.Printer) (next error) {
1653+
p.Printf("pebble corruption error on store id:%d, path:%s, remote:%t, extra message: %s",
1654+
e.StoreID, e.Path, e.IsRemote, e.ExtraMsg)
1655+
return nil
1656+
}
1657+
16271658
// NewNotLeaseHolderError returns a NotLeaseHolderError initialized with the
16281659
// replica for the holder (if any) of the given lease.
16291660
//

pkg/kv/kvpb/errors.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -791,3 +791,16 @@ message InsufficientSpaceError {
791791
// RequiredFraction is the required remaining capacity fraction.
792792
optional double required = 5 [(gogoproto.nullable) = false];
793793
}
794+
795+
796+
// PebbleCorruptionError indicates that pebble has reported data
797+
// corruption.
798+
message PebbleCorruptionError {
799+
optional int64 store_id = 1 [(gogoproto.nullable) = false,
800+
(gogoproto.customname) = "StoreID",
801+
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID"];
802+
optional string path = 2 [(gogoproto.nullable) = false];
803+
optional bool is_remote = 3 [(gogoproto.nullable) = false];
804+
optional string extra_msg = 4 [(gogoproto.nullable) = false];
805+
}
806+

pkg/kv/kvserver/deleted_external_sstable_test.go

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,19 @@ import (
3333
// externalSSTTestCluster is a helper struct that has helper functions to set up
3434
// and run the tests.
3535
type externalSSTTestCluster struct {
36-
tc *testcluster.TestCluster
37-
db *kv.DB
36+
tc *testcluster.TestCluster
37+
db *kv.DB
38+
externURI string
39+
sstFile string
40+
}
41+
42+
// newExternalSSTTestCluster creates a new externalSSTTestCluster, and it also
43+
// initializes the sstFile and externURI fields to be used in the tests.
44+
func newExternalSSTTestCluster() *externalSSTTestCluster {
45+
return &externalSSTTestCluster{
46+
externURI: "nodelocal://1/external-files",
47+
sstFile: "file1.sst",
48+
}
3849
}
3950

4051
// testSetup creates a test cluster with ranges split in this way:
@@ -321,8 +332,9 @@ func (etc *externalSSTTestCluster) mergeHelper(ctx context.Context, key roachpb.
321332

322333
// splitHelper issues an AdminSplitRequest for the provided key.
323334
func (etc *externalSSTTestCluster) splitHelper(ctx context.Context, key roachpb.Key) error {
324-
_, _, err := etc.tc.SplitRange(key)
325-
return err
335+
b := kv.Batch{}
336+
b.AddRawRequest(adminSplitArgs(key))
337+
return etc.db.Run(ctx, &b)
326338
}
327339

328340
// exciseHelper excises the provided key range from the store.
@@ -365,7 +377,10 @@ func (etc *externalSSTTestCluster) checkKeysPointToSameRangeDesc(
365377
// due to the file not being found.
366378
func (etc *externalSSTTestCluster) requireNotFoundError(t *testing.T, err error) {
367379
t.Helper()
368-
require.Regexp(t, "no such file or directory", err)
380+
pce := &kvpb.PebbleCorruptionError{}
381+
require.ErrorAs(t, err, &pce)
382+
require.True(t, pce.IsRemote)
383+
require.Equal(t, fmt.Sprintf("remote-%s://%s", etc.externURI, etc.sstFile), pce.Path)
369384
}
370385

371386
// TestGeneralOperationsWorkAsExpectedOnDeletedExternalSST tests that general
@@ -404,7 +419,6 @@ func TestGeneralOperationsWorkAsExpectedOnDeletedExternalSST(t *testing.T) {
404419
defer leaktest.AfterTest(t)()
405420
defer log.Scope(t).Close(t)
406421
defer nodelocal.ReplaceNodeLocalForTesting(t.TempDir())()
407-
const externURI = "nodelocal://1/external-files"
408422

409423
skip.UnderRace(t) // too slow under stressrace
410424

@@ -833,7 +847,7 @@ func TestGeneralOperationsWorkAsExpectedOnDeletedExternalSST(t *testing.T) {
833847
for _, testCase := range testCases {
834848
t.Run(testCase.name, func(t *testing.T) {
835849
ctx := context.Background()
836-
etc := externalSSTTestCluster{}
850+
etc := newExternalSSTTestCluster()
837851
etc.testSetup(t)
838852
defer etc.tc.Stopper().Stop(ctx)
839853

@@ -842,21 +856,21 @@ func TestGeneralOperationsWorkAsExpectedOnDeletedExternalSST(t *testing.T) {
842856
stream, _ := etc.createRangeFeed(t, roachpb.Key("d"), roachpb.Key("g"))
843857
defer stream.Cancel()
844858

845-
externalStorage, err := etc.createExternalStorage(ctx, externURI)
859+
externalStorage, err := etc.createExternalStorage(ctx, etc.externURI)
846860
require.NoError(t, err)
847861

848862
firstStartChar := testCase.deletedExternalSpanStart[0]
849863
firstEndChar := testCase.deletedExternalSpanEnd[0]
850864
require.NoError(t, etc.createExternalSSTableFile(t, ctx, externalStorage,
851-
"file1.sst", firstStartChar, firstEndChar))
865+
etc.sstFile, firstStartChar, firstEndChar))
852866
require.NoError(t, etc.linkExternalSSTableToFile(ctx, testCase.deletedExternalSpanStart,
853-
testCase.deletedExternalSpanEnd, externURI, "file1.sst"))
867+
testCase.deletedExternalSpanEnd, etc.externURI, etc.sstFile))
854868

855869
// Before deleting the file, run some data operations that will be
856870
// on top of the SSTable pointing to the soon-to-be deleted file.
857871
pendingTxn1, pendingTxn2, err := etc.writeIntents(ctx, etc.db)
858872
require.NoError(t, err)
859-
require.NoError(t, externalStorage.Delete(ctx, "file1.sst"))
873+
require.NoError(t, externalStorage.Delete(ctx, etc.sstFile))
860874

861875
// We should be able to commit the transactions since they just have
862876
// point writes, and they wouldn't need the deleted file.
@@ -865,7 +879,7 @@ func TestGeneralOperationsWorkAsExpectedOnDeletedExternalSST(t *testing.T) {
865879

866880
// Run the test function, and make sure that the store is consistent
867881
// afterward.
868-
testCase.testFunc(t, ctx, &etc)
882+
testCase.testFunc(t, ctx, etc)
869883
require.NoError(t, etc.checkConsistency(ctx, roachpb.Key("a"), roachpb.Key("z")))
870884
})
871885
}
@@ -895,16 +909,15 @@ func TestRangeFeedWithExcise(t *testing.T) {
895909
defer leaktest.AfterTest(t)()
896910
defer log.Scope(t).Close(t)
897911
defer nodelocal.ReplaceNodeLocalForTesting(t.TempDir())()
898-
const externURI = "nodelocal://1/external-files"
899912

900913
ctx := context.Background()
901-
etc := externalSSTTestCluster{}
914+
etc := newExternalSSTTestCluster()
902915

903916
// Create a test cluster with the following ranges: [a, d), [d, g), [g, z).
904917
etc.testSetup(t)
905918
defer etc.tc.Stopper().Stop(ctx)
906919

907-
externalStorage, err := etc.createExternalStorage(ctx, externURI)
920+
externalStorage, err := etc.createExternalStorage(ctx, etc.externURI)
908921
require.NoError(t, err)
909922

910923
deletedStartKey := roachpb.Key("d")
@@ -913,9 +926,9 @@ func TestRangeFeedWithExcise(t *testing.T) {
913926
firstEndChar := deletedEndKey[0]
914927

915928
require.NoError(t, etc.createExternalSSTableFile(t, ctx, externalStorage,
916-
"file1.sst", firstStartChar, firstEndChar))
929+
etc.sstFile, firstStartChar, firstEndChar))
917930
require.NoError(t, etc.linkExternalSSTableToFile(ctx, deletedStartKey,
918-
deletedEndKey, externURI, "file1.sst"))
931+
deletedEndKey, etc.externURI, etc.sstFile))
919932

920933
// Create one range feed per range.
921934
rfStream1, rfErr1 := etc.createRangeFeed(t, roachpb.Key("a"), roachpb.Key("d"))
@@ -954,7 +967,7 @@ func TestRangeFeedWithExcise(t *testing.T) {
954967
pendingTxn1, pendingTxn2, err := etc.writeIntents(ctx, etc.db)
955968
require.NoError(t, err)
956969

957-
require.NoError(t, externalStorage.Delete(ctx, "file1.sst"))
970+
require.NoError(t, externalStorage.Delete(ctx, etc.sstFile))
958971

959972
require.NoError(t, pendingTxn1.Commit(ctx))
960973
require.NoError(t, pendingTxn2.Commit(ctx))

pkg/kv/kvserver/replica_evaluate.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/cockroachdb/cockroach/pkg/util/tracing"
3131
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
3232
"github.com/cockroachdb/errors"
33+
"github.com/cockroachdb/pebble"
3334
"github.com/kr/pretty"
3435
)
3536

@@ -562,6 +563,15 @@ func evaluateCommand(
562563
log.VEventf(ctx, 2, "evaluated %s command %s, txn=%v : resp=%s, err=%v",
563564
args.Method(), trunc(args.String()), h.Txn, resp, err)
564565
}
566+
567+
// If there is a pebble data corruption error, we want to serialize it by
568+
// returning the KV error PebbleCorruptionError. This way, the error can be
569+
// extracted by KV clients.
570+
if err != nil {
571+
if info := pebble.ExtractDataCorruptionInfo(err); info != nil {
572+
err = kvpb.NewPebbleCorruptionError(rec.StoreID(), info)
573+
}
574+
}
565575
return pd, err
566576
}
567577

0 commit comments

Comments
 (0)