Skip to content

Commit 8fc74f2

Browse files
committed
sql: collect cluster-wide traces as part of job execution details
This change includes the collection of cluster-wide, active tracing spans to a job's execution details. The traces are stored in a zip with a text and Jaeger JSON file per node, that contain the active tracing spans associated with the current execution of the job. Once cockroachdb#106879 and cockroachdb#107210 merge a trace.zip will be generated every time a user requests new execution details from the job details page. This zip will be downloadable from the job details page itself. Informs: cockroachdb#102794 Release note: None
1 parent 4618dbf commit 8fc74f2

File tree

4 files changed

+138
-6
lines changed

4 files changed

+138
-6
lines changed

pkg/jobs/jobs.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
3939
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
4040
"github.com/cockroachdb/cockroach/pkg/util/tracing"
41+
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
4142
"github.com/cockroachdb/errors"
4243
"github.com/cockroachdb/redact"
4344
"github.com/gogo/protobuf/jsonpb"
@@ -1121,3 +1122,28 @@ func FormatRetriableExecutionErrorLogToStringArray(
11211122
}
11221123
return arr
11231124
}
1125+
1126+
// GetJobTraceID returns the current trace ID of the job from the job progress.
1127+
func GetJobTraceID(ctx context.Context, db isql.DB, jobID jobspb.JobID) (tracingpb.TraceID, error) {
1128+
var traceID tracingpb.TraceID
1129+
if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
1130+
jobInfo := InfoStorageForJob(txn, jobID)
1131+
progressBytes, exists, err := jobInfo.GetLegacyProgress(ctx)
1132+
if err != nil {
1133+
return err
1134+
}
1135+
if !exists {
1136+
return errors.New("progress not found")
1137+
}
1138+
var progress jobspb.Progress
1139+
if err := protoutil.Unmarshal(progressBytes, &progress); err != nil {
1140+
return errors.Wrap(err, "failed to unmarshal progress bytes")
1141+
}
1142+
traceID = progress.TraceID
1143+
return nil
1144+
}); err != nil {
1145+
return 0, errors.Wrapf(err, "failed to fetch trace ID for job %d", jobID)
1146+
}
1147+
1148+
return traceID, nil
1149+
}

pkg/sql/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,7 @@ go_library(
566566
"//pkg/util/tracing",
567567
"//pkg/util/tracing/collector",
568568
"//pkg/util/tracing/tracingpb",
569+
"//pkg/util/tracing/zipper",
569570
"//pkg/util/tsearch",
570571
"//pkg/util/uint128",
571572
"//pkg/util/uuid",
@@ -904,6 +905,7 @@ go_test(
904905
"@com_github_jackc_pgconn//:pgconn",
905906
"@com_github_jackc_pgtype//:pgtype",
906907
"@com_github_jackc_pgx_v4//:pgx",
908+
"@com_github_klauspost_compress//zip",
907909
"@com_github_lib_pq//:pq",
908910
"@com_github_lib_pq//oid",
909911
"@com_github_petermattis_goid//:goid",

pkg/sql/jobs_profiler_execution_details.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
3131
"github.com/cockroachdb/cockroach/pkg/util/log"
3232
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
33+
"github.com/cockroachdb/cockroach/pkg/util/tracing/zipper"
3334
"github.com/cockroachdb/errors"
3435
)
3536

@@ -196,6 +197,7 @@ func (p *planner) RequestExecutionDetailFiles(ctx context.Context, jobID jobspb.
196197
// parallelize the collection of the various pieces.
197198
e.addDistSQLDiagram(ctx)
198199
e.addLabelledGoroutines(ctx)
200+
e.addClusterWideTraces(ctx)
199201

200202
return nil
201203
}
@@ -230,12 +232,12 @@ func (e *executionDetailsBuilder) addLabelledGoroutines(ctx context.Context) {
230232
}
231233
resp, err := e.srv.Profile(ctx, &profileRequest)
232234
if err != nil {
233-
log.Errorf(ctx, "failed to collect goroutines for job %d: %+v", e.jobID, err.Error())
235+
log.Errorf(ctx, "failed to collect goroutines for job %d: %v", e.jobID, err.Error())
234236
return
235237
}
236238
filename := fmt.Sprintf("goroutines.%s.txt", timeutil.Now().Format("20060102_150405.00"))
237239
if err := jobs.WriteExecutionDetailFile(ctx, filename, resp.Data, e.db, e.jobID); err != nil {
238-
log.Errorf(ctx, "failed to write goroutine for job %d: %+v", e.jobID, err.Error())
240+
log.Errorf(ctx, "failed to write goroutine for job %d: %v", e.jobID, err.Error())
239241
}
240242
}
241243

