Skip to content

Commit c403d35

Browse files
craig[bot]adityamaru
andcommitted
106654: sql,server: support collecting labelled goroutines in the job profiler r=dt a=adityamaru This change collect cluster-wide goroutines that have a pprof label tying it to the particular job's execution, whose job execution details have been requested. This relies on the support added to the pprofui server to collect cluster-wide, labelled goroutines in cockroachdb#105916. Informs: cockroachdb#105076 Release note: None Co-authored-by: adityamaru <[email protected]>
2 parents 269b9e3 + 8770dce commit c403d35

File tree

3 files changed

+74
-19
lines changed

3 files changed

+74
-19
lines changed

pkg/server/status.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4139,7 +4139,7 @@ func (s *statusServer) GetJobProfilerExecutionDetails(
41394139

41404140
jobID := jobspb.JobID(req.JobId)
41414141
execCfg := s.sqlServer.execCfg
4142-
eb := sql.MakeJobProfilerExecutionDetailsBuilder(execCfg.InternalDB, jobID)
4142+
eb := sql.MakeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID)
41434143
data, err := eb.ReadExecutionDetail(ctx, req.Filename)
41444144
if err != nil {
41454145
return nil, err

pkg/sql/jobs_profiler_bundle.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/jobs"
2222
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2323
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants"
24+
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
2425
"github.com/cockroachdb/cockroach/pkg/sql/isql"
2526
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2627
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
@@ -40,17 +41,19 @@ func (p *planner) RequestExecutionDetails(ctx context.Context, jobID jobspb.JobI
4041
clusterversion.V23_1.String())
4142
}
4243

43-
e := MakeJobProfilerExecutionDetailsBuilder(execCfg.InternalDB, jobID)
44+
e := MakeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID)
4445
// TODO(adityamaru): When we start collecting more information we can consider
4546
// parallelize the collection of the various pieces.
4647
e.addDistSQLDiagram(ctx)
48+
e.addLabelledGoroutines(ctx)
4749

4850
return nil
4951
}
5052

5153
// ExecutionDetailsBuilder can be used to read and write execution details corresponding
5254
// to a job.
5355
type ExecutionDetailsBuilder struct {
56+
srv serverpb.SQLStatusServer
5457
db isql.DB
5558
jobID jobspb.JobID
5659
}
@@ -156,14 +159,35 @@ func (e *ExecutionDetailsBuilder) ReadExecutionDetail(
156159

157160
// MakeJobProfilerExecutionDetailsBuilder returns an instance of an ExecutionDetailsBuilder.
158161
func MakeJobProfilerExecutionDetailsBuilder(
159-
db isql.DB, jobID jobspb.JobID,
162+
srv serverpb.SQLStatusServer, db isql.DB, jobID jobspb.JobID,
160163
) ExecutionDetailsBuilder {
161164
e := ExecutionDetailsBuilder{
162-
db: db, jobID: jobID,
165+
srv: srv, db: db, jobID: jobID,
163166
}
164167
return e
165168
}
166169

170+
// addLabelledGoroutines collects and persists goroutines from all nodes in the
171+
// cluster that have a pprof label tying it to the job whose execution details
172+
// are being collected.
173+
func (e *ExecutionDetailsBuilder) addLabelledGoroutines(ctx context.Context) {
174+
profileRequest := serverpb.ProfileRequest{
175+
NodeId: "all",
176+
Type: serverpb.ProfileRequest_GOROUTINE,
177+
Labels: true,
178+
LabelFilter: fmt.Sprintf("%d", e.jobID),
179+
}
180+
resp, err := e.srv.Profile(ctx, &profileRequest)
181+
if err != nil {
182+
log.Errorf(ctx, "failed to collect goroutines for job %d: %+v", e.jobID, err.Error())
183+
return
184+
}
185+
filename := fmt.Sprintf("goroutines.%s.txt", timeutil.Now().Format("20060102_150405.00"))
186+
if err := e.WriteExecutionDetail(ctx, filename, resp.Data); err != nil {
187+
log.Errorf(ctx, "failed to write goroutine for job %d: %+v", e.jobID, err.Error())
188+
}
189+
}
190+
167191
// addDistSQLDiagram generates and persists a `distsql.<timestamp>.html` file.
168192
func (e *ExecutionDetailsBuilder) addDistSQLDiagram(ctx context.Context) {
169193
query := `SELECT plan_diagram FROM [SHOW JOB $1 WITH EXECUTION DETAILS]`

pkg/sql/jobs_profiler_bundle_test.go

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"fmt"
1717
"io"
1818
"net/http"
19+
"runtime/pprof"
20+
"strings"
1921
"testing"
2022
"time"
2123

@@ -59,35 +61,63 @@ func TestReadWriteProfilerExecutionDetails(t *testing.T) {
5961

6062
runner := sqlutils.MakeSQLRunner(sqlDB)
6163

62-
jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
63-
return fakeExecResumer{
64-
OnResume: func(ctx context.Context) error {
65-
p := sql.PhysicalPlan{}
66-
infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1))
67-
p.PhysicalInfrastructure = infra
68-
jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID())
69-
checkForPlanDiagram(ctx, t, s.InternalDB().(isql.DB), j.ID())
70-
return nil
71-
},
72-
}
73-
}, jobs.UsesTenantCostControl)
74-
7564
runner.Exec(t, `CREATE TABLE t (id INT)`)
7665
runner.Exec(t, `INSERT INTO t SELECT generate_series(1, 100)`)
7766

