
Commit 737e32c

structlogging: properly register hot ranges logging job
A test failure in the hot ranges logging job pointed to a race condition in how the job's resumer is registered. After some discussion, this PR brings job registration in line with how the jobs API is meant to be used: the resumer constructor is now registered in an init() function, rather than lazily from the scheduler's startup path.

Fixes: #149041
Epic: None
Release note: none
1 parent ba6e42f commit 737e32c
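Why lazy registration races: the jobs registry may try to adopt and resume the job as soon as the node starts, and if the resumer constructor is only registered from a startup path, adoption can win the race and find no constructor. The following is a minimal, self-contained Go sketch of that failure mode; all names here are a toy model, not the actual jobs package API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Resumer stands in for jobs.Resumer in this toy model.
type Resumer interface{ Resume() }

type noopResumer struct{}

func (noopResumer) Resume() {}

var (
	mu           sync.Mutex
	constructors = map[string]func() Resumer{}
)

// registerConstructor mimics registering a resumer constructor.
func registerConstructor(typ string, fn func() Resumer) {
	mu.Lock()
	defer mu.Unlock()
	constructors[typ] = fn
}

// resumeJob mimics the registry adopting a job and looking up its resumer.
func resumeJob(typ string) error {
	mu.Lock()
	fn, ok := constructors[typ]
	mu.Unlock()
	if !ok {
		return fmt.Errorf("no resumer constructor registered for job type %q", typ)
	}
	fn().Resume()
	return nil
}

func main() {
	// The registry begins adopting jobs as soon as the node is up.
	adopted := make(chan error, 1)
	go func() { adopted <- resumeJob("hot-ranges-logger") }()

	// Lazy registration from a startup path can lose the race above.
	time.Sleep(time.Millisecond)
	registerConstructor("hot-ranges-logger", func() Resumer { return noopResumer{} })

	// Likely prints the "no resumer constructor" error. Moving
	// registerConstructor into init() removes the window entirely,
	// because init runs before main starts any goroutines.
	fmt.Println(<-adopted)
}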

File tree

3 files changed: +54 −39 lines

pkg/server/structlogging/BUILD.bazel

Lines changed: 1 addition & 0 deletions

@@ -15,6 +15,7 @@ go_library(
         "//pkg/server/serverpb",
         "//pkg/settings",
         "//pkg/settings/cluster",
+        "//pkg/sql",
         "//pkg/util/log",
         "//pkg/util/log/eventpb",
         "//pkg/util/log/logpb",

pkg/server/structlogging/hot_ranges_log.go

Lines changed: 18 additions & 34 deletions

@@ -9,8 +9,6 @@ import (
 	"context"
 	"time"
 
-	"github.com/cockroachdb/cockroach/pkg/jobs"
-	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
@@ -73,13 +71,11 @@ type HotRangeGetter interface {
 	HotRangesV2(ctx context.Context, req *serverpb.HotRangesRequest) (*serverpb.HotRangesResponseV2, error)
 }
 
-// hotRangesLoggingScheduler is responsible for logging index usage stats
+// hotRangesLogger is responsible for logging index usage stats
 // on a scheduled interval.
-type hotRangesLoggingScheduler struct {
+type hotRangesLogger struct {
 	sServer     HotRangeGetter
 	st          *cluster.Settings
-	stopper     *stop.Stopper
-	job         *jobs.Job
 	multiTenant bool
 	lastLogged  time.Time
 }
@@ -90,8 +86,8 @@ type hotRangesLoggingScheduler struct {
 // For system tenants, or single tenant deployments, it runs as
 // a task on each node, logging only the ranges on the node in
 // which it runs. For app tenants in a multi-tenant deployment,
-// it runs on a single node in the sql cluster, applying a fanout
-// to the kv layer to collect the hot ranges from all nodes.
+// it does nothing, allowing the hot range logging job to be the
+// entrypoint.
 func StartHotRangesLoggingScheduler(
 	ctx context.Context,
 	stopper *stop.Stopper,
@@ -100,42 +96,30 @@ func StartHotRangesLoggingScheduler(
 	ti *tenantcapabilities.Entry,
 ) error {
 	multiTenant := ti != nil && ti.TenantID.IsSet() && !ti.TenantID.IsSystem()
-	scheduler := hotRangesLoggingScheduler{
+
+	if multiTenant {
+		return nil
+	}
+
+	logger := hotRangesLogger{
 		sServer:     sServer,
 		st:          st,
-		stopper:     stopper,
-		multiTenant: multiTenant,
+		multiTenant: false,
 		lastLogged:  timeutil.Now(),
 	}
 
-	if multiTenant {
-		return scheduler.startJob()
-	}
-
-	return scheduler.startTask(ctx, stopper)
+	return logger.startTask(ctx, stopper)
 }
 
 // startTask is for usage in a system-tenant or non-multi-tenant
 // installation.
-func (s *hotRangesLoggingScheduler) startTask(ctx context.Context, stopper *stop.Stopper) error {
+func (s *hotRangesLogger) startTask(ctx context.Context, stopper *stop.Stopper) error {
 	return stopper.RunAsyncTask(ctx, "hot-ranges-stats", func(ctx context.Context) {
 		s.start(ctx, stopper)
 	})
 }
 
-func (s *hotRangesLoggingScheduler) startJob() error {
-	jobs.RegisterConstructor(
-		jobspb.TypeHotRangesLogger,
-		func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
-			s.job = job
-			return s
-		},
-		jobs.DisablesTenantCostControl,
-	)
-	return nil
-}
-
-func (s *hotRangesLoggingScheduler) start(ctx context.Context, stopper *stop.Stopper) {
+func (s *hotRangesLogger) start(ctx context.Context, stopper *stop.Stopper) {
 	for {
 		ci := CheckInterval
 		if s.multiTenant {
@@ -156,7 +140,7 @@ func (s *hotRangesLoggingScheduler) start(ctx context.Context, stopper *stop.Sto
 
 // maybeLogHotRanges is a small helper function which couples the
 // functionality of checking whether to log and logging.
-func (s *hotRangesLoggingScheduler) maybeLogHotRanges(ctx context.Context, stopper *stop.Stopper) {
+func (s *hotRangesLogger) maybeLogHotRanges(ctx context.Context, stopper *stop.Stopper) {
 	if s.shouldLog(ctx) {
 		s.logHotRanges(ctx, stopper)
 		s.lastLogged = timeutil.Now()
@@ -171,7 +155,7 @@ func (s *hotRangesLoggingScheduler) maybeLogHotRanges(ctx context.Context, stopp
 // - One of the following conditions is met:
 // -- It's been greater than the log interval since we last logged.
 // -- One of the replicas see exceeds our cpu threshold.
-func (s *hotRangesLoggingScheduler) shouldLog(ctx context.Context) bool {
+func (s *hotRangesLogger) shouldLog(ctx context.Context) bool {
 
 	enabled := TelemetryHotRangesStatsEnabled.Get(&s.st.SV)
 	if !enabled {
@@ -210,7 +194,7 @@ func maxCPU(ranges []*serverpb.HotRangesResponseV2_HotRange) time.Duration {
 // stats for ranges requested, or everything. It also determines
 // whether to limit the request to only the local node, or to
 // issue a fanout for multi-tenant apps.
-func (s *hotRangesLoggingScheduler) getHotRanges(
+func (s *hotRangesLogger) getHotRanges(
 	ctx context.Context, statsOnly bool,
 ) (*serverpb.HotRangesResponseV2, error) {
 	req := &serverpb.HotRangesRequest{
@@ -228,7 +212,7 @@ func (s *hotRangesLoggingScheduler) getHotRanges(
 
 // logHotRanges collects the hot ranges from this node's status server and
 // sends them to the HEALTH log channel.
-func (s *hotRangesLoggingScheduler) logHotRanges(ctx context.Context, stopper *stop.Stopper) {
+func (s *hotRangesLogger) logHotRanges(ctx context.Context, stopper *stop.Stopper) {
 	resp, err := s.getHotRanges(ctx, false)
 	if err != nil {
 		log.Warningf(ctx, "failed to get hot ranges: %s", err)
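The shouldLog doc comment in the hunk above describes two triggers: the log interval has elapsed since lastLogged, or some replica's CPU exceeds the configured threshold. A standalone Go sketch of that gating logic follows; the parameter names are assumed for illustration, since the real method reads these values from cluster settings and the hot-ranges response.

package main

import (
	"fmt"
	"time"
)

// shouldLogSketch mirrors the two conditions in the shouldLog comment:
// log if the interval has elapsed since lastLogged, or if the hottest
// replica's CPU time exceeds the configured threshold.
func shouldLogSketch(
	enabled bool,
	lastLogged time.Time,
	logInterval time.Duration,
	maxReplicaCPU, cpuThreshold time.Duration,
) bool {
	if !enabled {
		return false
	}
	if time.Since(lastLogged) > logInterval {
		return true
	}
	return maxReplicaCPU > cpuThreshold
}

func main() {
	// true: the log interval has elapsed.
	fmt.Println(shouldLogSketch(true, time.Now().Add(-5*time.Minute), time.Minute, 0, 250*time.Millisecond))
	// true: a replica is over the CPU threshold.
	fmt.Println(shouldLogSketch(true, time.Now(), time.Hour, 300*time.Millisecond, 250*time.Millisecond))
}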

pkg/server/structlogging/hot_ranges_log_job.go

Lines changed: 35 additions & 5 deletions

@@ -9,10 +9,19 @@ import (
 	"context"
 
 	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 )
 
+type hotRangesLoggingJob struct {
+	job      *jobs.Job
+	settings *cluster.Settings
+}
+
 // hot_ranges_log_job.go adds the required functions to satisfy
 // the jobs.Scheduler interface for the hot ranges logging job.
 // This is only required for app tenants in a multi-tenant deployment
@@ -21,16 +30,24 @@ import (
 // It's run as a job, as since fanout is required, only one node
 // needs to run it at any given time, as opposed to the every
 // node task behavior otherwise.
-func (s *hotRangesLoggingScheduler) Resume(ctx context.Context, execCtxI interface{}) error {
+func (j *hotRangesLoggingJob) Resume(ctx context.Context, execCtxI interface{}) error {
 	// This job is a forever running background job, and it is always safe to
 	// terminate the SQL pod whenever the job is running, so mark it as idle.
-	s.job.MarkIdle(true)
+	j.job.MarkIdle(true)
 
-	s.start(ctx, s.stopper)
+	jobExec := execCtxI.(sql.JobExecContext)
+	execCfg := jobExec.ExecCfg()
+	logger := &hotRangesLogger{
+		sServer:     execCfg.TenantStatusServer,
+		st:          j.settings,
+		multiTenant: true,
+		lastLogged:  timeutil.Now(),
+	}
+	logger.start(ctx, execCfg.Stopper)
 	return nil
 }
 
-func (s *hotRangesLoggingScheduler) OnFailOrCancel(
+func (j *hotRangesLoggingJob) OnFailOrCancel(
 	ctx context.Context, execCtx interface{}, jobErr error,
 ) error {
 	if jobs.HasErrJobCanceled(jobErr) {
@@ -42,6 +59,19 @@ func (s *hotRangesLoggingScheduler) OnFailOrCancel(
 	return nil
 }
 
-func (s *hotRangesLoggingScheduler) CollectProfile(ctx context.Context, execCtx interface{}) error {
+func (j *hotRangesLoggingJob) CollectProfile(ctx context.Context, execCtx interface{}) error {
 	return nil
 }
+
+func init() {
+	jobs.RegisterConstructor(
+		jobspb.TypeHotRangesLogger,
+		func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
+			return &hotRangesLoggingJob{
+				job:      job,
+				settings: settings,
+			}
+		},
+		jobs.DisablesTenantCostControl,
+	)
+}
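With registration moved to init(), the constructor for jobspb.TypeHotRangesLogger is in place during package initialization, before any goroutine can adopt the job, which closes the race described in the commit message. The resumer also no longer carries scheduler state: it rebuilds its hotRangesLogger from the job's execution context on each Resume, matching the registration pattern other built-in jobs use.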
