Skip to content

Commit 64d6a47

Browse files
craig[bot] and arjunmahishi committed
Merge #148183
148183: pkg/util/log: introduce crdb-v1-zip-upload format decoder r=arjunmahishi a=arjunmahishi 1. **pkg/cli (debug-zip): add dry-run mode for debug zip uploads** Introduce a `--dry-run` flag to the debug zip upload command. This allows us to simulate the upload process without performing actual uploads. The dry-run mode is used only for debugging/testing. This flag is hidden from the help text Epic: None Release note: None --- 2. **pkg/util/log: introduce crdb-v1-zip-upload format decoder** Introduces a new log format decoder (`crdb-v1-zip-upload`) specifically for debug zip uploads that removes the 64KB token size limitation of `bufio.Scanner`. The new decoder uses `bufio.Reader` to handle arbitrarily large log entries without truncation. Truncation of logs can lead to partial log entries, which might be acceptable in some cases. However, logs in JSON format may break their validity if they are only partially available. We had also encountered an issue with JSON marshalling due to this. This change addresses that issue. `crdb-v1-zip-upload` is only used internally by debug zip upload operations and not used in general log processing. Jira: CRDB-51108, CRDB-51109 Release note: None Co-authored-by: Arjun Mahishi <[email protected]>
2 parents a036214 + bce7483 commit 64d6a47

File tree

6 files changed

+220
-93
lines changed

6 files changed

+220
-93
lines changed

