Skip to content

Commit bce7483

Browse files
committed
pkg/util/log: introduce crdb-v1-zip-upload format decoder
Introduces a new log format decoder (`crdb-v1-zip-upload`) specifically for debug zip uploads that removes the 64KB token size limitation of `bufio.Scanner`. The new decoder uses `bufio.Reader` to handle arbitrarily large log entries without truncation. Truncation of logs can lead to partial log entries, which might be acceptable in some cases. However, logs in JSON format may break their validity if they are only partially available. We had also encountered an issue with JSON marshalling due to this. This change addresses that issue. `crdb-v1-zip-upload` is only used internally by debug zip upload operations and not used in general log processing. Jira: CRDB-51108, CRDB-51109 Release note: None
1 parent dc4e40c commit bce7483

File tree

6 files changed

+199
-92
lines changed

6 files changed

+199
-92
lines changed

pkg/cli/debug.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1566,7 +1566,7 @@ func init() {
15661566
"Name of the cluster to associate with the debug zip artifacts. This can be used to identify data in the upstream observability tool.")
15671567
f.Var(&debugZipUploadOpts.from, "from", "oldest timestamp to include (inclusive)")
15681568
f.Var(&debugZipUploadOpts.to, "to", "newest timestamp to include (inclusive)")
1569-
f.StringVar(&debugZipUploadOpts.logFormat, "log-format", "crdb-v1",
1569+
f.StringVar(&debugZipUploadOpts.logFormat, "log-format", "crdb-v1-zip-upload",
15701570
"log format of the input files")
15711571
// the log-format flag is deprecated. It will
15721572
// eventually be removed completely. keeping it hidden for now in case we ever

pkg/cli/zip_upload.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ func processLogFile(
483483
debugZipUploadOpts.tags..., // user provided tags
484484
), getUploadType(currentTimestamp))
485485
if err != nil {
486-
fmt.Println(err)
486+
fmt.Println("logEntryToJSON:", err)
487487
continue
488488
}
489489

pkg/util/log/format_crdb_v1.go

Lines changed: 126 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,29 @@ type entryDecoderV1 struct {
442442
truncatedLastEntry bool
443443
}
444444

445+
// entryDecoderV1ZipUpload is a specialized variant of entryDecoderV1 that uses
446+
// bufio.Reader instead of bufio.Scanner. This implementation is designed for use
447+
// in CLI tools (like debug zip upload) where:
448+
// 1. The 64KB token size limitation of bufio.Scanner needs to be removed
449+
// 2. Memory constraints are less critical for short-lived CLI processes
450+
// 3. Complete data capture is essential for debug tools - no entries should be truncated
451+
type entryDecoderV1ZipUpload struct {
452+
reader *bufio.Reader
453+
sensitiveEditor redactEditor
454+
}
455+
456+
// Decode decodes the next log entry into the provided protobuf message.
457+
func (d *entryDecoderV1ZipUpload) Decode(entry *logpb.Entry) error {
458+
buf, err := d.reader.ReadBytes('\n')
459+
if err == io.EOF && len(buf) == 0 {
460+
return io.EOF
461+
} else if err != nil && !errors.Is(err, bufio.ErrBufferFull) && errors.Is(err, io.EOF) {
462+
return err
463+
}
464+
465+
return parseEntryV1(buf, entry, d.sensitiveEditor)
466+
}
467+
445468
func decodeTimestamp(fragment []byte) (unixNano int64, err error) {
446469
timeFormat := MessageTimeFormat
447470
if len(fragment) > 7 && (fragment[len(fragment)-7] == '+' || fragment[len(fragment)-7] == '-') {
@@ -464,112 +487,127 @@ func (d *entryDecoderV1) Decode(entry *logpb.Entry) error {
464487
}
465488
return io.EOF
466489
}
467-
b := d.scanner.Bytes()
468-
m := entryREV1.FindSubmatch(b)
469-
if m == nil {
470-
continue
471-
}
472-
473-
// Erase all the fields, to be sure.
474-
*entry = logpb.Entry{}
475490

476-
// Process the severity.
477-
entry.Severity = Severity(strings.IndexByte(severityChar, m[1][0]) + 1)
491+
if err := parseEntryV1(d.scanner.Bytes(), entry, d.sensitiveEditor); err != nil {
492+
if errors.Is(err, io.EOF) || errors.Is(err, errNoLogEntry) {
493+
continue
494+
}
478495

479-
// Process the timestamp.
480-
var err error
481-
entry.Time, err = decodeTimestamp(m[2])
482-
if err != nil {
483496
return err
484497
}
498+
return nil
499+
}
500+
}
485501

