Skip to content

Commit 70cd757

Browse files
test: refactor GC and GC tests to be more reliable using synctest (#2645)
Co-authored-by: James Thomas <james9074@gmail.com>
1 parent 20b7da4 commit 70cd757

File tree

9 files changed

+104
-79
lines changed

9 files changed

+104
-79
lines changed

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ require (
4040
github.com/bits-and-blooms/bloom/v3 v3.7.1
4141
github.com/caio/go-tdigest/v4 v4.1.0
4242
github.com/ccoveille/go-safecast/v2 v2.0.0
43-
github.com/cenkalti/backoff/v4 v4.3.0
43+
github.com/cenkalti/backoff/v5 v5.0.3
4444
github.com/cespare/xxhash/v2 v2.3.0
4545
github.com/cloudspannerecosystem/spanner-change-streams-tail v0.3.1
4646
github.com/creasty/defaults v1.8.0
@@ -213,6 +213,7 @@ require (
213213
github.com/butuzov/mirror v1.3.0 // indirect
214214
github.com/catenacyber/perfsprint v0.10.0 // indirect
215215
github.com/ccojocar/zxcvbn-go v1.0.4 // indirect
216+
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
216217
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d // indirect
217218
github.com/charithe/durationcheck v0.0.11 // indirect
218219
github.com/charmbracelet/colorprofile v0.2.3-0.20250311203215-f60798e515dc // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,8 @@ github.com/ccoveille/go-safecast/v2 v2.0.0 h1:+5eyITXAUj3wMjad6cRVJKGnC7vDS55zk0
803803
github.com/ccoveille/go-safecast/v2 v2.0.0/go.mod h1:JIYA4CAR33blIDuE6fSwCp2sz1oOBahXnvmdBhOAABs=
804804
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
805805
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
806+
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
807+
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
806808
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
807809
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
808810
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=

internal/datastore/common/gc.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"fmt"
66
"time"
77

8-
"github.com/cenkalti/backoff/v4"
8+
"github.com/cenkalti/backoff/v5"
99
"github.com/prometheus/client_golang/prometheus"
1010
"github.com/rs/zerolog"
1111

@@ -149,14 +149,20 @@ var MaxGCInterval = 60 * time.Minute
149149
// StartGarbageCollector loops forever until the context is canceled and
150150
// performs garbage collection on the provided interval.
151151
func StartGarbageCollector(ctx context.Context, collectable GarbageCollectableDatastore, interval, window, timeout time.Duration) error {
152-
return startGarbageCollectorWithMaxElapsedTime(ctx, collectable, interval, window, 0, timeout, gcFailureCounter)
152+
return runOnIntervalWithBackoff(ctx, func() error {
153+
// NOTE: we're okay using the parent context here because the
154+
// callers of this function create a dedicated garbage collection
155+
// context anyway, which is only cancelled when the datastore is closed.
156+
gcCtx, cancel := context.WithTimeout(ctx, timeout)
157+
defer cancel()
158+
return RunGarbageCollection(gcCtx, collectable, window)
159+
}, interval, timeout, gcFailureCounter)
153160
}
154161

155-
func startGarbageCollectorWithMaxElapsedTime(ctx context.Context, collectable GarbageCollectableDatastore, interval, window, maxElapsedTime, timeout time.Duration, failureCounter prometheus.Counter) error {
162+
func runOnIntervalWithBackoff(ctx context.Context, taskFn func() error, interval, timeout time.Duration, failureCounter prometheus.Counter) error {
156163
backoffInterval := backoff.NewExponentialBackOff()
157164
backoffInterval.InitialInterval = interval
158165
backoffInterval.MaxInterval = max(MaxGCInterval, interval)
159-
backoffInterval.MaxElapsedTime = maxElapsedTime
160166
backoffInterval.Reset()
161167

162168
nextInterval := interval
@@ -175,11 +181,10 @@ func startGarbageCollectorWithMaxElapsedTime(ctx context.Context, collectable Ga
175181
case <-time.After(nextInterval):
176182
log.Ctx(ctx).Info().
177183
Dur("interval", nextInterval).
178-
Dur("window", window).
179184
Dur("timeout", timeout).
180185
Msg("running garbage collection worker")
181186

182-
err := RunGarbageCollection(collectable, window, timeout)
187+
err := taskFn()
183188
if err != nil {
184189
failureCounter.Inc()
185190
nextInterval = backoffInterval.NextBackOff()
@@ -200,10 +205,7 @@ func startGarbageCollectorWithMaxElapsedTime(ctx context.Context, collectable Ga
200205
}
201206

202207
// RunGarbageCollection runs garbage collection for the datastore.
203-
func RunGarbageCollection(collectable GarbageCollectableDatastore, window, timeout time.Duration) error {
204-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
205-
defer cancel()
206-
208+
func RunGarbageCollection(ctx context.Context, collectable GarbageCollectableDatastore, window time.Duration) error {
207209
ctx, span := tracer.Start(ctx, "RunGarbageCollection")
208210
defer span.End()
209211

internal/datastore/common/gc_test.go

Lines changed: 76 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@ package common
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"slices"
78
"sync"
89
"testing"
10+
"testing/synctest"
911
"time"
1012

1113
"github.com/prometheus/client_golang/prometheus"
1214
promclient "github.com/prometheus/client_model/go"
1315
"github.com/stretchr/testify/require"
16+
"go.uber.org/goleak"
1417

1518
"github.com/authzed/spicedb/internal/datastore/revisions"
1619
"github.com/authzed/spicedb/pkg/datastore"
@@ -181,50 +184,50 @@ func (d revisionErrorDeleter) DeleteExpiredRels() (int64, error) {
181184
return 0, nil
182185
}
183186

187+
func alwaysErr() error {
188+
return errors.New("aaagh")
189+
}
190+
184191
func TestGCFailureBackoff(t *testing.T) {
192+
t.Cleanup(func() {
193+
goleak.VerifyNone(t)
194+
})
185195
localCounter := prometheus.NewCounter(gcFailureCounterConfig)
186196
reg := prometheus.NewRegistry()
187197
require.NoError(t, reg.Register(localCounter))
188198

189-
ctx, cancel := context.WithCancel(t.Context())
190-
defer cancel()
191-
go func() {
192-
gc := newFakeGCStore(alwaysErrorDeleter{})
193-
require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, gc, 100*time.Millisecond, 1*time.Second, 1*time.Nanosecond, 1*time.Minute, localCounter))
194-
}()
195-
time.Sleep(200 * time.Millisecond)
196-
cancel()
199+
errCh := make(chan error, 1)
200+
synctest.Test(t, func(t *testing.T) {
201+
duration := 1000 * time.Second
202+
ctx, cancel := context.WithTimeout(t.Context(), duration)
203+
t.Cleanup(func() {
204+
cancel()
205+
})
206+
go func() {
207+
errCh <- runOnIntervalWithBackoff(ctx, alwaysErr, 100*time.Second, 1*time.Minute, localCounter)
208+
}()
209+
time.Sleep(duration)
210+
synctest.Wait()
211+
})
212+
require.Error(t, <-errCh)
197213

198214
metrics, err := reg.Gather()
199215
require.NoError(t, err)
200216
var mf *promclient.MetricFamily
201217
for _, metric := range metrics {
202218
if metric.GetName() == "spicedb_datastore_gc_failure_total" {
203219
mf = metric
220+
break
204221
}
205222
}
206-
require.Greater(t, *(mf.GetMetric()[0].Counter.Value), 100.0, "MaxElapsedTime=1ns did not cause backoff to get ignored")
207-
208-
localCounter = prometheus.NewCounter(gcFailureCounterConfig)
209-
reg = prometheus.NewRegistry()
210-
require.NoError(t, reg.Register(localCounter))
211-
ctx, cancel = context.WithCancel(t.Context())
212-
defer cancel()
213-
go func() {
214-
gc := newFakeGCStore(alwaysErrorDeleter{})
215-
require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, gc, 100*time.Millisecond, 0, 1*time.Second, 1*time.Minute, localCounter))
216-
}()
217-
time.Sleep(200 * time.Millisecond)
218-
cancel()
219-
220-
metrics, err = reg.Gather()
221-
require.NoError(t, err)
222-
for _, metric := range metrics {
223-
if metric.GetName() == "spicedb_datastore_gc_failure_total" {
224-
mf = metric
225-
}
226-
}
227-
require.Less(t, *(mf.GetMetric()[0].Counter.Value), 3.0, "MaxElapsedTime=0 should have not caused backoff to get ignored")
223+
// We expect about 5 failures; the behavior of the library means that there's some wiggle room here.
224+
// (owing to the jitter in the backoff)
225+
// we should see failures at 100s, 200s, 400s, 800s
226+
// but depending on jitter this could end up being squished down such that we get 5 failures.
227+
// Experimentally, we see 5 most often and 4 sometimes, so asserting greater than 3 works here.
228+
fmt.Println("failures")
229+
fmt.Println(*(mf.GetMetric()[0].Counter.Value))
230+
require.Greater(t, *(mf.GetMetric()[0].Counter.Value), 3.0, "did not see expected number of backoffs")
228231
}
229232

230233
// Ensure the garbage collector interval is reset after recovering from an
@@ -238,23 +241,29 @@ func TestGCFailureBackoffReset(t *testing.T) {
238241
errorOnRevisions: []uint64{1, 2, 3, 4, 5},
239242
})
240243

241-
ctx, cancel := context.WithCancel(t.Context())
242-
defer cancel()
243-
244-
go func() {
245-
interval := 10 * time.Millisecond
246-
window := 10 * time.Second
247-
timeout := 1 * time.Minute
248-
249-
require.Error(t, StartGarbageCollector(ctx, gc, interval, window, timeout))
250-
}()
244+
errCh := make(chan error, 1)
245+
synctest.Test(t, func(t *testing.T) {
246+
ctx, cancel := context.WithCancel(t.Context())
247+
go func() {
248+
interval := 10 * time.Millisecond
249+
window := 10 * time.Second
250+
timeout := 1 * time.Minute
251+
252+
errCh <- StartGarbageCollector(ctx, gc, interval, window, timeout)
253+
}()
254+
// The garbage collector should fail 5 times starting with 10ms interval,
255+
// which should take ~160ms to complete. After that, it should resume
256+
// completing a revision every 10ms, which should get us at least 20
257+
// successful iterations with a 500ms total execution time.
258+
// If the interval is not reset, it should blow past the 500ms timer
259+
// after only completing ~3 iterations.
260+
time.Sleep(500 * time.Millisecond)
261+
cancel()
262+
synctest.Wait()
263+
})
251264

252-
time.Sleep(500 * time.Millisecond)
253-
cancel()
265+
require.Error(t, <-errCh)
254266

255-
// The next interval should have been reset after recovering from the error.
256-
// If it is not reset, the last exponential backoff interval will not give
257-
// the GC enough time to run.
258267
gc.lock.Lock()
259268
defer gc.lock.Unlock()
260269

@@ -264,20 +273,29 @@ func TestGCFailureBackoffReset(t *testing.T) {
264273
func TestGCUnlockOnTimeout(t *testing.T) {
265274
gc := newFakeGCStore(alwaysErrorDeleter{})
266275

267-
ctx, cancel := context.WithCancel(t.Context())
268-
defer cancel()
269-
270-
go func() {
271-
interval := 10 * time.Millisecond
272-
window := 10 * time.Second
273-
timeout := 1 * time.Millisecond
274-
275-
require.Error(t, StartGarbageCollector(ctx, gc, interval, window, timeout))
276-
}()
277-
278-
time.Sleep(30 * time.Millisecond)
279-
require.False(t, gc.HasGCRun(), "GC should not have run")
276+
errCh := make(chan error, 1)
277+
hasRunChan := make(chan bool, 1)
278+
synctest.Test(t, func(t *testing.T) {
279+
ctx, cancel := context.WithCancel(t.Context())
280+
t.Cleanup(func() {
281+
cancel()
282+
})
283+
go func() {
284+
interval := 10 * time.Millisecond
285+
window := 10 * time.Second
286+
timeout := 1 * time.Minute
287+
288+
errCh <- StartGarbageCollector(ctx, gc, interval, window, timeout)
289+
}()
290+
time.Sleep(30 * time.Millisecond)
291+
hasRunChan <- gc.HasGCRun()
292+
cancel()
293+
synctest.Wait()
294+
})
295+
require.Error(t, <-errCh)
296+
require.False(t, <-hasRunChan, "GC should not have run because it should always be erroring.")
280297

298+
// TODO: should this be inside the goroutine as well?
281299
gc.fakeGC.lock.Lock()
282300
defer gc.fakeGC.lock.Unlock()
283301

internal/services/health/health.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"time"
66

7-
"github.com/cenkalti/backoff/v4"
7+
"github.com/cenkalti/backoff/v5"
88
healthpb "google.golang.org/grpc/health/grpc_health_v1"
99

1010
"github.com/authzed/grpcutil"
@@ -64,7 +64,6 @@ func (hm *healthManager) Checker(ctx context.Context) func() error {
6464
return func() error {
6565
// Run immediately for the initial check
6666
backoffInterval := backoff.NewExponentialBackOff()
67-
backoffInterval.MaxElapsedTime = 0
6867

6968
ticker := time.After(0)
7069

internal/telemetry/reporter.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"time"
1515

1616
prompb "buf.build/gen/go/prometheus/prometheus/protocolbuffers/go"
17-
"github.com/cenkalti/backoff/v4"
17+
"github.com/cenkalti/backoff/v5"
1818
"github.com/golang/snappy"
1919
"github.com/prometheus/client_golang/prometheus"
2020
"github.com/prometheus/common/expfmt"
@@ -176,7 +176,6 @@ func RemoteReporter(
176176
backoffInterval := backoff.NewExponentialBackOff()
177177
backoffInterval.InitialInterval = interval
178178
backoffInterval.MaxInterval = MaxElapsedTimeBetweenReports
179-
backoffInterval.MaxElapsedTime = 0
180179

181180
// Must reset the backoff object after changing parameters
182181
backoffInterval.Reset()

pkg/cmd/datastore.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,14 @@ func NewGCDatastoreCommand(programName string, cfg *datastore.Config) *cobra.Com
6060
Long: "Executes garbage collection against the datastore. Deletes stale relationships, expired relationships, and stale transactions.",
6161
PreRunE: server.DefaultPreRunE(programName),
6262
RunE: termination.PublishError(func(cmd *cobra.Command, args []string) error {
63-
return executeGC(cfg)
63+
return executeGC(cmd.Context(), cfg)
6464
}),
6565
}
6666
}
6767

68-
func executeGC(cfg *datastore.Config) error {
69-
ctx := context.Background()
68+
func executeGC(ctx context.Context, cfg *datastore.Config) error {
69+
ctx, cancel := context.WithTimeout(ctx, cfg.GCMaxOperationTime)
70+
defer cancel()
7071

7172
// Disable background GC and hedging.
7273
cfg.GCInterval = -1 * time.Hour
@@ -88,7 +89,7 @@ func executeGC(cfg *datastore.Config) error {
8889
Float64("gc_max_operation_time_seconds", cfg.GCMaxOperationTime.Seconds()).
8990
Msg("Running garbage collection...")
9091

91-
err = common.RunGarbageCollection(gcds, cfg.GCWindow, cfg.GCMaxOperationTime)
92+
err = common.RunGarbageCollection(ctx, gcds, cfg.GCWindow)
9293
if err != nil {
9394
return err
9495
}

pkg/cmd/datastore_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func TestExecuteGC(t *testing.T) {
3636
t.Parallel()
3737

3838
cfg := tt.cfgBuilder(t)
39-
err := executeGC(cfg)
39+
err := executeGC(t.Context(), cfg)
4040
require.ErrorContains(t, err, tt.expectedError)
4141
})
4242
}

pkg/datastore/test/revisions.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
// RevisionQuantizationTest tests whether or not the requirements for revisions hold
2222
// for a particular datastore.
23+
// TODO: rewrite using synctest
2324
func RevisionQuantizationTest(t *testing.T, tester DatastoreTester) {
2425
testCases := []struct {
2526
quantizationRange time.Duration
@@ -95,6 +96,7 @@ func RevisionSerializationTest(t *testing.T, tester DatastoreTester) {
9596

9697
// GCProcessRunTest tests whether the custom GC process runs for the datastore.
9798
// For datastores that do not have custom GC processes, this test is a no-op.
99+
// TODO: rewrite using synctest
98100
func GCProcessRunTest(t *testing.T, tester DatastoreTester) {
99101
require := require.New(t)
100102
gcWindow := 300 * time.Millisecond
@@ -139,6 +141,7 @@ func GCProcessRunTest(t *testing.T, tester DatastoreTester) {
139141

140142
// RevisionGCTest makes sure revision GC takes place, revisions outside of the GC window
141143
// are invalid, and revisions inside the GC window are valid.
144+
// TODO: rewrite using synctest if possible
142145
func RevisionGCTest(t *testing.T, tester DatastoreTester) {
143146
require := require.New(t)
144147
gcWindow := 300 * time.Millisecond
@@ -179,7 +182,7 @@ func RevisionGCTest(t *testing.T, tester DatastoreTester) {
179182
if ok {
180183
// Run garbage collection.
181184
gcable.ResetGCCompleted()
182-
err := common.RunGarbageCollection(gcable, gcWindow, 10*time.Second)
185+
err := common.RunGarbageCollection(ctx, gcable, gcWindow)
183186
require.NoError(err)
184187
require.True(gcable.HasGCRun(), "GC was never run as expected")
185188
}

0 commit comments

Comments
 (0)