Skip to content

Commit c811240

Browse files
committed
takepacer: move task pacer to its own util package
Before this commit, the taskPacer lived inside kvserver, which made it unusable from packages that cannot include the kvserver package. This opens the door to using the taskPacer in more packages. Epic: None Release note: None
1 parent c4b6ef8 commit c811240

File tree

9 files changed

+82
-52
lines changed

9 files changed

+82
-52
lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,7 @@ ALL_TESTS = [
817817
"//pkg/util/syncutil/singleflight:singleflight_test",
818818
"//pkg/util/syncutil:syncutil_test",
819819
"//pkg/util/sysutil:sysutil_test",
820+
"//pkg/util/taskpacer:taskpacer_test",
820821
"//pkg/util/timeofday:timeofday_test",
821822
"//pkg/util/timetz:timetz_test",
822823
"//pkg/util/timeutil/pgdate:pgdate_test",
@@ -2794,6 +2795,8 @@ GO_TARGETS = [
27942795
"//pkg/util/system:system",
27952796
"//pkg/util/sysutil:sysutil",
27962797
"//pkg/util/sysutil:sysutil_test",
2798+
"//pkg/util/taskpacer:taskpacer",
2799+
"//pkg/util/taskpacer:taskpacer_test",
27972800
"//pkg/util/timeofday:timeofday",
27982801
"//pkg/util/timeofday:timeofday_test",
27992802
"//pkg/util/timetz:timetz",

pkg/kv/kvserver/BUILD.bazel

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ go_library(
107107
"stores_base.go",
108108
"stores_server.go",
109109
"stores_store_liveness.go",
110-
"task_pacer.go",
111110
"testing_knobs.go",
112111
"ts_maintenance_queue.go",
113112
":gen-refreshraftreason-stringer", # keep
@@ -247,6 +246,7 @@ go_library(
247246
"//pkg/util/stop",
248247
"//pkg/util/syncutil",
249248
"//pkg/util/syncutil/singleflight",
249+
"//pkg/util/taskpacer",
250250
"//pkg/util/timeutil",
251251
"//pkg/util/tracing",
252252
"//pkg/util/tracing/tracingpb",
@@ -388,7 +388,6 @@ go_test(
388388
"store_replica_btree_test.go",
389389
"store_test.go",
390390
"stores_test.go",
391-
"task_pacer_test.go",
392391
"testutils_test.go",
393392
"ts_maintenance_queue_test.go",
394393
"txn_recovery_integration_test.go",

pkg/kv/kvserver/store.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ import (
9999
"github.com/cockroachdb/cockroach/pkg/util/stop"
100100
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
101101
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
102+
"github.com/cockroachdb/cockroach/pkg/util/taskpacer"
102103
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
103104
"github.com/cockroachdb/cockroach/pkg/util/tracing"
104105
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
@@ -2577,7 +2578,7 @@ func (s *Store) startRangefeedUpdater(ctx context.Context) {
25772578
// the rate of processing in accordance with the time remaining until the
25782579
// refresh interval ends.
25792580
conf := newRangeFeedUpdaterConf(s.cfg.Settings)
2580-
pacer := NewTaskPacer(conf)
2581+
pacer := taskpacer.New(conf)
25812582
for {
25822583
// Configuration may have changed between runs, load it unconditionally.
25832584
// This will block until an "active" configuration exists, i.e. a one with

pkg/kv/kvserver/store_raft.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/cockroachdb/cockroach/pkg/util/mon"
2626
"github.com/cockroachdb/cockroach/pkg/util/stop"
2727
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
28+
"github.com/cockroachdb/cockroach/pkg/util/taskpacer"
2829
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
2930
"github.com/cockroachdb/cockroach/pkg/util/tracing"
3031
"github.com/cockroachdb/errors"
@@ -207,7 +208,7 @@ func (qs *raftReceiveQueues) SetEnforceMaxLen(enforceMaxLen bool) {
207208
}
208209

209210
// raftTickPacerConf is a configuration struct for the raft tick pacer.
210-
// It implements the taskPacerConfig interface.
211+
// It implements the taskpacer.Config interface.
211212
type raftTickPacerConf struct {
212213
store *Store
213214
}
@@ -216,11 +217,11 @@ func newRaftTickPacerConf(s *Store) raftTickPacerConf {
216217
return raftTickPacerConf{store: s}
217218
}
218219

219-
func (r raftTickPacerConf) getRefresh() time.Duration {
220+
func (r raftTickPacerConf) GetRefresh() time.Duration {
220221
return r.store.cfg.RaftTickInterval
221222
}
222223

223-
func (r raftTickPacerConf) getSmear() time.Duration {
224+
func (r raftTickPacerConf) GetSmear() time.Duration {
224225
return r.store.cfg.RaftTickSmearInterval
225226
}
226227

@@ -931,7 +932,7 @@ func (s *Store) raftTickLoop(ctx context.Context) {
931932
// Create a config that will be used by the taskPacer, which allows us to pace
932933
// the enqueuing of Raft ticks.
933934
conf := newRaftTickPacerConf(s)
934-
pacer := NewTaskPacer(conf)
935+
pacer := taskpacer.New(conf)
935936

936937
for {
937938
select {

pkg/kv/kvserver/store_rangefeed.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515

1616
// rangeFeedUpdaterConf provides configuration for the rangefeed updater job,
1717
// and allows watching for when it is updated.
18-
// rangeFeedUpdaterConf Implements the taskPacerConfig interface.
18+
// rangeFeedUpdaterConf Implements the taskpacer.Config interface.
1919
type rangeFeedUpdaterConf struct {
2020
settings *cluster.Settings
2121
changed <-chan struct{}
@@ -41,8 +41,8 @@ func newRangeFeedUpdaterConf(st *cluster.Settings) rangeFeedUpdaterConf {
4141
// configuration, and returns it.
4242
func (r rangeFeedUpdaterConf) wait(ctx context.Context) error {
4343
for {
44-
refresh := r.getRefresh()
45-
smear := r.getSmear()
44+
refresh := r.GetRefresh()
45+
smear := r.GetSmear()
4646
if refresh != 0 && smear != 0 {
4747
return nil
4848
}
@@ -55,18 +55,18 @@ func (r rangeFeedUpdaterConf) wait(ctx context.Context) error {
5555
}
5656
}
5757

58-
// getRefresh returns the refresh interval for the rangefeed updater.
59-
func (r rangeFeedUpdaterConf) getRefresh() time.Duration {
58+
// GetRefresh returns the refresh interval for the rangefeed updater.
59+
func (r rangeFeedUpdaterConf) GetRefresh() time.Duration {
6060
refresh := RangeFeedRefreshInterval.Get(&r.settings.SV)
6161
if refresh <= 0 {
6262
refresh = closedts.SideTransportCloseInterval.Get(&r.settings.SV)
6363
}
6464
return refresh
6565
}
6666

67-
// getSmear returns the smear interval for the rangefeed updater.
68-
func (r rangeFeedUpdaterConf) getSmear() time.Duration {
69-
refresh := r.getRefresh()
67+
// GetSmear returns the smear interval for the rangefeed updater.
68+
func (r rangeFeedUpdaterConf) GetSmear() time.Duration {
69+
refresh := r.GetRefresh()
7070
smear := RangeFeedSmearInterval.Get(&r.settings.SV)
7171
if smear <= 0 || smear > refresh {
7272
smear = refresh

pkg/kv/kvserver/store_rangefeed_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestRangeFeedUpdaterConf(t *testing.T) {
9595
if len(tc.updates) != 0 {
9696
<-conf.changed // we must observe an update, otherwise the test times out
9797
}
98-
refresh, smear := conf.getRefresh(), conf.getSmear()
98+
refresh, smear := conf.GetRefresh(), conf.GetSmear()
9999
assert.Equal(t, tc.want, [...]time.Duration{refresh, smear})
100100

101101
ctx, cancel := context.WithTimeout(ctx, time.Millisecond)
@@ -105,7 +105,7 @@ func TestRangeFeedUpdaterConf(t *testing.T) {
105105
if tc.waitErr != nil {
106106
return
107107
}
108-
refresh, smear = conf.getRefresh(), conf.getSmear()
108+
refresh, smear = conf.GetRefresh(), conf.GetSmear()
109109
assert.Equal(t, tc.want, [...]time.Duration{refresh, smear})
110110
})
111111
}

pkg/util/taskpacer/BUILD.bazel

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "taskpacer",
5+
srcs = ["pacer.go"],
6+
importpath = "github.com/cockroachdb/cockroach/pkg/util/taskpacer",
7+
visibility = ["//visibility:public"],
8+
)
9+
10+
go_test(
11+
name = "taskpacer_test",
12+
size = "small",
13+
srcs = ["pacer_test.go"],
14+
embed = [":taskpacer"],
15+
deps = [
16+
"//pkg/util/leaktest",
17+
"//pkg/util/log",
18+
"//pkg/util/timeutil",
19+
"@com_github_stretchr_testify//assert",
20+
"@com_github_stretchr_testify//require",
21+
],
22+
)

pkg/kv/kvserver/task_pacer.go renamed to pkg/util/taskpacer/pacer.go

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,44 +3,49 @@
33
// Use of this software is governed by the CockroachDB Software License
44
// included in the /LICENSE file.
55

6-
package kvserver
6+
package taskpacer
77

88
import "time"
99

10-
// taskPacerConfig is the interface for the configuration of a taskPacer.
11-
type taskPacerConfig interface {
12-
// getRefresh returns the interval between tasks.
13-
getRefresh() time.Duration
10+
// Config is the interface for the configuration of a Pacer.
11+
type Config interface {
12+
// GetRefresh returns the interval between tasks.
13+
GetRefresh() time.Duration
1414

15-
// getSmear returns the interval between batches inside a task.
16-
getSmear() time.Duration
15+
// GetSmear returns the interval between batches inside a task.
16+
GetSmear() time.Duration
1717
}
1818

19-
// taskPacer controls the pacing of tasks to prevent overloading the system.
20-
type taskPacer struct {
19+
// Pacer controls the pacing of tasks to prevent overloading the system.
20+
// It's typically used to avoid having a sudden spike in runnable goroutines.
21+
// By spreading out work over time, we can keep the number of runnable
22+
// goroutines smaller, which reduces the overhead on the Go scheduler.
23+
type Pacer struct {
2124
// taskStartTime is the time at which the task started.
2225
taskStartTime time.Time
23-
// conf is the configuration for the taskPacer.
24-
conf taskPacerConfig
26+
// conf is the configuration for the Pacer.
27+
conf Config
2528
}
2629

27-
func NewTaskPacer(conf taskPacerConfig) *taskPacer {
28-
return &taskPacer{
30+
// New creates a new task pacer with the given configuration.
31+
func New(conf Config) *Pacer {
32+
return &Pacer{
2933
conf: conf,
3034
}
3135
}
3236

33-
func (tp *taskPacer) StartTask(now time.Time) {
37+
// StartTask marks the start of a new task at the given time.
38+
func (tp *Pacer) StartTask(now time.Time) {
3439
tp.taskStartTime = now
3540
}
3641

3742
// Pace returns the amount of work that should be done and the time by which it
3843
// should be done.
39-
// See the test TestTaskPacer for examples of how to use this method.
40-
func (tp *taskPacer) Pace(now time.Time, workLeft int) (todo int, by time.Time) {
44+
// See the test TestPacer for examples of how to use this method.
45+
func (tp *Pacer) Pace(now time.Time, workLeft int) (todo int, by time.Time) {
4146
deadline := tp.GetDeadline()
4247
timeLeft := deadline.Sub(now)
43-
quantum := tp.conf.getSmear()
48+
quantum := tp.conf.GetSmear()
4449
if workLeft <= 0 || timeLeft <= 0 { // ran out of work or time
4550
return workLeft, now
4651
} else if quantum <= 0 { // smearing is disabled
@@ -62,6 +67,6 @@ func (tp *taskPacer) Pace(now time.Time, workLeft int) (todo int, by time.Time)
6267

6368
// GetDeadline returns the time at which the current batch of work should be
6469
// done.
65-
func (tp *taskPacer) GetDeadline() time.Time {
66-
return tp.taskStartTime.Add(tp.conf.getRefresh())
70+
func (tp *Pacer) GetDeadline() time.Time {
71+
return tp.taskStartTime.Add(tp.conf.GetRefresh())
6772
}

pkg/kv/kvserver/task_pacer_test.go renamed to pkg/util/taskpacer/pacer_test.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
// Use of this software is governed by the CockroachDB Software License
44
// included in the /LICENSE file.
55

6-
package kvserver
6+
package taskpacer
77

88
import (
99
"testing"
@@ -16,30 +16,30 @@ import (
1616
"github.com/stretchr/testify/require"
1717
)
1818

19-
// MockTaskPacerConfig is a test implementation of a task pacer configuration.
20-
type MockTaskPacerConfig struct {
19+
// MockConfig is a test implementation of a task pacer configuration.
20+
type MockConfig struct {
2121
// refresh is the interval between task batches
2222
refresh time.Duration
2323
// smear is the interval between task batches
2424
smear time.Duration
2525
}
2626

27-
func newMockTaskPacerConfig(refresh, smear time.Duration) *MockTaskPacerConfig {
28-
return &MockTaskPacerConfig{
27+
func newMockConfig(refresh, smear time.Duration) *MockConfig {
28+
return &MockConfig{
2929
refresh: refresh,
3030
smear: smear,
3131
}
3232
}
3333

34-
func (tp *MockTaskPacerConfig) getRefresh() time.Duration {
34+
func (tp *MockConfig) GetRefresh() time.Duration {
3535
return tp.refresh
3636
}
3737

38-
func (tp *MockTaskPacerConfig) getSmear() time.Duration {
38+
func (tp *MockConfig) GetSmear() time.Duration {
3939
return tp.smear
4040
}
4141

42-
func TestTaskPacer(t *testing.T) {
42+
func TestPacer(t *testing.T) {
4343
defer leaktest.AfterTest(t)()
4444
defer log.Scope(t).Close(t)
4545

@@ -62,7 +62,7 @@ func TestTaskPacer(t *testing.T) {
6262
wantBy: []time.Duration{200},
6363
wantDone: 55,
6464
}, {
65-
// If smear is set to 0, the taskPacer should not smear the work over
65+
// If smear is set to 0, the Pacer should not smear the work over
6666
// time.
6767
desc: "zero-smear",
6868
deadline: 200, smear: 0, work: 1234,
@@ -143,8 +143,8 @@ func TestTaskPacer(t *testing.T) {
143143
start := timeutil.Unix(946684800, 0) // Jan 1, 2000
144144
now := start
145145

146-
conf := newMockTaskPacerConfig(tc.deadline, tc.smear)
147-
pacer := NewTaskPacer(conf)
146+
conf := newMockConfig(tc.deadline, tc.smear)
147+
pacer := New(conf)
148148
pacer.StartTask(now)
149149

150150
for work, startAt := tc.work, now; work != 0; {
@@ -171,9 +171,9 @@ func TestTaskPacer(t *testing.T) {
171171
}
172172
}
173173

174-
// TestTaskPacerAccommodatesConfChanges tests that the taskPacer can accommodate
174+
// TestPacerAccommodatesConfChanges tests that the Pacer can accommodate
175175
// changing the refresh and the smear intervals mid-run.
176-
func TestTaskPacerAccommodatesConfChanges(t *testing.T) {
176+
func TestPacerAccommodatesConfChanges(t *testing.T) {
177177
defer leaktest.AfterTest(t)()
178178
defer log.Scope(t).Close(t)
179179

@@ -242,8 +242,8 @@ func TestTaskPacerAccommodatesConfChanges(t *testing.T) {
242242
start := timeutil.Unix(946684800, 0) // Jan 1, 2000
243243
now := start
244244

245-
conf := newMockTaskPacerConfig(tc.refreshes[0], tc.smears[0])
246-
pacer := NewTaskPacer(conf)
245+
conf := newMockConfig(tc.refreshes[0], tc.smears[0])
246+
pacer := New(conf)
247247
pacer.StartTask(now)
248248

249249
index := 0
@@ -274,5 +274,4 @@ func TestTaskPacerAccommodatesConfChanges(t *testing.T) {
274274
assert.Equal(t, tc.wantDone, now.Sub(start))
275275
})
276276
}
277-
278277
}

0 commit comments

Comments
 (0)