7867
t.Run("read/write DistSQL diagram", func(t *testing.T) {
68+
jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
69+
return fakeExecResumer{
70+
OnResume: func(ctx context.Context) error {
71+
p := sql.PhysicalPlan{}
72+
infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1))
73+
p.PhysicalInfrastructure = infra
74+
jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID())
75+
checkForPlanDiagram(ctx, t, s.InternalDB().(isql.DB), j.ID())
76+
return nil
77+
},
78+
}
79+
}, jobs.UsesTenantCostControl)
80+
7981
var importJobID int
8082
runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID)
8183
jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
8284

8385
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
84-
checkExecutionDetails(t, s, jobspb.JobID(importJobID), "distsql")
86+
distSQLDiagram := checkExecutionDetails(t, s, jobspb.JobID(importJobID), "distsql")
87+
require.Regexp(t, "<meta http-equiv=\"Refresh\" content=\"0\\; url=https://cockroachdb\\.github\\.io/distsqlplan/decode.html.*>", string(distSQLDiagram))
88+
})
89+
90+
t.Run("read/write goroutines", func(t *testing.T) {
91+
blockCh := make(chan struct{})
92+
continueCh := make(chan struct{})
93+
defer close(blockCh)
94+
defer close(continueCh)
95+
jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
96+
return fakeExecResumer{
97+
OnResume: func(ctx context.Context) error {
98+
pprof.Do(ctx, pprof.Labels("foo", "bar"), func(ctx2 context.Context) {
99+
blockCh <- struct{}{}
100+
<-continueCh
101+
})
102+
return nil
103+
},
104+
}
105+
}, jobs.UsesTenantCostControl)
106+
var importJobID int
107+
runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID)
108+
<-blockCh
109+
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
110+
goroutines := checkExecutionDetails(t, s, jobspb.JobID(importJobID), "goroutines")
111+
continueCh <- struct{}{}
112+
jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
113+
require.True(t, strings.Contains(string(goroutines), fmt.Sprintf("labels: {\"foo\":\"bar\", \"job\":\"IMPORT id=%d\", \"n\":\"1\"}", importJobID)))
114+
require.True(t, strings.Contains(string(goroutines), "github.com/cockroachdb/cockroach/pkg/sql_test.fakeExecResumer.Resume"))
85115
})
86116
}
87117

88118
func checkExecutionDetails(
89119
t *testing.T, s serverutils.TestServerInterface, jobID jobspb.JobID, filename string,
90-
) {
120+
) []byte {
91121
t.Helper()
92122

93123
client, err := s.GetAdminHTTPClient()
@@ -112,4 +142,5 @@ func checkExecutionDetails(
112142
data, err := io.ReadAll(r)
113143
require.NoError(t, err)
114144
require.NotEmpty(t, data)
145+
return data
115146
}

0 commit comments

Comments
 (0)