diff --git a/checkpoint.go b/checkpoint.go index 9c616f08e3..49c6ab2338 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -494,7 +494,7 @@ func (d *DB) writeCheckpointManifest( // append a record after a raw data copy; see // https://github.com/cockroachdb/cockroach/issues/100935). r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum) - w := record.NewWriter(dst) + w := record.NewWriter(dst, manifestFileNum, d.FormatMajorVersion() >= FormatManifestSyncChunks) for { rr, err := r.Next() if err != nil { diff --git a/format_major_version.go b/format_major_version.go index 062789a411..ce26403946 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -226,6 +226,10 @@ const ( // once stable. FormatExperimentalValueSeparation + // FormatManifestSyncChunks is a format major version enabling the writing of + // sync offset chunks for Manifest files. See comment for FormatWALSyncChunks. + FormatManifestSyncChunks + // -- Add new versions here -- // FormatNewest is the most recent format major version. @@ -266,7 +270,7 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat { return sstable.TableFormatPebblev4 case FormatColumnarBlocks, FormatWALSyncChunks: return sstable.TableFormatPebblev5 - case FormatTableFormatV6, FormatExperimentalValueSeparation: + case FormatTableFormatV6, FormatExperimentalValueSeparation, FormatManifestSyncChunks: return sstable.TableFormatPebblev6 default: panic(fmt.Sprintf("pebble: unsupported format major version: %s", v)) @@ -280,7 +284,7 @@ func (v FormatMajorVersion) MinTableFormat() sstable.TableFormat { case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted, FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix, FormatFlushableIngestExcises, FormatColumnarBlocks, FormatWALSyncChunks, - FormatTableFormatV6, FormatExperimentalValueSeparation: + FormatTableFormatV6, FormatExperimentalValueSeparation, FormatManifestSyncChunks: return sstable.TableFormatPebblev1 default: panic(fmt.Sprintf("pebble: unsupported format major version: %s", v)) @@ -332,6 +336,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{ FormatExperimentalValueSeparation: func(d *DB) error { return d.finalizeFormatVersUpgrade(FormatExperimentalValueSeparation) }, + FormatManifestSyncChunks: func(d *DB) error { + return d.finalizeFormatVersUpgrade(FormatManifestSyncChunks) + }, } const formatVersionMarkerName = `format-version` diff --git a/format_major_version_test.go b/format_major_version_test.go index f7461aaff4..149b12138d 100644 --- a/format_major_version_test.go +++ b/format_major_version_test.go @@ -29,11 +29,12 @@ func TestFormatMajorVersionStableValues(t *testing.T) { require.Equal(t, FormatWALSyncChunks, FormatMajorVersion(20)) require.Equal(t, FormatTableFormatV6, FormatMajorVersion(21)) require.Equal(t, FormatExperimentalValueSeparation, FormatMajorVersion(22)) + require.Equal(t, FormatManifestSyncChunks, FormatMajorVersion(23)) // When we add a new version, we should add a check for the new version in // addition to updating these expected values. require.Equal(t, FormatNewest, FormatMajorVersion(21)) - require.Equal(t, internalFormatNewest, FormatMajorVersion(22)) + require.Equal(t, internalFormatNewest, FormatMajorVersion(23)) } func TestFormatMajorVersion_MigrationDefined(t *testing.T) { @@ -70,6 +71,8 @@ func TestRatchetFormat(t *testing.T) { require.Equal(t, FormatTableFormatV6, d.FormatMajorVersion()) require.NoError(t, d.RatchetFormatMajorVersion(FormatExperimentalValueSeparation)) require.Equal(t, FormatExperimentalValueSeparation, d.FormatMajorVersion()) + require.NoError(t, d.RatchetFormatMajorVersion(FormatManifestSyncChunks)) + require.Equal(t, FormatManifestSyncChunks, d.FormatMajorVersion()) require.NoError(t, d.Close()) @@ -230,6 +233,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) { FormatWALSyncChunks: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev5}, FormatTableFormatV6: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev6}, FormatExperimentalValueSeparation: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev6}, + FormatManifestSyncChunks: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev6}, } // Valid versions. diff --git a/objstorage/objstorageprovider/remoteobjcat/catalog.go b/objstorage/objstorageprovider/remoteobjcat/catalog.go index b975dd4572..0aab21c71e 100644 --- a/objstorage/objstorageprovider/remoteobjcat/catalog.go +++ b/objstorage/objstorageprovider/remoteobjcat/catalog.go @@ -339,7 +339,7 @@ func (c *Catalog) createNewCatalogFileLocked() (outErr error) { if err != nil { return err } - recWriter := record.NewWriter(file) + recWriter := record.NewWriter(file, 0, false) err = func() error { // Create a VersionEdit that gets us from an empty catalog to the current state. var ve VersionEdit diff --git a/open_test.go b/open_test.go index 5e020d48fa..fca70ecae7 100644 --- a/open_test.go +++ b/open_test.go @@ -333,7 +333,7 @@ func TestNewDBFilenames(t *testing.T) { "LOCK", "MANIFEST-000001", "OPTIONS-000003", - "marker.format-version.000009.022", + "marker.format-version.000010.023", "marker.manifest.000001.MANIFEST-000001", }, } diff --git a/record/record.go b/record/record.go index 20b1624d31..2e03155bea 100644 --- a/record/record.go +++ b/record/record.go @@ -692,10 +692,22 @@ type Writer struct { err error // buf is the buffer. buf [blockSize]byte + + // logNum is the log number of the log. + logNum base.DiskFileNum + + // fillHeader fills in the header for the pending chunk. + // Its implementation is decided at runtime and is decided + // by whether or not the log is writing sync offsets or not. + fillHeader func(bool) + + // headerSize is the size of the type of header that the writer + // is writing. It can either be legacyHeaderSize or walSyncHeaderSize. + headerSize int } // NewWriter returns a new Writer. -func NewWriter(w io.Writer) *Writer { +func NewWriter(w io.Writer, logNum base.DiskFileNum, writingSyncOffsets bool) *Writer { f, _ := w.(flusher) var o int64 @@ -705,16 +717,24 @@ func NewWriter(w io.Writer) *Writer { o = 0 } } - return &Writer{ + wr := &Writer{ w: w, f: f, baseOffset: o, lastRecordOffset: -1, + logNum: logNum, + } + if writingSyncOffsets { + wr.fillHeader = wr.fillHeaderSyncOffsets + wr.headerSize = walSyncHeaderSize + } else { + wr.fillHeader = wr.fillHeaderLegacy + wr.headerSize = legacyHeaderSize } + return wr } -// fillHeader fills in the header for the pending chunk. -func (w *Writer) fillHeader(last bool) { +func (w *Writer) fillHeaderLegacy(last bool) { if w.i+legacyHeaderSize > w.j || w.j > blockSize { panic("pebble/record: bad writer state") } @@ -735,12 +755,35 @@ func (w *Writer) fillHeader(last bool) { binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-legacyHeaderSize)) } +func (w *Writer) fillHeaderSyncOffsets(last bool) { + if w.i+walSyncHeaderSize > w.j || w.j > blockSize { + panic("pebble/record: bad writer state") + } + if last { + if w.first { + w.buf[w.i+6] = walSyncFullChunkEncoding + } else { + w.buf[w.i+6] = walSyncLastChunkEncoding + } + } else { + if w.first { + w.buf[w.i+6] = walSyncFirstChunkEncoding + } else { + w.buf[w.i+6] = walSyncMiddleChunkEncoding + } + } + binary.LittleEndian.PutUint32(w.buf[w.i+7:w.i+11], uint32(w.logNum)) + binary.LittleEndian.PutUint64(w.buf[w.i+11:w.i+19], uint64(w.lastRecordOffset)+uint64(w.written)) + binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], crc.New(w.buf[w.i+6:w.j]).Value()) + binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-walSyncHeaderSize)) +} + // writeBlock writes the buffered block to the underlying writer, and reserves // space for the next chunk's header. func (w *Writer) writeBlock() { _, w.err = w.w.Write(w.buf[w.written:]) w.i = 0 - w.j = legacyHeaderSize + w.j = w.headerSize w.written = 0 w.blockNumber++ } @@ -796,7 +839,7 @@ func (w *Writer) Next() (io.Writer, error) { w.fillHeader(true) } w.i = w.j - w.j = w.j + legacyHeaderSize + w.j = w.j + w.headerSize // Check if there is room in the block for the header. if w.j > blockSize { // Fill in the rest of the block with zeroes. diff --git a/record/record_test.go b/record/record_test.go index c870a285d1..f106e0b7d2 100644 --- a/record/record_test.go +++ b/record/record_test.go @@ -89,7 +89,7 @@ func testGeneratorWriter( func testGenerator(t *testing.T, reset func(), gen func() (string, bool)) { t.Run("Writer", func(t *testing.T) { testGeneratorWriter(t, reset, gen, func(w io.Writer) recordWriter { - return NewWriter(w) + return NewWriter(w, 0, false) }) }) @@ -175,7 +175,7 @@ func TestBoundary(t *testing.T) { func TestFlush(t *testing.T) { buf := new(bytes.Buffer) - w := NewWriter(buf) + w := NewWriter(buf, 0, false) // Write a couple of records. Everything should still be held // in the record.Writer buffer, so that buf.Len should be 0. w0, _ := w.Next() @@ -240,7 +240,7 @@ func TestNonExhaustiveRead(t *testing.T) { p := make([]byte, 10) rnd := rand.New(rand.NewPCG(0, 1)) - w := NewWriter(buf) + w := NewWriter(buf, 0, false) for i := 0; i < n; i++ { length := len(p) + rnd.IntN(3*blockSize) s := string(uint8(i)) + "123456789abcdefgh" @@ -267,7 +267,7 @@ func TestNonExhaustiveRead(t *testing.T) { func TestStaleReader(t *testing.T) { buf := new(bytes.Buffer) - w := NewWriter(buf) + w := NewWriter(buf, 0, false) _, err := w.WriteRecord([]byte("0")) require.NoError(t, err) @@ -313,7 +313,7 @@ func makeTestRecords(recordLengths ...int) (*testRecords, error) { } buf := new(bytes.Buffer) - w := NewWriter(buf) + w := NewWriter(buf, base.DiskFileNum(0), false) for i, rec := range ret.records { wRec, err := w.Next() if err != nil { @@ -616,7 +616,7 @@ func TestLastRecordOffset(t *testing.T) { func TestNoLastRecordOffset(t *testing.T) { buf := new(bytes.Buffer) - w := NewWriter(buf) + w := NewWriter(buf, 0, false) defer w.Close() if _, err := w.LastRecordOffset(); err != ErrNoLastRecord { @@ -682,7 +682,7 @@ func TestInvalidLogNum(t *testing.T) { func TestSize(t *testing.T) { var buf bytes.Buffer zeroes := make([]byte, 8<<10) - w := NewWriter(&buf) + w := NewWriter(&buf, 0, false) for i := 0; i < 100; i++ { n := rand.IntN(len(zeroes)) _, err := w.WriteRecord(zeroes[:n]) @@ -1099,6 +1099,41 @@ func describeWALSyncBlocks( f.ToTreePrinter(n) } +func TestManifestSyncOffset(t *testing.T) { + buf := new(bytes.Buffer) + w := NewWriter(buf, 0, true) + w.WriteRecord(bytes.Repeat([]byte{1}, blockSize-walSyncHeaderSize)) + w.WriteRecord(bytes.Repeat([]byte{2}, blockSize-walSyncHeaderSize)) + + raw := buf.Bytes() + r := NewReader(bytes.NewReader(raw), 0) + r.loggerForTesting = &readerLogger{} + for { + _, err := r.Next() + if err != nil { + require.True(t, errors.Is(err, io.EOF)) + require.True(t, r.loggerForTesting.(*readerLogger).getLog() == "") + break + } + } + + // Check that corrupting a chunk should result in us reading ahead and returning + // an ErrInvalidChunk. + raw[0] ^= 0xFF + r = NewReader(bytes.NewReader(raw), 0) + r.loggerForTesting = &readerLogger{} + for { + _, err := r.Next() + if err != nil { + require.True(t, errors.Is(err, ErrInvalidChunk)) + logStr := r.loggerForTesting.(*readerLogger).getLog() + require.True(t, logStr != "") + println(logStr) + break + } + } +} + func BenchmarkRecordWrite(b *testing.B) { for _, size := range []int{8, 16, 32, 64, 256, 1028, 4096, 65_536} { b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) { diff --git a/testdata/checkpoint b/testdata/checkpoint index 4697ca4d0c..836a9d3483 100644 --- a/testdata/checkpoint +++ b/testdata/checkpoint @@ -55,6 +55,10 @@ create: db/marker.format-version.000009.022 close: db/marker.format-version.000009.022 remove: db/marker.format-version.000008.021 sync: db +create: db/marker.format-version.000010.023 +close: db/marker.format-version.000010.023 +remove: db/marker.format-version.000009.022 +sync: db create: db/temporary.000003.dbtmp sync: db/temporary.000003.dbtmp close: db/temporary.000003.dbtmp @@ -121,9 +125,9 @@ sync-data: checkpoints/checkpoint1/OPTIONS-000003 close: checkpoints/checkpoint1/OPTIONS-000003 close: db/OPTIONS-000003 open-dir: checkpoints/checkpoint1 -create: checkpoints/checkpoint1/marker.format-version.000001.022 -sync-data: checkpoints/checkpoint1/marker.format-version.000001.022 -close: checkpoints/checkpoint1/marker.format-version.000001.022 +create: checkpoints/checkpoint1/marker.format-version.000001.023 +sync-data: checkpoints/checkpoint1/marker.format-version.000001.023 +close: checkpoints/checkpoint1/marker.format-version.000001.023 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 link: db/000005.sst -> checkpoints/checkpoint1/000005.sst @@ -165,9 +169,9 @@ sync-data: checkpoints/checkpoint2/OPTIONS-000003 close: checkpoints/checkpoint2/OPTIONS-000003 close: db/OPTIONS-000003 open-dir: checkpoints/checkpoint2 -create: checkpoints/checkpoint2/marker.format-version.000001.022 -sync-data: checkpoints/checkpoint2/marker.format-version.000001.022 -close: checkpoints/checkpoint2/marker.format-version.000001.022 +create: checkpoints/checkpoint2/marker.format-version.000001.023 +sync-data: checkpoints/checkpoint2/marker.format-version.000001.023 +close: checkpoints/checkpoint2/marker.format-version.000001.023 sync: checkpoints/checkpoint2 close: checkpoints/checkpoint2 link: db/000007.sst -> checkpoints/checkpoint2/000007.sst @@ -204,9 +208,9 @@ sync-data: checkpoints/checkpoint3/OPTIONS-000003 close: checkpoints/checkpoint3/OPTIONS-000003 close: db/OPTIONS-000003 open-dir: checkpoints/checkpoint3 -create: checkpoints/checkpoint3/marker.format-version.000001.022 -sync-data: checkpoints/checkpoint3/marker.format-version.000001.022 -close: checkpoints/checkpoint3/marker.format-version.000001.022 +create: checkpoints/checkpoint3/marker.format-version.000001.023 +sync-data: checkpoints/checkpoint3/marker.format-version.000001.023 +close: checkpoints/checkpoint3/marker.format-version.000001.023 sync: checkpoints/checkpoint3 close: checkpoints/checkpoint3 link: db/000005.sst -> checkpoints/checkpoint3/000005.sst @@ -290,7 +294,7 @@ list db LOCK MANIFEST-000001 OPTIONS-000003 -marker.format-version.000009.022 +marker.format-version.000010.023 marker.manifest.000001.MANIFEST-000001 list checkpoints/checkpoint1 @@ -300,7 +304,7 @@ list checkpoints/checkpoint1 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.022 +marker.format-version.000001.023 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint1 readonly @@ -367,7 +371,7 @@ list checkpoints/checkpoint2 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.022 +marker.format-version.000001.023 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint2 readonly @@ -409,7 +413,7 @@ list checkpoints/checkpoint3 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.022 +marker.format-version.000001.023 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint3 readonly @@ -528,9 +532,9 @@ sync-data: checkpoints/checkpoint4/OPTIONS-000003 close: checkpoints/checkpoint4/OPTIONS-000003 close: db/OPTIONS-000003 open-dir: checkpoints/checkpoint4 -create: checkpoints/checkpoint4/marker.format-version.000001.022 -sync-data: checkpoints/checkpoint4/marker.format-version.000001.022 -close: checkpoints/checkpoint4/marker.format-version.000001.022 +create: checkpoints/checkpoint4/marker.format-version.000001.023 +sync-data: checkpoints/checkpoint4/marker.format-version.000001.023 +close: checkpoints/checkpoint4/marker.format-version.000001.023 sync: checkpoints/checkpoint4 close: checkpoints/checkpoint4 link: db/000010.sst -> checkpoints/checkpoint4/000010.sst @@ -618,7 +622,7 @@ list db LOCK MANIFEST-000001 OPTIONS-000003 -marker.format-version.000009.022 +marker.format-version.000010.023 marker.manifest.000001.MANIFEST-000001 @@ -637,9 +641,9 @@ sync-data: checkpoints/checkpoint5/OPTIONS-000003 close: checkpoints/checkpoint5/OPTIONS-000003 close: db/OPTIONS-000003 open-dir: checkpoints/checkpoint5 -create: checkpoints/checkpoint5/marker.format-version.000001.022 -sync-data: checkpoints/checkpoint5/marker.format-version.000001.022 -close: checkpoints/checkpoint5/marker.format-version.000001.022 +create: checkpoints/checkpoint5/marker.format-version.000001.023 +sync-data: checkpoints/checkpoint5/marker.format-version.000001.023 +close: checkpoints/checkpoint5/marker.format-version.000001.023 sync: checkpoints/checkpoint5 close: checkpoints/checkpoint5 link: db/000010.sst -> checkpoints/checkpoint5/000010.sst @@ -739,9 +743,9 @@ sync-data: checkpoints/checkpoint6/OPTIONS-000003 close: checkpoints/checkpoint6/OPTIONS-000003 close: db/OPTIONS-000003 open-dir: checkpoints/checkpoint6 -create: checkpoints/checkpoint6/marker.format-version.000001.022 -sync-data: checkpoints/checkpoint6/marker.format-version.000001.022 -close: checkpoints/checkpoint6/marker.format-version.000001.022 +create: checkpoints/checkpoint6/marker.format-version.000001.023 +sync-data: checkpoints/checkpoint6/marker.format-version.000001.023 +close: checkpoints/checkpoint6/marker.format-version.000001.023 sync: checkpoints/checkpoint6 close: checkpoints/checkpoint6 link: db/000011.sst -> checkpoints/checkpoint6/000011.sst @@ -910,6 +914,10 @@ create: valsepdb/marker.format-version.000009.022 close: valsepdb/marker.format-version.000009.022 remove: valsepdb/marker.format-version.000008.021 sync: valsepdb +create: valsepdb/marker.format-version.000010.023 +close: valsepdb/marker.format-version.000010.023 +remove: valsepdb/marker.format-version.000009.022 +sync: valsepdb create: valsepdb/temporary.000003.dbtmp sync: valsepdb/temporary.000003.dbtmp close: valsepdb/temporary.000003.dbtmp @@ -958,9 +966,9 @@ sync-data: checkpoints/checkpoint8/OPTIONS-000003 close: checkpoints/checkpoint8/OPTIONS-000003 close: valsepdb/OPTIONS-000003 open-dir: checkpoints/checkpoint8 -create: checkpoints/checkpoint8/marker.format-version.000001.022 -sync-data: checkpoints/checkpoint8/marker.format-version.000001.022 -close: checkpoints/checkpoint8/marker.format-version.000001.022 +create: checkpoints/checkpoint8/marker.format-version.000001.023 +sync-data: checkpoints/checkpoint8/marker.format-version.000001.023 +close: checkpoints/checkpoint8/marker.format-version.000001.023 sync: checkpoints/checkpoint8 close: checkpoints/checkpoint8 link: valsepdb/000006.blob -> checkpoints/checkpoint8/000006.blob @@ -1072,9 +1080,9 @@ sync-data: checkpoints/checkpoint9/OPTIONS-000003 close: checkpoints/checkpoint9/OPTIONS-000003 close: valsepdb/OPTIONS-000003 open-dir: checkpoints/checkpoint9 -create: checkpoints/checkpoint9/marker.format-version.000001.022 -sync-data: checkpoints/checkpoint9/marker.format-version.000001.022 -close: checkpoints/checkpoint9/marker.format-version.000001.022 +create: checkpoints/checkpoint9/marker.format-version.000001.023 +sync-data: checkpoints/checkpoint9/marker.format-version.000001.023 +close: checkpoints/checkpoint9/marker.format-version.000001.023 sync: checkpoints/checkpoint9 close: checkpoints/checkpoint9 link: valsepdb/000006.blob -> checkpoints/checkpoint9/000006.blob diff --git a/testdata/checkpoint_shared b/testdata/checkpoint_shared index 2d2d900460..53db8f7926 100644 --- a/testdata/checkpoint_shared +++ b/testdata/checkpoint_shared @@ -43,6 +43,10 @@ create: db/marker.format-version.000006.022 close: db/marker.format-version.000006.022 remove: db/marker.format-version.000005.021 sync: db +create: db/marker.format-version.000007.023 +close: db/marker.format-version.000007.023 +remove: db/marker.format-version.000006.022 +sync: db create: db/temporary.000003.dbtmp sync: db/temporary.000003.dbtmp close: db/temporary.000003.dbtmp @@ -109,9 +113,9 @@ sync-data: checkpoints/checkpoint1/OPTIONS-000003 close: checkpoints/checkpoint1/OPTIONS-000003 close: db/OPTIONS-000003 open-dir: checkpoints/checkpoint1 -create: checkpoints/checkpoint1/marker.format-version.000001.022 -sync-data: checkpoints/checkpoint1/marker.format-version.000001.022 -close: checkpoints/checkpoint1/marker.format-version.000001.022 +create: checkpoints/checkpoint1/marker.format-version.000001.023 +sync-data: checkpoints/checkpoint1/marker.format-version.000001.023 +close: checkpoints/checkpoint1/marker.format-version.000001.023 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 open: db/MANIFEST-000001 (options: *vfs.sequentialReadsOption) @@ -162,9 +166,9 @@ sync-data: checkpoints/checkpoint2/OPTIONS-000003 close: checkpoints/checkpoint2/OPTIONS-000003 close: db/OPTIONS-000003 open-dir: checkpoints/checkpoint2 -create: checkpoints/checkpoint2/marker.format-version.000001.022 -sync-data: checkpoints/checkpoint2/marker.format-version.000001.022 -close: checkpoints/checkpoint2/marker.format-version.000001.022 +create: checkpoints/checkpoint2/marker.format-version.000001.023 +sync-data: checkpoints/checkpoint2/marker.format-version.000001.023 +close: checkpoints/checkpoint2/marker.format-version.000001.023 sync: checkpoints/checkpoint2 close: checkpoints/checkpoint2 open: db/MANIFEST-000001 (options: *vfs.sequentialReadsOption) @@ -211,9 +215,9 @@ sync-data: checkpoints/checkpoint3/OPTIONS-000003 close: checkpoints/checkpoint3/OPTIONS-000003 close: db/OPTIONS-000003 open-dir: checkpoints/checkpoint3 -create: checkpoints/checkpoint3/marker.format-version.000001.022 -sync-data: checkpoints/checkpoint3/marker.format-version.000001.022 -close: checkpoints/checkpoint3/marker.format-version.000001.022 +create: checkpoints/checkpoint3/marker.format-version.000001.023 +sync-data: checkpoints/checkpoint3/marker.format-version.000001.023 +close: checkpoints/checkpoint3/marker.format-version.000001.023 sync: checkpoints/checkpoint3 close: checkpoints/checkpoint3 open: db/MANIFEST-000001 (options: *vfs.sequentialReadsOption) @@ -270,7 +274,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 REMOTE-OBJ-CATALOG-000001 -marker.format-version.000006.022 +marker.format-version.000007.023 marker.manifest.000001.MANIFEST-000001 marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 @@ -280,7 +284,7 @@ list checkpoints/checkpoint1 MANIFEST-000001 OPTIONS-000003 REMOTE-OBJ-CATALOG-000001 -marker.format-version.000001.022 +marker.format-version.000001.023 marker.manifest.000001.MANIFEST-000001 marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 @@ -332,7 +336,7 @@ list checkpoints/checkpoint2 MANIFEST-000001 OPTIONS-000003 REMOTE-OBJ-CATALOG-000001 -marker.format-version.000001.022 +marker.format-version.000001.023 marker.manifest.000001.MANIFEST-000001 marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 diff --git a/testdata/event_listener b/testdata/event_listener index bcffd72dcb..47ad0e46a8 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -71,6 +71,11 @@ close: db/marker.format-version.000009.022 remove: db/marker.format-version.000008.021 sync: db upgraded to format version: 022 +create: db/marker.format-version.000010.023 +close: db/marker.format-version.000010.023 +remove: db/marker.format-version.000009.022 +sync: db +upgraded to format version: 023 create: db/temporary.000003.dbtmp sync: db/temporary.000003.dbtmp close: db/temporary.000003.dbtmp @@ -380,9 +385,9 @@ sync-data: checkpoint/OPTIONS-000003 close: checkpoint/OPTIONS-000003 close: db/OPTIONS-000003 open-dir: checkpoint -create: checkpoint/marker.format-version.000001.022 -sync-data: checkpoint/marker.format-version.000001.022 -close: checkpoint/marker.format-version.000001.022 +create: checkpoint/marker.format-version.000001.023 +sync-data: checkpoint/marker.format-version.000001.023 +close: checkpoint/marker.format-version.000001.023 sync: checkpoint close: checkpoint link: db/000013.sst -> checkpoint/000013.sst diff --git a/testdata/flushable_ingest b/testdata/flushable_ingest index 386df7ca48..23a41968b7 100644 --- a/testdata/flushable_ingest +++ b/testdata/flushable_ingest @@ -60,7 +60,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000009.022 +marker.format-version.000010.023 marker.manifest.000001.MANIFEST-000001 # Test basic WAL replay @@ -81,7 +81,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000009.022 +marker.format-version.000010.023 marker.manifest.000001.MANIFEST-000001 open @@ -390,7 +390,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000009.022 +marker.format-version.000010.023 marker.manifest.000001.MANIFEST-000001 close @@ -410,7 +410,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000009.022 +marker.format-version.000010.023 marker.manifest.000001.MANIFEST-000001 open @@ -443,7 +443,7 @@ MANIFEST-000001 MANIFEST-000011 OPTIONS-000014 ext -marker.format-version.000009.022 +marker.format-version.000010.023 marker.manifest.000002.MANIFEST-000011 # Make sure that the new mutable memtable can accept writes. @@ -586,7 +586,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000009.022 +marker.format-version.000010.023 marker.manifest.000001.MANIFEST-000001 close @@ -605,7 +605,7 @@ MANIFEST-000001 OPTIONS-000003 ext ext1 -marker.format-version.000009.022 +marker.format-version.000010.023 marker.manifest.000001.MANIFEST-000001 open diff --git a/version_set.go b/version_set.go index 1ca555ca14..2581d0e602 100644 --- a/version_set.go +++ b/version_set.go @@ -242,23 +242,28 @@ func (vs *versionSet) load( errors.Safe(manifestFilename), dirname) } defer manifestFile.Close() - rr := record.NewReader(manifestFile, 0 /* logNum */) + rr := record.NewReader(manifestFile, manifestFileNum) for { r, err := rr.Next() - if err == io.EOF || record.IsInvalidRecord(err) { + if errors.Is(err, io.EOF) || errors.Is(err, record.ErrUnexpectedEOF) { break - } - if err != nil { + } else if errors.Is(err, record.ErrInvalidChunk) || errors.Is(err, record.ErrZeroedChunk) { + // If a read-ahead returns one of these errors, they should be marked with corruption. + // Other I/O related errors should not be marked with corruption and simply returned. + err = errors.Mark(err, ErrCorruption) return errors.Wrapf(err, "pebble: error when loading manifest file %q", errors.Safe(manifestFilename)) } var ve versionEdit err = ve.Decode(r) if err != nil { - // Break instead of returning an error if the record is corrupted - // or invalid. - if err == io.EOF || record.IsInvalidRecord(err) { + if errors.Is(err, io.EOF) || errors.Is(err, record.ErrUnexpectedEOF) { break + } else if errors.Is(err, record.ErrInvalidChunk) || errors.Is(err, record.ErrZeroedChunk) { + // If a read-ahead returns one of these errors, they should be marked with corruption. + // Other I/O related errors should not be marked with corruption and simply returned. + err = errors.Mark(err, ErrCorruption) + return errors.Wrap(err, "pebble: error when loading manifest") } return err } @@ -1013,7 +1018,7 @@ func (vs *versionSet) createManifest( if err != nil { return err } - manifestWriter = record.NewWriter(manifestFile) + manifestWriter = record.NewWriter(manifestFile, fileNum, vs.getFormatMajorVersion() >= FormatManifestSyncChunks) snapshot := manifest.VersionEdit{ ComparerName: vs.cmp.Name,