486-
// Process the goroutine ID.
487-
if len(m[3]) > 0 {
488-
goroutine, err := strconv.Atoi(string(m[3]))
489-
if err != nil {
490-
return err
491-
}
492-
entry.Goroutine = int64(goroutine)
493-
}
502+
var errNoLogEntry = errors.New("no log entry found in buffer")
494503

495-
// Process the channel/file/line details.
496-
entry.File = string(m[4])
497-
if idx := strings.IndexByte(entry.File, '@'); idx != -1 {
498-
ch, err := strconv.Atoi(entry.File[:idx])
499-
if err != nil {
500-
return err
501-
}
502-
entry.Channel = Channel(ch)
503-
entry.File = entry.File[idx+1:]
504-
}
504+
// parseEntryV1 parses a log entry from a byte slice into the provided protobuf message.
505+
// It contains the common parsing logic used by both decoder implementations.
506+
func parseEntryV1(buf []byte, entry *logpb.Entry, sensitiveEditor redactEditor) error {
507+
m := entryREV1.FindSubmatch(buf)
508+
if m == nil {
509+
return errNoLogEntry
510+
}
511+
512+
// Erase all the fields, to be sure.
513+
*entry = logpb.Entry{}
514+
515+
// Process the severity.
516+
entry.Severity = Severity(strings.IndexByte(severityChar, m[1][0]) + 1)
517+
518+
// Process the timestamp.
519+
var err error
520+
entry.Time, err = decodeTimestamp(m[2])
521+
if err != nil {
522+
return err
523+
}
505524

506-
line, err := strconv.Atoi(string(m[5]))
525+
// Process the goroutine ID.
526+
if len(m[3]) > 0 {
527+
goroutine, err := strconv.Atoi(string(m[3]))
507528
if err != nil {
508529
return err
509530
}
510-
entry.Line = int64(line)
511-
512-
// Process the context tags.
513-
redactable := len(m[6]) != 0
514-
// Look for a tenant ID tag. Default to system otherwise.
515-
entry.TenantID = serverident.SystemTenantID
516-
tagsToProcess := m[7]
517-
entry.TenantID, entry.TenantName, tagsToProcess = maybeReadTenantDetails(tagsToProcess)
518-
519-
// Process any remaining tags.
520-
if len(tagsToProcess) != 0 {
521-
r := redactablePackage{
522-
msg: tagsToProcess,
523-
redactable: redactable,
524-
}
525-
r = d.sensitiveEditor(r)
526-
entry.Tags = string(r.msg)
527-
}
531+
entry.Goroutine = int64(goroutine)
532+
}
528533

529-
// If there's an entry counter at the start of the message, process it.
530-
msg := b[len(m[0]):]
531-
i := 0
532-
for ; i < len(msg) && msg[i] >= '0' && msg[i] <= '9'; i++ {
533-
entry.Counter = entry.Counter*10 + uint64(msg[i]-'0')
534-
}
535-
if i > 0 && i < len(msg) && msg[i] == ' ' {
536-
// Only accept the entry counter if followed by a space. In all
537-
// other cases, the number was part of the message string.
538-
msg = msg[i+1:]
539-
} else {
540-
// This was not truly an entry counter. Ignore the work done previously.
541-
entry.Counter = 0
534+
// Process the channel/file/line details.
535+
entry.File = string(m[4])
536+
if idx := strings.IndexByte(entry.File, '@'); idx != -1 {
537+
ch, err := strconv.Atoi(entry.File[:idx])
538+
if err != nil {
539+
return err
542540
}
541+
entry.Channel = Channel(ch)
542+
entry.File = entry.File[idx+1:]
543+
}
544+
545+
line, err := strconv.Atoi(string(m[5]))
546+
if err != nil {
547+
return err
548+
}
549+
entry.Line = int64(line)
550+
551+
// Process the context tags.
552+
redactable := len(m[6]) != 0
553+
// Look for a tenant ID tag. Default to system otherwise.
554+
entry.TenantID = serverident.SystemTenantID
555+
tagsToProcess := m[7]
556+
entry.TenantID, entry.TenantName, tagsToProcess = maybeReadTenantDetails(tagsToProcess)
543557

544-
// Process the remainder of the log message.
558+
// Process any remaining tags.
559+
if len(tagsToProcess) != 0 {
545560
r := redactablePackage{
546-
msg: trimFinalNewLines(msg),
561+
msg: tagsToProcess,
547562
redactable: redactable,
548563
}
549-
r = d.sensitiveEditor(r)
550-
entry.Message = string(r.msg)
551-
entry.Redactable = r.redactable
552-
553-
if strings.HasPrefix(entry.Message, structuredEntryPrefix+"{") /* crdb-v1 prefix */ {
554-
// Note: we do not recognize the v2 marker here (" ={") because
555-
// v2 entries can be split across multiple lines.
556-
entry.StructuredStart = uint32(len(structuredEntryPrefix))
557-
558-
if nl := strings.IndexByte(entry.Message, '\n'); nl != -1 {
559-
entry.StructuredEnd = uint32(nl)
560-
entry.StackTraceStart = uint32(nl + 1)
561-
} else {
562-
entry.StructuredEnd = uint32(len(entry.Message))
563-
}
564-
}
565-
// Note: we only know how to populate entry.StackTraceStart upon
566-
// parse if the entry was structured (see above). If it is not
567-
// structured, we cannot distinguish where the message ends and
568-
// where the stack trace starts. This is another reason why the
569-
// crdb-v1 format is lossy.
564+
r = sensitiveEditor(r)
565+
entry.Tags = string(r.msg)
566+
}
570567

571-
return nil
568+
// If there's an entry counter at the start of the message, process it.
569+
msg := buf[len(m[0]):]
570+
i := 0
571+
for ; i < len(msg) && msg[i] >= '0' && msg[i] <= '9'; i++ {
572+
entry.Counter = entry.Counter*10 + uint64(msg[i]-'0')
573+
}
574+
if i > 0 && i < len(msg) && msg[i] == ' ' {
575+
// Only accept the entry counter if followed by a space. In all
576+
// other cases, the number was part of the message string.
577+
msg = msg[i+1:]
578+
} else {
579+
// This was not truly an entry counter. Ignore the work done previously.
580+
entry.Counter = 0
572581
}
582+
583+
// Process the remainder of the log message.
584+
r := redactablePackage{
585+
msg: trimFinalNewLines(msg),
586+
redactable: redactable,
587+
}
588+
r = sensitiveEditor(r)
589+
entry.Message = string(r.msg)
590+
entry.Redactable = r.redactable
591+
592+
if strings.HasPrefix(entry.Message, structuredEntryPrefix+"{") /* crdb-v1 prefix */ {
593+
// Note: we do not recognize the v2 marker here (" ={") because
594+
// v2 entries can be split across multiple lines.
595+
entry.StructuredStart = uint32(len(structuredEntryPrefix))
596+
597+
if nl := strings.IndexByte(entry.Message, '\n'); nl != -1 {
598+
entry.StructuredEnd = uint32(nl)
599+
entry.StackTraceStart = uint32(nl + 1)
600+
} else {
601+
entry.StructuredEnd = uint32(len(entry.Message))
602+
}
603+
}
604+
// Note: we only know how to populate entry.StackTraceStart upon
605+
// parse if the entry was structured (see above). If it is not
606+
// structured, we cannot distinguish where the message ends and
607+
// where the stack trace starts. This is another reason why the
608+
// crdb-v1 format is lossy.
609+
610+
return nil
573611
}
574612

575613
// maybeReadTenantDetails reads the tenant ID and name. If neither the

pkg/util/log/format_crdb_v1_test.go

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/cockroachdb/datadriven"
2525
"github.com/kr/pretty"
2626
"github.com/stretchr/testify/assert"
27+
"github.com/stretchr/testify/require"
2728
)
2829

