Skip to content

Commit 4413ec7

Browse files
craig[bot]adityamaruknz
committed
107198: jobsprofiler: stringify protobin files when requested r=dt a=adityamaru This change is in preparation for a larger change that will allow downloading debug files from the `Advanded Debugging` tab on the job details page. With this change a `binpb` file will have a `binpb.txt` version of the file listed too. If the user requests to download a `binpb.txt` file we unmarshal and stringify the contents of the file before serving them to the user. Currently, there is only one protobin file type written by a job resumer on completion. Informs: cockroachdb#105076 Release note: None 107700: netutil: fix a buglet r=erikgrinaker,stevendanna a=knz I was noticing an excess number of conn objects remaining open after a test shutdown. Release note: None Epic: CRDB-28893 107711: backupccl: skip TestBackupRestoreTenant r=stevendanna a=adityamaru Skip while we debug the timeouts in cockroachdb#107669. Informs: cockroachdb#107669 Release note: None Co-authored-by: adityamaru <[email protected]> Co-authored-by: Raphael 'kena' Poss <[email protected]>
4 parents 3bbf620 + b4362a6 + 973d861 + 3e87abe commit 4413ec7

File tree

8 files changed

+101
-59
lines changed

8 files changed

+101
-59
lines changed

pkg/ccl/backupccl/backup_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6943,6 +6943,7 @@ func TestBackupRestoreInsideMultiPodTenant(t *testing.T) {
69436943
func TestBackupRestoreTenant(t *testing.T) {
69446944
defer leaktest.AfterTest(t)()
69456945
defer log.Scope(t).Close(t)
6946+
skip.WithIssue(t, 107669)
69466947

69476948
params := base.TestClusterArgs{ServerArgs: base.TestServerArgs{
69486949
Knobs: base.TestingKnobs{

pkg/jobs/adopt.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (r *Registry) maybeDumpTrace(resumerCtx context.Context, resumer Resumer, j
8282
return
8383
}
8484

85-
resumerTraceFilename := fmt.Sprintf("resumer-trace-n%s.%s.txt",
85+
resumerTraceFilename := fmt.Sprintf("resumer-trace-n%s.%s.binpb",
8686
r.ID().String(), timeutil.Now().Format("20060102_150405.00"))
8787
td := jobspb.TraceData{CollectedSpans: sp.GetConfiguredRecording()}
8888
b, err := protoutil.Marshal(&td)

pkg/jobs/execution_detail_utils.go

Lines changed: 64 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2121
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants"
2222
"github.com/cockroachdb/cockroach/pkg/sql/isql"
23+
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
24+
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
2325
"github.com/cockroachdb/errors"
2426
"github.com/klauspost/compress/gzip"
2527
)
@@ -82,6 +84,19 @@ func WriteExecutionDetailFile(
8284
})
8385
}
8486

87+
func stringifyProtobinFile(filename string, fileContents *bytes.Buffer) ([]byte, error) {
88+
if strings.HasPrefix(filename, "resumer-trace") {
89+
td := &jobspb.TraceData{}
90+
if err := protoutil.Unmarshal(fileContents.Bytes(), td); err != nil {
91+
return nil, err
92+
}
93+
rec := tracingpb.Recording(td.CollectedSpans)
94+
return []byte(rec.String()), nil
95+
} else {
96+
return nil, errors.AssertionFailedf("unknown file %s", filename)
97+
}
98+
}
99+
85100
// ReadExecutionDetailFile will stitch together all the chunks corresponding to the
86101
// filename and return the uncompressed data of the file.
87102
func ReadExecutionDetailFile(
@@ -91,37 +106,52 @@ func ReadExecutionDetailFile(
91106
// to the job's execution details and return the zipped bundle instead.
92107

93108
buf := bytes.NewBuffer([]byte{})
94-
if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
95-
// Reset the buf inside the txn closure to guard against txn retries.
96-
buf.Reset()
97-
jobInfo := InfoStorageForJob(txn, jobID)
109+
fetchFileContent := func(file string) error {
110+
return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
111+
// Reset the buf inside the txn closure to guard against txn retries.
112+
buf.Reset()
113+
jobInfo := InfoStorageForJob(txn, jobID)
114+
115+
// Iterate over all the chunks of the requested file and return the unzipped
116+
// chunks of data.
117+
var lastInfoKey string
118+
if err := jobInfo.Iterate(ctx, profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(file),
119+
func(infoKey string, value []byte) error {
120+
lastInfoKey = infoKey
121+
r, err := gzip.NewReader(bytes.NewBuffer(value))
122+
if err != nil {
123+
return err
124+
}
125+
decompressed, err := io.ReadAll(r)
126+
if err != nil {
127+
return err
128+
}
129+
buf.Write(decompressed)
130+
return nil
131+
}); err != nil {
132+
return errors.Wrapf(err, "failed to iterate over chunks for job %d", jobID)
133+
}
98134

99-
// Iterate over all the chunks of the requested file and return the unzipped
100-
// chunks of data.
101-
var lastInfoKey string
102-
if err := jobInfo.Iterate(ctx, profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(filename),
103-
func(infoKey string, value []byte) error {
104-
lastInfoKey = infoKey
105-
r, err := gzip.NewReader(bytes.NewBuffer(value))
106-
if err != nil {
107-
return err
108-
}
109-
decompressed, err := io.ReadAll(r)
110-
if err != nil {
111-
return err
112-
}
113-
buf.Write(decompressed)
114-
return nil
115-
}); err != nil {
116-
return errors.Wrapf(err, "failed to iterate over chunks for job %d", jobID)
117-
}
135+
if lastInfoKey != "" && !strings.Contains(lastInfoKey, finalChunkSuffix) {
136+
return errors.Newf("failed to read all chunks for file %s, last info key read was %s", file, lastInfoKey)
137+
}
118138

119-
if lastInfoKey != "" && !strings.Contains(lastInfoKey, finalChunkSuffix) {
120-
return errors.Newf("failed to read all chunks for file %s, last info key read was %s", filename, lastInfoKey)
139+
return nil
140+
})
141+
}
142+
143+
// If the file requested is the `binpb.txt` format of a `binpb` file, we must
144+
// fetch the `binpb` version of the file and stringify the contents before
145+
// returning the response.
146+
if strings.HasSuffix(filename, "binpb.txt") {
147+
trimmedFilename := strings.TrimSuffix(filename, ".txt")
148+
if err := fetchFileContent(trimmedFilename); err != nil {
149+
return nil, err
121150
}
151+
return stringifyProtobinFile(filename, buf)
152+
}
122153

123-
return nil
124-
}); err != nil {
154+
if err := fetchFileContent(filename); err != nil {
125155
return nil, err
126156
}
127157
return buf.Bytes(), nil
@@ -143,7 +173,13 @@ func ListExecutionDetailFiles(
143173
func(infoKey string, value []byte) error {
144174
// Look for the final chunk of each file to find the unique file name.
145175
if strings.HasSuffix(infoKey, finalChunkSuffix) {
146-
files = append(files, strings.TrimPrefix(strings.TrimSuffix(infoKey, finalChunkSuffix), profilerconstants.ExecutionDetailsChunkKeyPrefix))
176+
filename := strings.TrimPrefix(strings.TrimSuffix(infoKey, finalChunkSuffix), profilerconstants.ExecutionDetailsChunkKeyPrefix)
177+
// If we see a `.binpb` file we also want to make the string version of
178+
// the file available for consumption.
179+
if strings.HasSuffix(filename, ".binpb") {
180+
files = append(files, filename+".txt")
181+
}
182+
files = append(files, filename)
147183
}
148184
return nil
149185
}); err != nil {

pkg/jobs/jobs_test.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,9 +1064,9 @@ func TestRegistryLifecycle(t *testing.T) {
10641064
rts.check(t, jobs.StatusSucceeded)
10651065

10661066
<-completeCh
1067-
checkTraceFiles(ctx, t, expectedNumFiles+1, j.ID(), rts.s)
1067+
checkTraceFiles(ctx, t, expectedNumFiles+2, j.ID(), rts.s)
10681068
}
1069-
pauseUnpauseJob(1)
1069+
pauseUnpauseJob(2)
10701070
})
10711071

10721072
t.Run("dump traces on fail", func(t *testing.T) {
@@ -1103,7 +1103,7 @@ func TestRegistryLifecycle(t *testing.T) {
11031103
checkTraceFiles(ctx, t, expectedNumFiles, j.ID(), rts.s)
11041104
}
11051105

1106-
runJobAndFail(1)
1106+
runJobAndFail(2)
11071107
})
11081108

11091109
t.Run("dump traces on cancel", func(t *testing.T) {
@@ -1126,7 +1126,7 @@ func TestRegistryLifecycle(t *testing.T) {
11261126
rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID())
11271127

11281128
<-completeCh
1129-
checkTraceFiles(rts.ctx, t, 1, j.ID(), rts.s)
1129+
checkTraceFiles(rts.ctx, t, 2, j.ID(), rts.s)
11301130

11311131
rts.mu.e.OnFailOrCancelStart = true
11321132
rts.check(t, jobs.StatusReverting)
@@ -1189,17 +1189,23 @@ func checkTraceFiles(
11891189
) {
11901190
t.Helper()
11911191

1192-
recordings := make([]jobspb.TraceData, 0)
1192+
recordings := make([][]byte, 0)
11931193
execCfg := s.TenantOrServer().ExecutorConfig().(sql.ExecutorConfig)
11941194
edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobID)
11951195
require.NoError(t, err)
1196+
require.Len(t, edFiles, expectedNumFiles)
11961197

11971198
for _, f := range edFiles {
11981199
data, err := jobs.ReadExecutionDetailFile(ctx, f, execCfg.InternalDB, jobID)
11991200
require.NoError(t, err)
1200-
td := jobspb.TraceData{}
1201-
require.NoError(t, protoutil.Unmarshal(data, &td))
1202-
recordings = append(recordings, td)
1201+
// Trace files are dumped in `binpb` and `binpb.txt` format. The former
1202+
// should be unmarshal-able.
1203+
if strings.HasSuffix(f, "binpb") {
1204+
td := jobspb.TraceData{}
1205+
require.NoError(t, protoutil.Unmarshal(data, &td))
1206+
require.NotEmpty(t, td.CollectedSpans)
1207+
}
1208+
recordings = append(recordings, data)
12031209
}
12041210
if len(recordings) != expectedNumFiles {
12051211
t.Fatalf("expected %d entries but found %d", expectedNumFiles, len(recordings))

pkg/jobs/jobsprofiler/profiler_test.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func TestTraceRecordingOnResumerCompletion(t *testing.T) {
214214
// At this point there should have been two resumers, and so we expect two
215215
// trace recordings.
216216
testutils.SucceedsSoon(t, func() error {
217-
recordings := make([]jobspb.TraceData, 0)
217+
recordings := make([][]byte, 0)
218218
execCfg := s.TenantOrServer().ExecutorConfig().(sql.ExecutorConfig)
219219
edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobspb.JobID(jobID))
220220
if err != nil {
@@ -232,13 +232,16 @@ func TestTraceRecordingOnResumerCompletion(t *testing.T) {
232232
if err != nil {
233233
return err
234234
}
235-
td := jobspb.TraceData{}
236-
if err := protoutil.Unmarshal(data, &td); err != nil {
237-
return err
235+
recordings = append(recordings, data)
236+
if strings.HasSuffix(f, "binpb") {
237+
td := jobspb.TraceData{}
238+
if err := protoutil.Unmarshal(data, &td); err != nil {
239+
return err
240+
}
241+
require.NotEmpty(t, td.CollectedSpans)
238242
}
239-
recordings = append(recordings, td)
240243
}
241-
if len(recordings) != 2 {
244+
if len(recordings) != 4 {
242245
return errors.Newf("expected 2 entries but found %d", len(recordings))
243246
}
244247
return nil

pkg/jobs/jobsprofiler/profilerconstants/constants.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,6 @@ func MakeNodeProcessorProgressInfoKey(flowID string, instanceID string, processo
3737
return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID)
3838
}
3939

40-
const ResumerTraceInfoKeyPrefix = "~resumer-trace-"
41-
42-
// MakeResumerTraceInfoKey returns the info_key used for rows that store the
43-
// traces on completion of a resumer's execution.
44-
func MakeResumerTraceInfoKey(traceID uint64, nodeID string) string {
45-
return fmt.Sprintf("%s%d-%s", ResumerTraceInfoKeyPrefix, traceID, nodeID)
46-
}
47-
4840
// ExecutionDetailsChunkKeyPrefix is the prefix of the info key used for rows that
4941
// store chunks of a job's execution details.
5042
const ExecutionDetailsChunkKeyPrefix = "~profiler/"

pkg/sql/jobs_profiler_execution_details_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -349,11 +349,12 @@ func TestListProfilerExecutionDetails(t *testing.T) {
349349

350350
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
351351
files := listExecutionDetails(t, s, jobspb.JobID(importJobID))
352-
require.Len(t, files, 4)
352+
require.Len(t, files, 5)
353353
require.Regexp(t, "distsql\\..*\\.html", files[0])
354354
require.Regexp(t, "goroutines\\..*\\.txt", files[1])
355-
require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[2])
356-
require.Regexp(t, "trace\\..*\\.zip", files[3])
355+
require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[2])
356+
require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[3])
357+
require.Regexp(t, "trace\\..*\\.zip", files[4])
357358

358359
// Resume the job, so it can write another DistSQL diagram and goroutine
359360
// snapshot.
@@ -363,15 +364,17 @@ func TestListProfilerExecutionDetails(t *testing.T) {
363364
jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
364365
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
365366
files = listExecutionDetails(t, s, jobspb.JobID(importJobID))
366-
require.Len(t, files, 8)
367+
require.Len(t, files, 10)
367368
require.Regexp(t, "distsql\\..*\\.html", files[0])
368369
require.Regexp(t, "distsql\\..*\\.html", files[1])
369370
require.Regexp(t, "goroutines\\..*\\.txt", files[2])
370371
require.Regexp(t, "goroutines\\..*\\.txt", files[3])
371-
require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[4])
372-
require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[5])
373-
require.Regexp(t, "trace\\..*\\.zip", files[6])
374-
require.Regexp(t, "trace\\..*\\.zip", files[7])
372+
require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[4])
373+
require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[5])
374+
require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[6])
375+
require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[7])
376+
require.Regexp(t, "trace\\..*\\.zip", files[8])
377+
require.Regexp(t, "trace\\..*\\.zip", files[9])
375378
})
376379
}
377380

pkg/util/netutil/net.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ func (s *TCPServer) ServeWith(
191191
serveConn(ctx, rw)
192192
})
193193
if err != nil {
194+
err = errors.CombineErrors(err, rw.Close())
194195
return err
195196
}
196197
}

0 commit comments

Comments
 (0)