Skip to content

Commit e20df7c

Browse files
authored
fix: goroutine leaks (authzed#2757)
1 parent fd60318 commit e20df7c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+285
-179
lines changed

internal/caveats/run_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ func TestRunCaveatExpressions(t *testing.T) {
451451
t.Run(tc.name, func(t *testing.T) {
452452
req := require.New(t)
453453

454-
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(0, 0, memdb.DisableGC)
454+
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 0, memdb.DisableGC)
455455
req.NoError(err)
456456

457457
ds, _ := testfixtures.DatastoreFromSchemaAndTestRelationships(rawDS, `
@@ -510,7 +510,7 @@ func TestRunCaveatExpressions(t *testing.T) {
510510
func TestRunCaveatWithMissingMap(t *testing.T) {
511511
req := require.New(t)
512512

513-
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(0, 0, memdb.DisableGC)
513+
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 0, memdb.DisableGC)
514514
req.NoError(err)
515515

516516
ds, _ := testfixtures.DatastoreFromSchemaAndTestRelationships(rawDS, `
@@ -540,7 +540,7 @@ func TestRunCaveatWithMissingMap(t *testing.T) {
540540
func TestRunCaveatWithEmptyMap(t *testing.T) {
541541
req := require.New(t)
542542

543-
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(0, 0, memdb.DisableGC)
543+
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 0, memdb.DisableGC)
544544
req.NoError(err)
545545

546546
ds, _ := testfixtures.DatastoreFromSchemaAndTestRelationships(rawDS, `
@@ -637,7 +637,7 @@ func (f noCaveatsReader) LegacyListAllCaveats(ctx context.Context) ([]datastore.
637637
func TestRunCaveatWithMissingDefinition(t *testing.T) {
638638
req := require.New(t)
639639

640-
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(0, 0, memdb.DisableGC)
640+
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 0, memdb.DisableGC)
641641
req.NoError(err)
642642

643643
ds, _ := testfixtures.DatastoreFromSchemaAndTestRelationships(rawDS, `
@@ -667,7 +667,7 @@ func TestRunCaveatWithMissingDefinition(t *testing.T) {
667667
func TestCaveatRunnerPopulateCaveatDefinitionsForExpr(t *testing.T) {
668668
req := require.New(t)
669669

670-
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(0, 0, memdb.DisableGC)
670+
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 0, memdb.DisableGC)
671671
req.NoError(err)
672672

673673
ds, _ := testfixtures.DatastoreFromSchemaAndTestRelationships(rawDS, `
@@ -712,7 +712,7 @@ func TestCaveatRunnerPopulateCaveatDefinitionsForExpr(t *testing.T) {
712712
func TestCaveatRunnerEmptyExpression(t *testing.T) {
713713
req := require.New(t)
714714

715-
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(0, 0, memdb.DisableGC)
715+
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 0, memdb.DisableGC)
716716
req.NoError(err)
717717

718718
ds, _ := testfixtures.DatastoreFromSchemaAndTestRelationships(rawDS, `
@@ -790,7 +790,7 @@ func TestSyntheticResultMissingVarNames(t *testing.T) {
790790
func TestUnknownCaveatOperation(t *testing.T) {
791791
req := require.New(t)
792792

793-
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(0, 0, memdb.DisableGC)
793+
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 0, memdb.DisableGC)
794794
req.NoError(err)
795795

796796
ds, _ := testfixtures.DatastoreFromSchemaAndTestRelationships(rawDS, `

internal/datastore/crdb/crdb.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -473,16 +473,20 @@ func (cds *crdbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState,
473473
}
474474

475475
func (cds *crdbDatastore) Close() error {
476+
var errs []error
476477
cds.cancel()
478+
if cds.pruneGroup != nil {
479+
errs = append(errs, cds.pruneGroup.Wait())
480+
}
477481
cds.readPool.Close()
478482
cds.writePool.Close()
479483
for _, collector := range cds.collectors {
480484
ok := prometheus.Unregister(collector)
481485
if !ok {
482-
return errors.New("could not unregister collector for CRDB datastore")
486+
errs = append(errs, errors.New("could not unregister collector for CRDB datastore"))
483487
}
484488
}
485-
return nil
489+
return errors.Join(errs...)
486490
}
487491

488492
func (cds *crdbDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {

internal/datastore/dsfortesting/dsfortesting.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"testing"
78
"time"
89

910
sq "github.com/Masterminds/squirrel"
@@ -15,12 +16,14 @@ import (
1516
"github.com/authzed/spicedb/pkg/tuple"
1617
)
1718

18-
// NewMemDBDatastoreForTesting creates a new in-memory datastore for testing.
19+
// NewMemDBDatastoreForTesting creates a new in-memory datastore for testing
20+
// that is automatically closed when the test ends.
1921
// This is a convenience function that wraps the creation of a new MemDB datastore,
2022
// and injects additional proxies for validation at test time.
2123
// NOTE: These additional proxies are not performant for use in production (but then,
2224
// neither is memdb)
2325
func NewMemDBDatastoreForTesting(
26+
t testing.TB,
2427
watchBufferLength uint16,
2528
revisionQuantization,
2629
gcWindow time.Duration,
@@ -29,6 +32,9 @@ func NewMemDBDatastoreForTesting(
2932
if err != nil {
3033
return nil, err
3134
}
35+
t.Cleanup(func() {
36+
_ = ds.Close()
37+
})
3238

3339
return validatingDatastore{ds}, nil
3440
}

internal/datastore/mysql/datastore.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ type Datastore struct {
512512

513513
// Close closes the data store.
514514
func (mds *Datastore) Close() error {
515+
mds.driver.Close(context.Background())
515516
mds.cancelGc()
516517
if mds.gcGroup != nil {
517518
if err := mds.gcGroup.Wait(); err != nil && !errors.Is(err, context.Canceled) {

internal/datastore/proxy/observable_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,20 @@ import (
1010
"github.com/authzed/spicedb/pkg/datastore/test"
1111
)
1212

13-
type observableTest struct{}
13+
type observableTest struct {
14+
t *testing.T
15+
}
1416

1517
func (obs observableTest) New(revisionQuantization, _, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
16-
db, err := dsfortesting.NewMemDBDatastoreForTesting(watchBufferLength, revisionQuantization, gcWindow)
18+
db, err := dsfortesting.NewMemDBDatastoreForTesting(obs.t, watchBufferLength, revisionQuantization, gcWindow)
1719
if err != nil {
1820
return nil, err
1921
}
2022
return NewObservableDatastoreProxy(db), nil
2123
}
2224

2325
func TestObservableProxy(t *testing.T) {
24-
test.All(t, observableTest{}, true)
26+
test.All(t, observableTest{t}, true)
2527
}
2628

2729
func (p *observableProxy) ExampleRetryableError() error {

internal/datastore/proxy/relationshipintegrity_test.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ var expiredKeyForTesting = KeyConfig{
5858
}
5959

6060
func TestWriteWithPredefinedIntegrity(t *testing.T) {
61-
ds, err := dsfortesting.NewMemDBDatastoreForTesting(0, 5*time.Second, 1*time.Hour)
61+
ds, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 5*time.Second, 1*time.Hour)
6262
require.NoError(t, err)
6363

6464
pds, err := NewRelationshipIntegrityProxy(ds, DefaultKeyForTesting, nil)
@@ -76,7 +76,7 @@ func TestWriteWithPredefinedIntegrity(t *testing.T) {
7676
}
7777

7878
func TestReadWithMissingIntegrity(t *testing.T) {
79-
ds, err := dsfortesting.NewMemDBDatastoreForTesting(0, 5*time.Second, 1*time.Hour)
79+
ds, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 5*time.Second, 1*time.Hour)
8080
require.NoError(t, err)
8181

8282
// Write a relationship to the underlying datastore without integrity information.
@@ -108,7 +108,7 @@ func TestReadWithMissingIntegrity(t *testing.T) {
108108
}
109109

110110
func TestBasicIntegrityFailureDueToInvalidHashVersion(t *testing.T) {
111-
ds, err := dsfortesting.NewMemDBDatastoreForTesting(0, 5*time.Second, 1*time.Hour)
111+
ds, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 5*time.Second, 1*time.Hour)
112112
require.NoError(t, err)
113113

114114
pds, err := NewRelationshipIntegrityProxy(ds, DefaultKeyForTesting, nil)
@@ -157,7 +157,7 @@ func TestBasicIntegrityFailureDueToInvalidHashVersion(t *testing.T) {
157157
}
158158

159159
func TestBasicIntegrityFailureDueToInvalidHashSignature(t *testing.T) {
160-
ds, err := dsfortesting.NewMemDBDatastoreForTesting(0, 5*time.Second, 1*time.Hour)
160+
ds, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 5*time.Second, 1*time.Hour)
161161
require.NoError(t, err)
162162

163163
pds, err := NewRelationshipIntegrityProxy(ds, DefaultKeyForTesting, nil)
@@ -206,7 +206,7 @@ func TestBasicIntegrityFailureDueToInvalidHashSignature(t *testing.T) {
206206
}
207207

208208
func TestBasicIntegrityFailureDueToWriteWithExpiredKey(t *testing.T) {
209-
ds, err := dsfortesting.NewMemDBDatastoreForTesting(0, 5*time.Second, 1*time.Hour)
209+
ds, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 5*time.Second, 1*time.Hour)
210210
require.NoError(t, err)
211211

212212
// Create a proxy with the to-be-expired key and write some relationships.
@@ -247,11 +247,8 @@ func TestBasicIntegrityFailureDueToWriteWithExpiredKey(t *testing.T) {
247247
func TestWatchIntegrityFailureDueToInvalidHashSignature(t *testing.T) {
248248
t.Parallel()
249249

250-
ds, err := dsfortesting.NewMemDBDatastoreForTesting(0, 5*time.Second, 1*time.Hour)
250+
ds, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 5*time.Second, 1*time.Hour)
251251
require.NoError(t, err)
252-
t.Cleanup(func() {
253-
_ = ds.Close()
254-
})
255252

256253
headRev, err := ds.HeadRevision(t.Context())
257254
require.NoError(t, err)
@@ -293,7 +290,7 @@ func TestWatchIntegrityFailureDueToInvalidHashSignature(t *testing.T) {
293290
func BenchmarkQueryRelsWithIntegrity(b *testing.B) {
294291
for _, withIntegrity := range []bool{true, false} {
295292
b.Run(fmt.Sprintf("withIntegrity=%t", withIntegrity), func(b *testing.B) {
296-
ds, err := dsfortesting.NewMemDBDatastoreForTesting(0, 5*time.Second, 1*time.Hour)
293+
ds, err := dsfortesting.NewMemDBDatastoreForTesting(b, 0, 5*time.Second, 1*time.Hour)
297294
require.NoError(b, err)
298295

299296
pds, err := NewRelationshipIntegrityProxy(ds, DefaultKeyForTesting, nil)

internal/datastore/proxy/schemacaching/estimatedsize_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestEstimatedDefinitionSizes(t *testing.T) {
4747
filePath := filePath
4848
t.Run(path.Base(filePath), func(t *testing.T) {
4949
require := require.New(t)
50-
ds, err := dsfortesting.NewMemDBDatastoreForTesting(0, 1*time.Second, memdb.DisableGC)
50+
ds, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 1*time.Second, memdb.DisableGC)
5151
require.NoError(err)
5252

5353
fullyResolved, _, err := validationfile.PopulateFromFiles(t.Context(), ds, caveattypes.Default.TypeSet, []string{filePath})

internal/datastore/proxy/schemacaching/standardcaching_test.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,13 @@ func TestSnapshotCaching(t *testing.T) {
161161
twoReader.On(tester.readSingleFunctionName, nsA).Return(nil, zero, nil).Once()
162162
twoReader.On(tester.readSingleFunctionName, nsB).Return(nil, one, nil).Once()
163163

164+
dptc := DatastoreProxyTestCache(t)
165+
t.Cleanup(func() {
166+
dptc.Close()
167+
})
168+
164169
require := require.New(t)
165-
ds := NewCachingDatastoreProxy(dsMock, DatastoreProxyTestCache(t), 1*time.Hour, JustInTimeCaching, 100*time.Millisecond)
170+
ds := NewCachingDatastoreProxy(dsMock, dptc, 1*time.Hour, JustInTimeCaching, 100*time.Millisecond)
166171

167172
_, updatedOneA, err := tester.readSingleFunc(t.Context(), ds.SnapshotReader(one), nsA)
168173
require.NoError(err)
@@ -367,7 +372,7 @@ func TestSnapshotCachingRealDatastore(t *testing.T) {
367372
for _, tc := range tcs {
368373
tc := tc
369374
t.Run(tc.name, func(t *testing.T) {
370-
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(0, 0, memdb.DisableGC)
375+
rawDS, err := dsfortesting.NewMemDBDatastoreForTesting(t, 0, 0, memdb.DisableGC)
371376
require.NoError(t, err)
372377

373378
ctx := t.Context()
@@ -476,8 +481,13 @@ func TestMixedCaching(t *testing.T) {
476481

477482
dsMock.On("SnapshotReader", one).Return(reader)
478483

484+
dptc := DatastoreProxyTestCache(t)
485+
t.Cleanup(func() {
486+
dptc.Close()
487+
})
488+
479489
require := require.New(t)
480-
ds := NewCachingDatastoreProxy(dsMock, DatastoreProxyTestCache(t), 1*time.Hour, JustInTimeCaching, 100*time.Millisecond)
490+
ds := NewCachingDatastoreProxy(dsMock, dptc, 1*time.Hour, JustInTimeCaching, 100*time.Millisecond)
481491

482492
dsReader := ds.SnapshotReader(one)
483493

@@ -529,6 +539,9 @@ func TestInvalidNamespaceInCache(t *testing.T) {
529539
memoryDatastore, err := memdb.NewMemdbDatastore(0, 1*time.Hour, 1*time.Hour)
530540
require.NoError(err)
531541
ds := NewCachingDatastoreProxy(memoryDatastore, DatastoreProxyTestCache(t), 1*time.Hour, JustInTimeCaching, 100*time.Millisecond)
542+
t.Cleanup(func() {
543+
ds.Close()
544+
})
532545

533546
headRevision, err := ds.HeadRevision(ctx)
534547
require.NoError(err)
@@ -560,6 +573,9 @@ func TestMixedInvalidNamespacesInCache(t *testing.T) {
560573
memoryDatastore, err := memdb.NewMemdbDatastore(0, 1*time.Hour, 1*time.Hour)
561574
require.NoError(err)
562575
ds := NewCachingDatastoreProxy(memoryDatastore, DatastoreProxyTestCache(t), 1*time.Hour, JustInTimeCaching, 100*time.Millisecond)
576+
t.Cleanup(func() {
577+
ds.Close()
578+
})
563579

564580
require.NoError(err)
565581

internal/datastore/proxy/schemacaching/watchingcache.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,9 @@ func (p *watchingCachingProxy) ReadWriteTx(
136136
return p.fallbackCache.ReadWriteTx(ctx, f, opts...)
137137
}
138138

139+
// Start is async so that prepopulating doesn't block the server start.
140+
// The caller must cancel the context before calling Close.
139141
func (p *watchingCachingProxy) Start(ctx context.Context) error {
140-
// Start async so that prepopulating doesn't block the server start.
141142
go func() {
142143
_ = p.startSync(ctx)
143144
}()
@@ -350,6 +351,8 @@ func (p *watchingCachingProxy) startSync(ctx context.Context) error {
350351
return nil
351352
}
352353

354+
// Close stops all resources.
355+
// The caller must have canceled the context passed to Start.
353356
func (p *watchingCachingProxy) Close() error {
354357
p.caveatCache.setFallbackMode()
355358
p.namespaceCache.setFallbackMode()

0 commit comments

Comments
 (0)