@@ -245,7 +247,7 @@ func (e *executionDetailsBuilder) addDistSQLDiagram(ctx context.Context) {
245247
row, err := e.db.Executor().QueryRowEx(ctx, "profiler-bundler-add-diagram", nil, /* txn */
246248
sessiondata.NoSessionDataOverride, query, e.jobID)
247249
if err != nil {
248-
log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %+v", e.jobID, err.Error())
250+
log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %v", e.jobID, err.Error())
249251
return
250252
}
251253
if row != nil && row[0] != tree.DNull {
@@ -254,7 +256,29 @@ func (e *executionDetailsBuilder) addDistSQLDiagram(ctx context.Context) {
254256
if err := jobs.WriteExecutionDetailFile(ctx, filename,
255257
[]byte(fmt.Sprintf(`<meta http-equiv="Refresh" content="0; url=%s">`, dspDiagramURL)),
256258
e.db, e.jobID); err != nil {
257-
log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %+v", e.jobID, err.Error())
259+
log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %v", e.jobID, err.Error())
258260
}
259261
}
260262
}
263+
264+
// addClusterWideTraces generates and persists a `trace.<timestamp>.zip` file
265+
// that captures the active tracing spans of a job on all nodes in the cluster.
266+
func (e *executionDetailsBuilder) addClusterWideTraces(ctx context.Context) {
267+
z := zipper.MakeInternalExecutorInflightTraceZipper(e.db.Executor())
268+
269+
traceID, err := jobs.GetJobTraceID(ctx, e.db, e.jobID)
270+
if err != nil {
271+
log.Warningf(ctx, "failed to fetch job trace ID: %+v", err.Error())
272+
return
273+
}
274+
zippedTrace, err := z.Zip(ctx, int64(traceID))
275+
if err != nil {
276+
log.Errorf(ctx, "failed to collect cluster wide traces for job %d: %v", e.jobID, err.Error())
277+
return
278+
}
279+
280+
filename := fmt.Sprintf("trace.%s.zip", timeutil.Now().Format("20060102_150405.00"))
281+
if err := jobs.WriteExecutionDetailFile(ctx, filename, zippedTrace, e.db, e.jobID); err != nil {
282+
log.Errorf(ctx, "failed to write traces for job %d: %v", e.jobID, err.Error())
283+
}
284+
}

pkg/sql/jobs_profiler_execution_details_test.go

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,11 @@ import (
4242
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
4343
"github.com/cockroachdb/cockroach/pkg/util/log"
4444
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
45+
"github.com/cockroachdb/cockroach/pkg/util/tracing"
4546
"github.com/cockroachdb/cockroach/pkg/util/uuid"
4647
"github.com/cockroachdb/errors"
48+
"github.com/gogo/protobuf/types"
49+
"github.com/klauspost/compress/zip"
4750
"github.com/stretchr/testify/require"
4851
)
4952

@@ -224,6 +227,80 @@ func TestReadWriteProfilerExecutionDetails(t *testing.T) {
224227
t.Run("execution details for invalid job ID", func(t *testing.T) {
225228
runner.ExpectErr(t, `job -123 not found; cannot request execution details`, `SELECT crdb_internal.request_job_execution_details(-123)`)
226229
})
230+
231+
t.Run("read/write terminal trace", func(t *testing.T) {
232+
jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
233+
return fakeExecResumer{
234+
OnResume: func(ctx context.Context) error {
235+
sp := tracing.SpanFromContext(ctx)
236+
require.NotNil(t, sp)
237+
sp.RecordStructured(&types.StringValue{Value: "should see this"})
238+
return nil
239+
},
240+
}
241+
}, jobs.UsesTenantCostControl)
242+
var importJobID int
243+
runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID)
244+
jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
245+
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
246+
trace := checkExecutionDetails(t, s, jobspb.JobID(importJobID), "resumer-trace")
247+
require.Contains(t, string(trace), "should see this")
248+
})
249+
250+
t.Run("read/write active trace", func(t *testing.T) {
251+
blockCh := make(chan struct{})
252+
continueCh := make(chan struct{})
253+
defer close(blockCh)
254+
defer close(continueCh)
255+
jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
256+
return fakeExecResumer{
257+
OnResume: func(ctx context.Context) error {
258+
_, childSp := tracing.ChildSpan(ctx, "child")
259+
defer childSp.Finish()
260+
blockCh <- struct{}{}
261+
<-continueCh
262+
return nil
263+
},
264+
}
265+
}, jobs.UsesTenantCostControl)
266+
var importJobID int
267+
runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID)
268+
<-blockCh
269+
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
270+
activeTraces := checkExecutionDetails(t, s, jobspb.JobID(importJobID), "trace")
271+
continueCh <- struct{}{}
272+
jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
273+
unzip, err := zip.NewReader(bytes.NewReader(activeTraces), int64(len(activeTraces)))
274+
require.NoError(t, err)
275+
276+
// Make sure the bundle contains the expected list of files.
277+
var files []string
278+
for _, f := range unzip.File {
279+
if f.UncompressedSize64 == 0 {
280+
t.Fatalf("file %s is empty", f.Name)
281+
}
282+
files = append(files, f.Name)
283+
284+
r, err := f.Open()
285+
if err != nil {
286+
t.Fatal(err)
287+
}
288+
defer r.Close()
289+
bytes, err := io.ReadAll(r)
290+
if err != nil {
291+
t.Fatal(err)
292+
}
293+
contents := string(bytes)
294+
295+
// Verify some contents in the active traces.
296+
if strings.Contains(f.Name, ".txt") {
297+
require.Regexp(t, "[child: {count: 1, duration.*, unfinished}]", contents)
298+
} else if strings.Contains(f.Name, ".json") {
299+
require.True(t, strings.Contains(contents, "\"operationName\": \"child\""))
300+
}
301+
}
302+
require.Equal(t, []string{"node1-trace.txt", "node1-jaeger.json"}, files)
303+
})
227304
}
228305