2930
func TestCrdbV1EncodeDecode(t *testing.T) {
@@ -107,9 +108,9 @@ func TestCrdbV1EncodeDecode(t *testing.T) {
107108
})
108109
}
109110

110-
func TestCrdbV1EntryDecoderForVeryLargeEntries(t *testing.T) {
111+
func entryFormatter() func(s Severity, c Channel, now time.Time, gid int, file string, line int, tags, msg string) string {
111112
entryIdx := 1
112-
formatEntry := func(s Severity, c Channel, now time.Time, gid int, file string, line int, tags, msg string) string {
113+
return func(s Severity, c Channel, now time.Time, gid int, file string, line int, tags, msg string) string {
113114
entry := logpb.Entry{
114115
Severity: s,
115116
Channel: c,
@@ -126,7 +127,10 @@ func TestCrdbV1EntryDecoderForVeryLargeEntries(t *testing.T) {
126127
_ = FormatLegacyEntry(entry, &buf)
127128
return buf.String()
128129
}
130+
}
129131

132+
func TestCrdbV1EntryDecoderForVeryLargeEntries(t *testing.T) {
133+
formatEntry := entryFormatter()
130134
t1 := timeutil.Now().Round(time.Microsecond)
131135
t2 := t1.Add(time.Microsecond)
132136

@@ -194,6 +198,64 @@ func TestCrdbV1EntryDecoderForVeryLargeEntries(t *testing.T) {
194198
}
195199
}
196200

201+
func TestCrdbV1ZipUploadEntryDecoderForVeryLargeEntries(t *testing.T) {
202+
formatEntry := entryFormatter()
203+
t1 := timeutil.Now().Round(time.Microsecond)
204+
t2 := t1.Add(time.Microsecond)
205+
206+
preambleLength := len(formatEntry(severity.INFO, channel.DEV, t1, 0, "somefile.go", 136, ``, ""))
207+
maxMessageLength := bufio.MaxScanTokenSize - preambleLength - 1
208+
reallyLongEntry := string(bytes.Repeat([]byte("a"), maxMessageLength)) // just short of MaxScanTokenSize
209+
tooLongEntry := reallyLongEntry + "a" // one byte too long
210+
211+
contents := formatEntry(severity.INFO, channel.DEV, t1, 2, "somefile.go", 138, ``, reallyLongEntry)
212+
contents += formatEntry(severity.INFO, channel.DEV, t2, 3, "somefile.go", 139, ``, tooLongEntry)
213+
214+
decoder, err := NewEntryDecoderWithFormat(
215+
strings.NewReader(contents), WithFlattenedSensitiveData, "crdb-v1-zip-upload",
216+
)
217+
require.NoError(t, err, "error while constructing decoder")
218+
219+
var entries []logpb.Entry
220+
var entry logpb.Entry
221+
for {
222+
if err := decoder.Decode(&entry); err != nil {
223+
if err == io.EOF {
224+
break
225+
}
226+
require.NoError(t, err, "error while decoding")
227+
}
228+
entries = append(entries, entry)
229+
}
230+
231+
expected := []logpb.Entry{
232+
{
233+
Severity: severity.INFO,
234+
Channel: channel.DEV,
235+
Time: t1.UnixNano(),
236+
Goroutine: 2,
237+
File: `somefile.go`,
238+
Line: 138,
239+
Message: reallyLongEntry,
240+
Counter: 2,
241+
TenantID: serverident.SystemTenantID,
242+
},
243+
{
244+
Severity: severity.INFO,
245+
Channel: channel.DEV,
246+
Time: t2.UnixNano(),
247+
Goroutine: 3,
248+
File: `somefile.go`,
249+
Line: 139,
250+
Message: tooLongEntry, // Should have the entire length, not truncated.
251+
Counter: 3,
252+
TenantID: serverident.SystemTenantID,
253+
},
254+
}
255+
256+
require.Equal(t, expected, entries, "entries do not match")
257+
}
258+
197259
func TestReadTenantDetails(t *testing.T) {
198260
tc := []struct {
199261
in string

pkg/util/log/formats.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type logFormatter interface {
2929

3030
// FormatParsers maps the user facing format names to the internal representation.
3131
var FormatParsers = map[string]string{
32+
"crdb-v1-zip-upload": "v1-zip-upload",
3233
"crdb-v1": "v1",
3334
"crdb-v1-count": "v1",
3435
"crdb-v1-tty": "v1",

pkg/util/log/log_decoder.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ func NewEntryDecoderWithFormat(
9292
}
9393
decoder.scanner.Split(decoder.split)
9494
d = decoder
95+
case "v1-zip-upload":
96+
decoder := &entryDecoderV1ZipUpload{
97+
reader: bufio.NewReader(in),
98+
sensitiveEditor: getEditor(editMode),
99+
}
100+
d = decoder
95101
case "json":
96102
d = &entryDecoderJSON{
97103
decoder: json.NewDecoder(in),

0 commit comments

Comments
 (0)