@@ -107,13 +107,18 @@ package record
107107
108108import (
109109 "encoding/binary"
110+ "encoding/hex"
111+ "fmt"
110112 "io"
111113 "math"
114+ "strings"
112115
113116 "github.com/cockroachdb/errors"
114117 "github.com/cockroachdb/pebble/internal/base"
118+ "github.com/cockroachdb/pebble/internal/binfmt"
115119 "github.com/cockroachdb/pebble/internal/bitflip"
116120 "github.com/cockroachdb/pebble/internal/crc"
121+ "github.com/cockroachdb/pebble/internal/treeprinter"
117122)
118123
119124// These constants are part of the wire format and should not be changed.
@@ -252,12 +257,33 @@ type Reader struct {
252257 // encountered during WAL replay was the logical EOF or confirmed corruption.
253258 invalidOffset uint64
254259
255- // loggerForTesting is a logging helper used by the Reader to accumulate log messages.
256- loggerForTesting loggerForTesting
260+ // logger is a logging helper used by the Reader to accumulate log messages.
261+ logger * loggerForTesting
262+
263+ // visualLogger is a logging helper used by the Reader to accumulate visual logs.
264+ visualLogger * visualLoggerForTesting
265+ }
266+
267+ type loggerForTesting struct {
268+ verbose bool
269+ builder strings.Builder
270+ }
271+
272+ func (l * loggerForTesting ) logf (format string , args ... interface {}) {
273+ fmt .Fprintf (& l .builder , format , args ... )
257274}
258275
259- type loggerForTesting interface {
260- logf (format string , args ... interface {})
276+ func (l * loggerForTesting ) getLog () string {
277+ return l .builder .String ()
278+ }
279+
280+ type visualLoggerForTesting struct {
281+ verbose bool
282+ f * binfmt.Formatter
283+ tp * treeprinter.Node
284+ blockRoot treeprinter.Node
285+ blockNode treeprinter.Node
286+ chunkNode treeprinter.Node
261287}
262288
263289// NewReader returns a new reader. If the file contains records encoded using
@@ -425,6 +451,20 @@ func (r *Reader) Next() (io.Reader, error) {
425451 return singleReader {r , r .seq }, nil
426452}
427453
454+ func (r * Reader ) InvestigateChunks (verbose bool ) (string , string ) {
455+ tree := treeprinter .New ()
456+ r .visualLogger = & visualLoggerForTesting {
457+ f : binfmt .New (r .buf [:]).LineWidth (20 ),
458+ tp : & tree ,
459+ verbose : verbose ,
460+ }
461+ r .logger = & loggerForTesting {
462+ verbose : verbose ,
463+ }
464+ r .readAheadForCorruption ()
465+ return r .visualLogger .tp .String (), r .logger .getLog ()
466+ }
467+
428468// readAheadForCorruption scans ahead in the log to detect corruption.
429469// It loads in blocks and reads chunks until it either detects corruption
430470// due to an offset (encoded in a chunk header) exceeding the invalid offset,
@@ -440,19 +480,44 @@ func (r *Reader) Next() (io.Reader, error) {
440480// if there is confirmation of a corruption, otherwise ErrUnexpectedEOF is
441481// returned after reading all the blocks without corruption confirmation.
442482func (r * Reader ) readAheadForCorruption () error {
443- if r .loggerForTesting != nil {
444- r .loggerForTesting .logf ("Starting read ahead for corruption. Block corrupted %d.\n " , r .blockNum )
483+ if r .logger != nil {
484+ r .logger .logf ("Starting read ahead for corruption. Block corrupted %d.\n " , r .blockNum )
485+ }
486+ if r .visualLogger != nil {
487+ r .visualLogger .blockRoot = r .visualLogger .tp .Child ("Block" )
488+ }
489+ getBufferDump := func (buf []byte , i int , j int ) string {
490+ return fmt .Sprintf ("Buffer Dump: %s\n " , hex .EncodeToString (buf [i :j ]))
491+ }
492+
493+ logMsgAndDump := func (logMsg , bufferDump string ) {
494+ if r .logger != nil {
495+ r .logger .logf ("\t %s" , logMsg )
496+ if r .logger .verbose {
497+ r .logger .logf ("\t %s" , bufferDump )
498+ }
499+ }
500+ if r .visualLogger != nil {
501+ r .visualLogger .chunkNode .Child (logMsg )
502+ if r .visualLogger .verbose {
503+ r .visualLogger .chunkNode .Child (bufferDump )
504+ }
505+ }
506+ }
507+
508+ if r .visualLogger != nil {
509+ defer r .visualLogger .f .SetAnchorOffset ()
510+ defer r .visualLogger .f .ToTreePrinter (r .visualLogger .blockRoot )
445511 }
446512
447513 for {
448514 // Load the next block into r.buf.
449515 n , err := io .ReadFull (r .r , r .buf [:])
450516 r .begin , r .end , r .n = 0 , 0 , n
451517 r .blockNum ++
452- if r .loggerForTesting != nil {
453- r .loggerForTesting .logf ("Read block %d with %d bytes\n " , r .blockNum , n )
518+ if r .logger != nil {
519+ r .logger .logf ("Read block %d with %d bytes\n " , r .blockNum , n )
454520 }
455-
456521 if errors .Is (err , io .EOF ) {
457522 // io.ErrUnexpectedEOF is returned instead of
458523 // io.EOF because io library functions clear
@@ -464,8 +529,8 @@ func (r *Reader) readAheadForCorruption() error {
464529 // invalid chunk should have been valid, the chunk represents
465530 // an abrupt, unclean termination of the logical log. This
466531 // abrupt end of file represented by io.ErrUnexpectedEOF.
467- if r .loggerForTesting != nil {
468- r .loggerForTesting .logf ("\t Encountered io.EOF; returning io.ErrUnexpectedEOF since no sync offset found.\n " )
532+ if r .logger != nil {
533+ r .logger .logf ("\t Encountered io.EOF; returning io.ErrUnexpectedEOF since no sync offset found.\n " )
469534 }
470535 return io .ErrUnexpectedEOF
471536 }
@@ -475,93 +540,141 @@ func (r *Reader) readAheadForCorruption() error {
475540 // However, if the error is not ErrUnexpectedEOF, then this
476541 // error should be surfaced.
477542 if err != nil && err != io .ErrUnexpectedEOF {
478- if r .loggerForTesting != nil {
479- r .loggerForTesting .logf ("\t Error reading block %d: %v" , r .blockNum , err )
543+ if r .logger != nil {
544+ r .logger .logf ("\t Error reading block %d: %v" , r .blockNum , err )
480545 }
481546 return err
482547 }
483548
549+ chunkCount := 0
484550 for r .end + legacyHeaderSize <= r .n {
485551 checksum := binary .LittleEndian .Uint32 (r .buf [r .end + 0 : r .end + 4 ])
486552 length := binary .LittleEndian .Uint16 (r .buf [r .end + 4 : r .end + 6 ])
487553 chunkEncoding := r .buf [r .end + 6 ]
554+ bufferDump := getBufferDump (r .buf [:], r .end , r .n )
555+ chunkCount ++
488556
489- if r .loggerForTesting != nil {
490- r .loggerForTesting .logf ("\t Block %d: Processing chunk at offset %d, checksum=%d, length=%d, encoding=%d\n " , r .blockNum , r .end , checksum , length , chunkEncoding )
557+ if r .logger != nil {
558+ r .logger .logf ("\t Block %d: Processing chunk at offset %d, checksum=%d, length=%d, encoding=%d\n " , r .blockNum , r .end , checksum , length , chunkEncoding )
559+ }
560+ if r .visualLogger != nil {
561+ if chunkCount == 1 {
562+ r .visualLogger .blockNode = r .visualLogger .blockRoot .Childf ("Block #%d" , r .blockNum )
563+ }
564+ r .visualLogger .chunkNode = r .visualLogger .blockNode .Childf ("Chunk #%d at offset %d" , chunkCount , r .end )
565+ r .visualLogger .chunkNode .Childf ("Checksum: %d" , checksum )
566+ r .visualLogger .chunkNode .Childf ("Encoded Length: %d" , length )
491567 }
492568
493569 if int (chunkEncoding ) >= len (headerFormatMappings ) {
494- if r .loggerForTesting != nil {
495- r .loggerForTesting .logf ("\t Invalid chunk encoding encountered (value: %d); stopping chunk scan in block %d\n " , chunkEncoding , r .blockNum )
496- }
570+ logMsg := fmt .Sprintf ("Invalid chunk encoding encountered (value: %d); stopping chunk scan in block %d\n " , chunkEncoding , r .blockNum )
571+ logMsgAndDump (logMsg , bufferDump )
497572 break
498573 }
499574
500575 headerFormat := headerFormatMappings [chunkEncoding ]
501576 chunkPosition , wireFormat , headerSize := headerFormat .chunkPosition , headerFormat .wireFormat , headerFormat .headerSize
577+ if r .visualLogger != nil {
578+ encodingStr := chunkEncodingStr (chunkEncoding )
579+ r .visualLogger .chunkNode .Childf ("Chunk encoding: %s(%d) (chunkPosition: %d, wireFormat: %d)" , encodingStr , chunkEncoding , chunkPosition , wireFormat )
580+ }
581+
502582 if checksum == 0 && length == 0 && chunkPosition == invalidChunkPosition {
503- if r .loggerForTesting != nil {
504- r .loggerForTesting .logf ("\t Found invalid chunk marker at block %d offset %d; aborting this block scan\n " , r .blockNum , r .end )
505- }
583+ logMsg := fmt .Sprintf ("Found invalid chunk marker at block %d offset %d; aborting this block scan\n " , r .blockNum , r .end )
584+ logMsgAndDump (logMsg , bufferDump )
506585 break
507586 }
508587 if wireFormat == invalidWireFormat {
509- if r .loggerForTesting != nil {
510- r .loggerForTesting .logf ("\t Invalid wire format detected in block %d at offset %d\n " , r .blockNum , r .end )
511- }
588+ logMsg := fmt .Sprintf ("Invalid wire format detected in block %d at offset %d\n " , r .blockNum , r .end )
589+ logMsgAndDump (logMsg , bufferDump )
512590 break
513591 }
514592 if wireFormat == recyclableWireFormat || wireFormat == walSyncWireFormat {
515593 if r .end + headerSize > r .n {
516- if r .loggerForTesting != nil {
517- r .loggerForTesting .logf ("\t Incomplete header in block %d at offset %d; breaking out\n " , r .blockNum , r .end )
518- }
594+ logMsg := fmt .Sprintf ("Incomplete header in block %d at offset %d; breaking out\n " , r .blockNum , r .end )
595+ logMsgAndDump (logMsg , bufferDump )
519596 break
520597 }
521598 logNum := binary .LittleEndian .Uint32 (r .buf [r .end + 7 : r .end + 11 ])
599+ if r .visualLogger != nil {
600+ r .visualLogger .chunkNode .Childf ("Log Num: %d" , logNum )
601+ }
522602 if logNum != r .logNum {
523- if r .loggerForTesting != nil {
524- r .loggerForTesting .logf ("\t Mismatch log number in block %d at offset %d (expected %d, got %d)\n " , r .blockNum , r .end , r .logNum , logNum )
525- }
603+ logMsg := fmt .Sprintf ("Mismatch log number in block %d at offset %d (expected %d, got %d)\n " , r .blockNum , r .end , r .logNum , logNum )
604+ logMsgAndDump (logMsg , bufferDump )
526605 break
527606 }
528607 }
529608
530609 r .begin = r .end + headerSize
531610 r .end = r .begin + int (length )
611+ bufferDump = getBufferDump (r .buf [:], r .begin , min (r .end , r .n ))
532612 if r .end > r .n {
533613 // The chunk straddles a 32KB boundary (or the end of file).
534- if r .loggerForTesting != nil {
535- r .loggerForTesting .logf ("\t Chunk in block %d spans beyond block boundaries (begin=%d, end=%d, n=%d)\n " , r .blockNum , r .begin , r .end , r .n )
536- }
614+ logMsg := fmt .Sprintf ("Chunk in block %d spans beyond block boundaries (begin=%d, end=%d, n=%d)\n " , r .blockNum , r .begin , r .end , r .n )
615+ logMsgAndDump (logMsg , bufferDump )
537616 break
538617 }
539618 if checksum != crc .New (r .buf [r .begin - headerSize + 6 :r .end ]).Value () {
540- if r .loggerForTesting != nil {
541- r .loggerForTesting .logf ("\t Checksum mismatch in block %d at offset %d; potential corruption\n " , r .blockNum , r .end )
542- }
619+ logMsg := fmt .Sprintf ("Checksum mismatch in block %d at offset %d; potential corruption\n " , r .blockNum , r .end )
620+ logMsgAndDump (logMsg , bufferDump )
543621 break
544622 }
545623
546624 // Decode offset in header when chunk has the WAL Sync wire format.
547625 if wireFormat == walSyncWireFormat {
548626 syncedOffset := binary .LittleEndian .Uint64 (r .buf [r .begin - headerSize + 11 : r .begin - headerSize + 19 ])
549- if r .loggerForTesting != nil {
550- r .loggerForTesting .logf ("\t Block %d: Found WAL sync chunk with syncedOffset=%d (invalidOffset=%d)\n " , r .blockNum , syncedOffset , r .invalidOffset )
627+ if r .visualLogger != nil {
628+ r .visualLogger .chunkNode .Childf ("Synced Offset: %d" , syncedOffset )
629+ }
630+ if r .logger != nil {
631+ r .logger .logf ("Block %d: Found WAL sync chunk with syncedOffset=%d (invalidOffset=%d)\n " , r .blockNum , syncedOffset , r .invalidOffset )
551632 }
552633 // If the encountered chunk offset promises durability beyond the invalid offset,
553634 // the invalid offset must have been corruption.
554635 if syncedOffset > r .invalidOffset {
555- if r .loggerForTesting != nil {
556- r .loggerForTesting .logf ("\t Corruption confirmed: syncedOffset %d exceeds invalidOffset %d\n " , syncedOffset , r .invalidOffset )
557- }
636+ logMsg := fmt .Sprintf ("Corruption confirmed: syncedOffset %d exceeds invalidOffset %d\n " , syncedOffset , r .invalidOffset )
637+ logMsgAndDump (logMsg , bufferDump )
558638 return r .err
559639 }
560640 }
561641 }
562642 }
563643}
564644
645+ func chunkEncodingStr (encoding byte ) string {
646+ switch encoding {
647+ case invalidChunkEncoding :
648+ return "invalidInvalidChunk"
649+ case fullChunkEncoding :
650+ return "legacyFullChunk"
651+ case firstChunkEncoding :
652+ return "legacyFirstChunk"
653+ case middleChunkEncoding :
654+ return "legacyMiddleChunk"
655+ case lastChunkEncoding :
656+ return "legacyLastChunk"
657+ case recyclableFullChunkEncoding :
658+ return "recyclableFullChunk"
659+ case recyclableFirstChunkEncoding :
660+ return "recyclableFirstChunk"
661+ case recyclableMiddleChunkEncoding :
662+ return "recyclableMiddleChunk"
663+ case recyclableLastChunkEncoding :
664+ return "recyclableLastChunk"
665+ case walSyncFullChunkEncoding :
666+ return "walSyncFullChunk"
667+ case walSyncFirstChunkEncoding :
668+ return "walSyncFirstChunk"
669+ case walSyncMiddleChunkEncoding :
670+ return "walSyncMiddleChunk"
671+ case walSyncLastChunkEncoding :
672+ return "walSyncLastChunk"
673+ default :
674+ return "unknown encoding"
675+ }
676+ }
677+
565678// Offset returns the current offset within the file. If called immediately
566679// before a call to Next(), Offset() will return the record offset.
567680func (r * Reader ) Offset () int64 {
0 commit comments