229306
func TestListProfilerExecutionDetails(t *testing.T) {
@@ -272,10 +349,11 @@ func TestListProfilerExecutionDetails(t *testing.T) {
272349

273350
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
274351
files := listExecutionDetails(t, s, jobspb.JobID(importJobID))
275-
require.Len(t, files, 3)
352+
require.Len(t, files, 4)
276353
require.Regexp(t, "distsql\\..*\\.html", files[0])
277354
require.Regexp(t, "goroutines\\..*\\.txt", files[1])
278355
require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[2])
356+
require.Regexp(t, "trace\\..*\\.zip", files[3])
279357

280358
// Resume the job, so it can write another DistSQL diagram and goroutine
281359
// snapshot.
@@ -285,13 +363,15 @@ func TestListProfilerExecutionDetails(t *testing.T) {
285363
jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
286364
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
287365
files = listExecutionDetails(t, s, jobspb.JobID(importJobID))
288-
require.Len(t, files, 6)
366+
require.Len(t, files, 8)
289367
require.Regexp(t, "distsql\\..*\\.html", files[0])
290368
require.Regexp(t, "distsql\\..*\\.html", files[1])
291369
require.Regexp(t, "goroutines\\..*\\.txt", files[2])
292370
require.Regexp(t, "goroutines\\..*\\.txt", files[3])
293371
require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[4])
294372
require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[5])
373+
require.Regexp(t, "trace\\..*\\.zip", files[6])
374+
require.Regexp(t, "trace\\..*\\.zip", files[7])
295375
})
296376
}
297377

0 commit comments

Comments
 (0)