Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.

Commit 930033c

Browse files
chore: Replace QueuedCount -> CountByState with bitset parameter (#64302)
Previously, the QueuedCount method was confusing because: 1. By default, it actually returned the count for both the 'queued' and 'errored' states (despite the name just have 'Queued'). 2. There was an additional boolean flag for also returning entries in the 'processing' state, but reduced clarity at call-sites. So I've changed the method to take a bitset instead, mirroring the just-added Exists API, and renamed the method to a more generic 'CountByState'. While this does make call-sites a bit more verbose, I think the clarity win makes the change an overall positive one.
1 parent 6ee4446 commit 930033c

File tree

18 files changed

+182
-169
lines changed

18 files changed

+182
-169
lines changed

cmd/worker/internal/executormultiqueue/multiqueue_metrics_reporter.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ func (j *multiqueueMetricsReporterJob) Routines(_ context.Context, observationCt
4747
multiqueueMetricsReporter, err := executorqueue.NewMultiqueueMetricReporter(
4848
executortypes.ValidQueueNames,
4949
configInst.MetricsConfig,
50-
codeIntelStore.QueuedCount,
51-
batchesStore.QueuedCount,
50+
codeIntelStore.CountByState,
51+
batchesStore.CountByState,
5252
)
5353
if err != nil {
5454
return nil, err

cmd/worker/internal/executorqueue/external_emitter.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@ import (
77
uploadsshared "github.com/sourcegraph/sourcegraph/internal/codeintel/uploads/shared"
88
"github.com/sourcegraph/sourcegraph/internal/goroutine"
99
"github.com/sourcegraph/sourcegraph/internal/workerutil"
10+
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
1011
"github.com/sourcegraph/sourcegraph/lib/errors"
1112
)
1213

1314
type externalEmitter[T workerutil.Record] struct {
1415
queueName string
15-
countFuncs []func(ctx context.Context, includeProcessing bool) (int, error)
16+
countFuncs []func(ctx context.Context, bitset store.RecordState) (int, error)
1617
reporters []reporter
1718
allocation QueueAllocation
1819
}
@@ -27,9 +28,9 @@ type reporter interface {
2728
func (r *externalEmitter[T]) Handle(ctx context.Context) error {
2829
var count int
2930
for _, countFunc := range r.countFuncs {
30-
subCount, err := countFunc(context.Background(), true)
31+
subCount, err := countFunc(context.Background(), store.StateQueued|store.StateErrored|store.StateProcessing)
3132
if err != nil {
32-
return errors.Wrap(err, "dbworkerstore.QueuedCount")
33+
return errors.Wrap(err, "dbworkerstore.CountByState")
3334
}
3435
count += subCount
3536
}

cmd/worker/internal/executorqueue/reporter.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func NewMetricReporter[T workerutil.Record](observationCtx *observation.Context,
1919
return initExternalMetricReporters(queueName, store, metricsConfig)
2020
}
2121

22-
func initExternalMetricReporters[T workerutil.Record](queueName string, store store.Store[T], metricsConfig *Config) (goroutine.BackgroundRoutine, error) {
22+
func initExternalMetricReporters[T workerutil.Record](queueName string, store_ store.Store[T], metricsConfig *Config) (goroutine.BackgroundRoutine, error) {
2323
reporters, err := configureReporters(metricsConfig)
2424
if err != nil {
2525
return nil, err
@@ -30,7 +30,7 @@ func initExternalMetricReporters[T workerutil.Record](queueName string, store st
3030
ctx,
3131
&externalEmitter[T]{
3232
queueName: queueName,
33-
countFuncs: []func(ctx context.Context, includeProcessing bool) (int, error){store.QueuedCount},
33+
countFuncs: []func(context.Context, store.RecordState) (int, error){store_.CountByState},
3434
reporters: reporters,
3535
allocation: metricsConfig.Allocations[queueName],
3636
},
@@ -43,7 +43,7 @@ func initExternalMetricReporters[T workerutil.Record](queueName string, store st
4343
// NewMultiqueueMetricReporter returns a periodic background routine that reports the sum of the lengths all configured queues.
4444
// This does not reinitialise Prometheus metrics as is done in NewMetricReporter, as this only needs to be done once and is
4545
// already done for the single queue metrics.
46-
func NewMultiqueueMetricReporter(queueNames []string, metricsConfig *Config, countFuncs ...func(ctx context.Context, includeProcessing bool) (int, error)) (goroutine.BackgroundRoutine, error) {
46+
func NewMultiqueueMetricReporter(queueNames []string, metricsConfig *Config, countFuncs ...func(_ context.Context, bitset store.RecordState) (int, error)) (goroutine.BackgroundRoutine, error) {
4747
reporters, err := configureReporters(metricsConfig)
4848
if err != nil {
4949
return nil, err

cmd/worker/internal/permissions/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ go_test(
7979
"//internal/observation",
8080
"//internal/timeutil",
8181
"//internal/types",
82+
"//internal/workerutil/dbworker/store",
8283
"//lib/pointers",
8384
"//schema",
8485
"@com_github_google_go_cmp//cmp",

cmd/worker/internal/permissions/bitbucket_projects_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/sourcegraph/sourcegraph/internal/extsvc/bitbucketserver"
2323
"github.com/sourcegraph/sourcegraph/internal/observation"
2424
"github.com/sourcegraph/sourcegraph/internal/types"
25+
dbworker "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
2526
"github.com/sourcegraph/sourcegraph/schema"
2627
)
2728

@@ -39,7 +40,7 @@ func TestStore(t *testing.T) {
3940
require.NotZero(t, jobID)
4041

4142
store := createBitbucketProjectPermissionsStore(observation.TestContextTB(t), db, &config{})
42-
count, err := store.QueuedCount(ctx, true)
43+
count, err := store.CountByState(ctx, dbworker.StateQueued|dbworker.StateErrored|dbworker.StateProcessing)
4344
require.NoError(t, err)
4445
require.Equal(t, 1, count)
4546
}

cmd/worker/internal/search/job.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive"
2121
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/service"
2222
"github.com/sourcegraph/sourcegraph/internal/search/exhaustive/store"
23+
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
2324
)
2425

2526
// config stores shared config we can override in each worker. We don't expose
@@ -39,7 +40,7 @@ type searchJob struct {
3940
once sync.Once
4041
err error
4142
workerStores []interface {
42-
QueuedCount(context.Context, bool) (int, error)
43+
CountByState(_ context.Context, bitset dbworkerstore.RecordState) (int, error)
4344
}
4445
workers []goroutine.BackgroundRoutine
4546
}
@@ -136,8 +137,9 @@ func (j *searchJob) newSearchJobRoutines(
136137
// hasWork returns true if any of the workers have work in its queue or is
137138
// processing something. This is only exposed for tests.
138139
func (j *searchJob) hasWork(ctx context.Context) bool {
140+
statesBitset := dbworkerstore.StateQueued | dbworkerstore.StateErrored | dbworkerstore.StateProcessing
139141
for _, w := range j.workerStores {
140-
if count, _ := w.QueuedCount(ctx, true); count > 0 {
142+
if count, _ := w.CountByState(ctx, statesBitset); count > 0 {
141143
return true
142144
}
143145
}

internal/codeintel/syntactic_indexing/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ go_test(
6565
"//internal/gitserver",
6666
"//internal/gitserver/gitdomain",
6767
"//internal/observation",
68+
"//internal/workerutil/dbworker/store",
6869
"@com_github_keegancsmith_sqlf//:sqlf",
6970
"@com_github_stretchr_testify//require",
7071
],

internal/codeintel/syntactic_indexing/scheduler_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/sourcegraph/sourcegraph/internal/gitserver"
2424
"github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain"
2525
"github.com/sourcegraph/sourcegraph/internal/observation"
26+
dbworker "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
2627
)
2728

2829
func TestSyntacticIndexingScheduler(t *testing.T) {
@@ -81,7 +82,7 @@ func TestSyntacticIndexingScheduler(t *testing.T) {
8182

8283
err := scheduler.Schedule(observationCtx, ctx, time.Now())
8384
require.NoError(t, err)
84-
require.Equal(t, 2, unwrap(jobStore.DBWorkerStore().QueuedCount(ctx, false))(t))
85+
require.Equal(t, 2, unwrap(jobStore.DBWorkerStore().CountByState(ctx, dbworker.StateQueued|dbworker.StateErrored))(t))
8586

8687
job1, recordReturned, err := jobStore.DBWorkerStore().Dequeue(ctx, "worker-1", []*sqlf.Query{})
8788
require.NoError(t, err)

internal/codeintel/syntactic_indexing/store_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/sourcegraph/sourcegraph/internal/database"
1414
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
1515
"github.com/sourcegraph/sourcegraph/internal/observation"
16+
dbworker "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
1617
)
1718

1819
func TestSyntacticIndexingStoreDequeue(t *testing.T) {
@@ -38,7 +39,7 @@ func TestSyntacticIndexingStoreDequeue(t *testing.T) {
3839

3940
ctx := context.Background()
4041

41-
initCount, _ := store.QueuedCount(ctx, true)
42+
initCount, _ := store.CountByState(ctx, dbworker.StateQueued|dbworker.StateErrored|dbworker.StateProcessing)
4243

4344
require.Equal(t, 0, initCount)
4445

@@ -83,7 +84,7 @@ func TestSyntacticIndexingStoreDequeue(t *testing.T) {
8384
},
8485
)
8586

86-
afterCount, _ := store.QueuedCount(ctx, true)
87+
afterCount, _ := store.CountByState(ctx, dbworker.StateQueued|dbworker.StateErrored|dbworker.StateProcessing)
8788

8889
require.Equal(t, 3, afterCount)
8990

@@ -166,7 +167,7 @@ func TestSyntacticIndexingStoreEnqueue(t *testing.T) {
166167

167168
// Assertions below verify that records inserted by InsertJobs are
168169
// still visible by DB Worker interface
169-
afterCount, _ := store.QueuedCount(ctx, true)
170+
afterCount, _ := store.CountByState(ctx, dbworker.StateQueued|dbworker.StateErrored|dbworker.StateProcessing)
170171

171172
require.Equal(t, 2, afterCount)
172173

internal/insights/background/queryrunner/worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func NewWorker(ctx context.Context, logger log.Logger, workerStore *workerStoreE
6161
Name: "src_query_runner_worker_total",
6262
Help: "Total number of jobs in the queued state.",
6363
}, func() float64 {
64-
count, err := workerStore.QueuedCount(context.Background(), false)
64+
count, err := workerStore.CountByState(context.Background(), dbworkerstore.StateQueued|dbworkerstore.StateErrored)
6565
if err != nil {
6666
logger.Error("Failed to get queued job count", log.Error(err))
6767
}

0 commit comments

Comments
 (0)