pkg/cli/debug.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1566,14 +1566,17 @@ func init() {
15661566
"Name of the cluster to associate with the debug zip artifacts. This can be used to identify data in the upstream observability tool.")
15671567
f.Var(&debugZipUploadOpts.from, "from", "oldest timestamp to include (inclusive)")
15681568
f.Var(&debugZipUploadOpts.to, "to", "newest timestamp to include (inclusive)")
1569-
f.StringVar(&debugZipUploadOpts.logFormat, "log-format", "crdb-v1",
1569+
f.StringVar(&debugZipUploadOpts.logFormat, "log-format", "crdb-v1-zip-upload",
15701570
"log format of the input files")
15711571
// the log-format flag is deprecated. It will
15721572
// eventually be removed completely. Keeping it hidden for now in case we ever
15731573
// need to specify the log format
15741574
f.Lookup("log-format").Hidden = true
15751575
f.StringVar(&debugZipUploadOpts.gcpProjectID, "gcp-project-id",
15761576
defaultGCPProjectID, "GCP project ID to use to send debug.zip logs to GCS")
1577+
// --dry-run is a hidden flag that is only meant to be used for testing and diagnostics
1578+
f.BoolVar(&debugZipUploadOpts.dryRun, "dry-run", false, "run in dry-run mode without making any actual uploads")
1579+
f.Lookup("dry-run").Hidden = true
15771580

15781581
f = debugDecodeKeyCmd.Flags()
15791582
f.Var(&decodeKeyOptions.encoding, "encoding", "key argument encoding")

pkg/cli/zip_upload.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ var debugZipUploadOpts = struct {
124124
from, to timestampValue
125125
logFormat string
126126
maxConcurrentUploads int
127+
dryRun bool
127128
}{
128129
maxConcurrentUploads: system.NumCPU() * 4,
129130
}
@@ -217,6 +218,10 @@ func runDebugZipUpload(cmd *cobra.Command, args []string) error {
217218
artifactsToUpload = debugZipUploadOpts.include
218219
}
219220

221+
if debugZipUploadOpts.dryRun {
222+
fmt.Println("DRY RUN MODE: No actual uploads will be performed")
223+
}
224+
220225
// run the upload functions for each artifact type. This can run sequentially.
221226
// All the concurrency is contained within the upload functions.
222227
for _, artType := range artifactsToUpload {
@@ -238,6 +243,10 @@ func validateZipUploadReadiness() error {
238243
artifactsToUpload = zipArtifactTypes
239244
)
240245

246+
if debugZipUploadOpts.dryRun {
247+
return nil
248+
}
249+
241250
if len(debugZipUploadOpts.include) > 0 {
242251
artifactsToUpload = debugZipUploadOpts.include
243252
}
@@ -474,7 +483,7 @@ func processLogFile(
474483
debugZipUploadOpts.tags..., // user provided tags
475484
), getUploadType(currentTimestamp))
476485
if err != nil {
477-
fmt.Println(err)
486+
fmt.Println("logEntryToJSON:", err)
478487
continue
479488
}
480489

@@ -896,6 +905,11 @@ func startWriterPool(
896905
// writing to GCS. The concurrency has to be handled by the caller.
897906
// This function implements the logUploadFunc signature.
898907
var gcsLogUpload = func(ctx context.Context, sig logUploadSig) (int, error) {
908+
data := bytes.Join(sig.logLines, []byte("\n"))
909+
if debugZipUploadOpts.dryRun {
910+
return len(data), nil
911+
}
912+
899913
gcsClient, closeGCSClient, err := newGCSClient(ctx)
900914
if err != nil {
901915
return 0, err
@@ -910,7 +924,6 @@ var gcsLogUpload = func(ctx context.Context, sig logUploadSig) (int, error) {
910924
retryOpts := base.DefaultRetryOptions()
911925
retryOpts.MaxRetries = zipUploadRetries
912926

913-
data := bytes.Join(sig.logLines, []byte("\n"))
914927
for retry := retry.Start(retryOpts); retry.Next(); {
915928
objectWriter := gcsClient.Bucket(ddArchiveBucketName).Object(filename).NewWriter(ctx)
916929
w := gzip.NewWriter(objectWriter)
@@ -1137,6 +1150,10 @@ func makeDDTag(key, value string) string {
11371150
// There is also some error handling logic in this function. This is a variable so that
11381151
// we can mock this function in the tests.
11391152
var doUploadReq = func(req *http.Request) ([]byte, error) {
1153+
if debugZipUploadOpts.dryRun {
1154+
return []byte("{}"), nil
1155+
}
1156+
11401157
resp, err := http.DefaultClient.Do(req)
11411158
if err != nil {
11421159
return nil, err

pkg/util/log/format_crdb_v1.go

Lines changed: 126 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,29 @@ type entryDecoderV1 struct {
442442
truncatedLastEntry bool
443443
}
444444

445+
// entryDecoderV1ZipUpload is a specialized variant of entryDecoderV1 that uses
446+
// bufio.Reader instead of bufio.Scanner. This implementation is designed for use
447+
// in CLI tools (like debug zip upload) where:
448+
// 1. The 64KB token size limitation of bufio.Scanner needs to be removed
449+
// 2. Memory constraints are less critical for short-lived CLI processes
450+
// 3. Complete data capture is essential for debug tools - no entries should be truncated
451+
type entryDecoderV1ZipUpload struct {
452+
reader *bufio.Reader
453+
sensitiveEditor redactEditor
454+
}
455+
456+
// Decode decodes the next log entry into the provided protobuf message.
457+
func (d *entryDecoderV1ZipUpload) Decode(entry *logpb.Entry) error {
458+
buf, err := d.reader.ReadBytes('\n')
459+
if err == io.EOF && len(buf) == 0 {
460+
return io.EOF
461+
} else if err != nil && !errors.Is(err, bufio.ErrBufferFull) && errors.Is(err, io.EOF) {
462+
return err
463+
}
464+
465+
return parseEntryV1(buf, entry, d.sensitiveEditor)
466+
}
467+
445468
func decodeTimestamp(fragment []byte) (unixNano int64, err error) {
446469
timeFormat := MessageTimeFormat
447470
if len(fragment) > 7 && (fragment[len(fragment)-7] == '+' || fragment[len(fragment)-7] == '-') {
@@ -464,112 +487,127 @@ func (d *entryDecoderV1) Decode(entry *logpb.Entry) error {
464487
}
465488
return io.EOF
466489
}
467-
b := d.scanner.Bytes()
468-
m := entryREV1.FindSubmatch(b)
469-
if m == nil {
470-
continue
471-
}
472-
473-
// Erase all the fields, to be sure.
474-
*entry = logpb.Entry{}
475490

476-
// Process the severity.
477-
entry.Severity = Severity(strings.IndexByte(severityChar, m[1][0]) + 1)
491+
if err := parseEntryV1(d.scanner.Bytes(), entry, d.sensitiveEditor); err != nil {
492+
if errors.Is(err, io.EOF) || errors.Is(err, errNoLogEntry) {
493+
continue
494+
}
478495

479-
// Process the timestamp.
480-
var err error
481-
entry.Time, err = decodeTimestamp(m[2])
482-
if err != nil {
483496
return err
484497
}
498+
return nil
499+
}
500+
}
485501

486-
// Process the goroutine ID.
487-
if len(m[3]) > 0 {
488-
goroutine, err := strconv.Atoi(string(m[3]))
489-
if err != nil {
490-
return err
491-
}
492-
entry.Goroutine = int64(goroutine)
493-
}
502+
var errNoLogEntry = errors.New("no log entry found in buffer")
494503

495-
// Process the channel/file/line details.
496-
entry.File = string(m[4])
497-
if idx := strings.IndexByte(entry.File, '@'); idx != -1 {
498-
ch, err := strconv.Atoi(entry.File[:idx])
499-
if err != nil {
500-
return err
501-
}
502-
entry.Channel = Channel(ch)
503-
entry.File = entry.File[idx+1:]
504-
}
504+
// parseEntryV1 parses a log entry from a byte slice into the provided protobuf message.
505+
// It contains the common parsing logic used by both decoder implementations.
506+
func parseEntryV1(buf []byte, entry *logpb.Entry, sensitiveEditor redactEditor) error {
507+
m := entryREV1.FindSubmatch(buf)
508+
if m == nil {
509+
return errNoLogEntry
510+
}
511+
512+
// Erase all the fields, to be sure.
513+
*entry = logpb.Entry{}
514+
515+
// Process the severity.
516+
entry.Severity = Severity(strings.IndexByte(severityChar, m[1][0]) + 1)
517+
518+
// Process the timestamp.
519+
var err error
520+
entry.Time, err = decodeTimestamp(m[2])
521+
if err != nil {
522+
return err
523+
}
505524

506-
line, err := strconv.Atoi(string(m[5]))
525+
// Process the goroutine ID.
526+
if len(m[3]) > 0 {
527+
goroutine, err := strconv.Atoi(string(m[3]))
507528
if err != nil {
508529
return err
509530
}
510-
entry.Line = int64(line)
511-
512-
// Process the context tags.
513-
redactable := len(m[6]) != 0
514-
// Look for a tenant ID tag. Default to system otherwise.
515-
entry.TenantID = serverident.SystemTenantID
516-
tagsToProcess := m[7]
517-
entry.TenantID, entry.TenantName, tagsToProcess = maybeReadTenantDetails(tagsToProcess)
518-
519-
// Process any remaining tags.
520-
if len(tagsToProcess) != 0 {
521-
r := redactablePackage{
522-
msg: tagsToProcess,
523-
redactable: redactable,
524-
}
525-
r = d.sensitiveEditor(r)
526-
entry.Tags = string(r.msg)
527-
}
531+
entry.Goroutine = int64(goroutine)
532+
}
528533

529-
// If there's an entry counter at the start of the message, process it.
530-
msg := b[len(m[0]):]
531-
i := 0
532-
for ; i < len(msg) && msg[i] >= '0' && msg[i] <= '9'; i++ {
533-
entry.Counter = entry.Counter*10 + uint64(msg[i]-'0')
534-
}
535-
if i > 0 && i < len(msg) && msg[i] == ' ' {
536-
// Only accept the entry counter if followed by a space. In all
537-
// other cases, the number was part of the message string.
538-
msg = msg[i+1:]
539-
} else {
540-
// This was not truly an entry counter. Ignore the work done previously.
541-
entry.Counter = 0
534+
// Process the channel/file/line details.
535+
entry.File = string(m[4])
536+
if idx := strings.IndexByte(entry.File, '@'); idx != -1 {
537+
ch, err := strconv.Atoi(entry.File[:idx])
538+
if err != nil {
539+
return err
542540
}
541+
entry.Channel = Channel(ch)
542+
entry.File = entry.File[idx+1:]
543+
}
544+
545+
line, err := strconv.Atoi(string(m[5]))
546+
if err != nil {
547+
return err
548+
}
549+
entry.Line = int64(line)
550+
551+
// Process the context tags.
552+
redactable := len(m[6]) != 0
553+
// Look for a tenant ID tag. Default to system otherwise.
554+
entry.TenantID = serverident.SystemTenantID
555+
tagsToProcess := m[7]
556+
entry.TenantID, entry.TenantName, tagsToProcess = maybeReadTenantDetails(tagsToProcess)
543557

544-
// Process the remainder of the log message.
558+
// Process any remaining tags.
559+
if len(tagsToProcess) != 0 {
545560
r := redactablePackage{
546-
msg: trimFinalNewLines(msg),
561+
msg: tagsToProcess,
547562
redactable: redactable,
548563
}
549-
r = d.sensitiveEditor(r)
550-
entry.Message = string(r.msg)
551-
entry.Redactable = r.redactable
552-
553-
if strings.HasPrefix(entry.Message, structuredEntryPrefix+"{") /* crdb-v1 prefix */ {
554-
// Note: we do not recognize the v2 marker here (" ={") because
555-
// v2 entries can be split across multiple lines.
556-
entry.StructuredStart = uint32(len(structuredEntryPrefix))
557-
558-
if nl := strings.IndexByte(entry.Message, '\n'); nl != -1 {
559-
entry.StructuredEnd = uint32(nl)
560-
entry.StackTraceStart = uint32(nl + 1)
561-
} else {
562-
entry.StructuredEnd = uint32(len(entry.Message))
563-
}
564-
}
565-
// Note: we only know how to populate entry.StackTraceStart upon
566-
// parse if the entry was structured (see above). If it is not
567-
// structured, we cannot distinguish where the message ends and
568-
// where the stack trace starts. This is another reason why the
569-
// crdb-v1 format is lossy.
564+
r = sensitiveEditor(r)
565+
entry.Tags = string(r.msg)
566+
}
570567

571-
return nil
568+
// If there's an entry counter at the start of the message, process it.
569+
msg := buf[len(m[0]):]
570+
i := 0
571+
for ; i < len(msg) && msg[i] >= '0' && msg[i] <= '9'; i++ {
572+
entry.Counter = entry.Counter*10 + uint64(msg[i]-'0')
573+
}
574+
if i > 0 && i < len(msg) && msg[i] == ' ' {
575+
// Only accept the entry counter if followed by a space. In all
576+
// other cases, the number was part of the message string.
577+
msg = msg[i+1:]
578+
} else {
579+
// This was not truly an entry counter. Ignore the work done previously.
580+
entry.Counter = 0
572581
}
582+
583+
// Process the remainder of the log message.
584+
r := redactablePackage{
585+
msg: trimFinalNewLines(msg),
586+
redactable: redactable,
587+
}
588+
r = sensitiveEditor(r)
589+
entry.Message = string(r.msg)
590+
entry.Redactable = r.redactable
591+
592+
if strings.HasPrefix(entry.Message, structuredEntryPrefix+"{") /* crdb-v1 prefix */ {
593+
// Note: we do not recognize the v2 marker here (" ={") because
594+
// v2 entries can be split across multiple lines.
595+
entry.StructuredStart = uint32(len(structuredEntryPrefix))
596+
597+
if nl := strings.IndexByte(entry.Message, '\n'); nl != -1 {
598+
entry.StructuredEnd = uint32(nl)
599+
entry.StackTraceStart = uint32(nl + 1)
600+
} else {
601+
entry.StructuredEnd = uint32(len(entry.Message))
602+
}
603+
}
604+
// Note: we only know how to populate entry.StackTraceStart upon
605+
// parse if the entry was structured (see above). If it is not
606+
// structured, we cannot distinguish where the message ends and
607+
// where the stack trace starts. This is another reason why the
608+
// crdb-v1 format is lossy.
609+
610+
return nil
573611
}
574612

575613
// maybeReadTenantDetails reads the tenant ID and name. If neither the

0 commit comments

Comments (0)