Skip to content

Commit 9d6d2d0

Browse files
committed
Make test deterministic.
1 parent 69ca33a commit 9d6d2d0

File tree

5 files changed

+117
-64
lines changed

5 files changed

+117
-64
lines changed

pkg/ingester/ingester_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ import (
6464
asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model"
6565
"github.com/grafana/mimir/pkg/ingester/client"
6666
"github.com/grafana/mimir/pkg/mimirpb"
67+
mimirpbtest "github.com/grafana/mimir/pkg/mimirpb/testutil"
6768
"github.com/grafana/mimir/pkg/storage/chunk"
6869
"github.com/grafana/mimir/pkg/storage/sharding"
6970
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
@@ -9156,16 +9157,19 @@ func TestIngesterNoRW2MetadataRefLeaks(t *testing.T) {
91569157
err = mimirpb.Unmarshal(buf, &wr)
91579158
require.NoError(t, err)
91589159

9159-
nextLeak := mimirpb.NextRefLeakChannel(t.Context())
9160+
nextLeak := mimirpbtest.NextRefLeakCheck(t.Context(), wr.Buffer().ReadOnlyData())
91609161

91619162
ctx := user.InjectOrgID(context.Background(), userID)
91629163
_, err = ing.Push(ctx, &wr.WriteRequest)
91639164
require.NoError(t, err)
91649165

91659166
select {
9166-
case addr := <-nextLeak:
9167-
require.Fail(t, "reference leak detected", "addr: %x", addr)
9168-
case <-time.After(10 * time.Millisecond):
9167+
case leaked := <-nextLeak:
9168+
if leaked {
9169+
require.Fail(t, "reference leak detected", "addr: %p", buf)
9170+
}
9171+
case <-time.After(1 * time.Second):
9172+
require.Fail(t, "expected reference leak check", "addr: %p", buf)
91699173
}
91709174
}
91719175

pkg/mimirpb/custom.go

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ package mimirpb
44

55
import (
66
"bytes"
7-
"context"
87
"fmt"
98
"math"
109
"runtime"
@@ -13,6 +12,7 @@ import (
1312
"weak"
1413

1514
gogoproto "github.com/gogo/protobuf/proto"
15+
"github.com/grafana/mimir/pkg/mimirpb/internal"
1616
"github.com/prometheus/prometheus/model/histogram"
1717
"go.uber.org/atomic"
1818
"google.golang.org/grpc/encoding"
@@ -215,6 +215,10 @@ type BufferHolder struct {
215215
buffer mem.Buffer
216216
}
217217

218+
func (m *BufferHolder) Buffer() mem.Buffer {
219+
return m.buffer
220+
}
221+
218222
func (m *BufferHolder) SetBuffer(buf mem.Buffer) {
219223
m.buffer = buf
220224
}
@@ -252,46 +256,29 @@ func (b *instrumentLeaksBuf) Free() {
252256
}
253257
// Remove our ref; no (strong) refs should remain (except in the stack).
254258
b.Buffer = nil
255-
weakRef := weak.Make(unsafe.SliceData(buf))
259+
ptr := unsafe.SliceData(buf)
260+
addr := uintptr(unsafe.Pointer(ptr))
261+
weakRef := weak.Make(ptr)
256262

257263
// Check in a separate goroutine, because this stack still has locals
258264
// pointing to the buffer.
259265
go func() {
260266
runtime.GC()
261267
runtime.GC()
262268

263-
if v := weakRef.Value(); v != nil {
264-
select {
265-
case leaks <- v:
266-
default:
267-
panic(fmt.Sprintf("reference leak for object %p", v))
269+
leaked := weakRef.Value() != nil
270+
271+
select {
272+
case internal.RefLeakChecks <- internal.RefLeakCheck{Addr: addr, Leaked: leaked}:
273+
default:
274+
if leaked {
275+
panic(fmt.Sprintf("reference leak for object 0x%x", addr))
268276
}
269277
}
270278
}()
271279
}
272280
}
273281

274-
var leaks = make(chan *byte)
275-
276-
// NextRefLeakChannel returns a channel where the next detected reference leak's
277-
// address will be sent, instead of the default behavior of panicking.
278-
//
279-
// Canceling the context immediately closes the channel and restores the default
280-
// behavior.
281-
//
282-
// Intended for use in tests.
283-
func NextRefLeakChannel(ctx context.Context) <-chan uintptr {
284-
ch := make(chan uintptr, 1)
285-
go func() {
286-
defer close(ch)
287-
select {
288-
case ch <- uintptr(unsafe.Pointer(<-leaks)):
289-
case <-ctx.Done():
290-
}
291-
}()
292-
return ch
293-
}
294-
295282
// MinTimestamp returns the minimum timestamp (milliseconds) among all series
296283
// in the WriteRequest. Returns math.MaxInt64 if the request is empty.
297284
func (m *WriteRequest) MinTimestamp() int64 {

pkg/mimirpb/custom_test.go

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package mimirpb
88
import (
99
"testing"
1010
"time"
11-
"unsafe"
1211

1312
"github.com/prometheus/prometheus/model/histogram"
1413
"github.com/stretchr/testify/assert"
@@ -17,6 +16,7 @@ import (
1716
"google.golang.org/grpc/encoding/proto"
1817
"google.golang.org/grpc/mem"
1918

19+
"github.com/grafana/mimir/pkg/mimirpb/internal"
2020
"github.com/grafana/mimir/pkg/util/test"
2121
)
2222

@@ -208,7 +208,7 @@ func TestCodecV2_Unmarshal(t *testing.T) {
208208

209209
require.True(t, origReq.Equal(req))
210210

211-
require.NotNil(t, req.buffer)
211+
require.NotNil(t, req.Buffer)
212212
req.FreeBuffer()
213213
}
214214

@@ -258,44 +258,36 @@ func TestInstrumentRefLeaks(t *testing.T) {
258258
buf, err := src.Marshal()
259259
require.NoError(t, err)
260260

261-
var req WriteRequest
262-
err = Unmarshal(buf, &req)
263-
require.NoError(t, err)
264-
265-
bufAddr := uintptr(unsafe.Pointer(unsafe.SliceData(req.buffer.ReadOnlyData())))
266-
// Label names are UnsafeMutableStrings pointing to buf. They shouldn't outlive
267-
// the call to req.FreeBuffer.
268-
leakingLabelName := req.Timeseries[0].Labels[0].Name
269-
270-
leaks := NextRefLeakChannel(t.Context())
261+
var leaks <-chan bool
262+
var leakingLabelName UnsafeMutableString
263+
func() {
264+
var req WriteRequest
265+
err = Unmarshal(buf, &req)
266+
require.NoError(t, err)
271267

272-
recvLeak := func() (uintptr, bool) {
273-
select {
274-
case detectedAddr := <-leaks:
275-
return detectedAddr, true
276-
case <-time.After(10 * time.Millisecond):
277-
return 0, false
278-
}
279-
}
268+
// Label names are UnsafeMutableStrings pointing to buf. They shouldn't outlive
269+
// the call to req.FreeBuffer.
270+
leakingLabelName = req.Timeseries[0].Labels[0].Name
280271

281-
req.FreeBuffer() // leakingLabelName becomes a leak here
272+
leaks = internal.NextRefLeakCheck(t.Context(), req.Buffer.ReadOnlyData())
273+
req.FreeBuffer() // leakingLabelName becomes a leak here
274+
}()
282275

283276
// Expect to receive a leak detection.
284-
detectedAddr, ok := recvLeak()
285-
require.True(t, ok, "expected a reference leak")
286-
require.Equal(t, bufAddr, detectedAddr)
277+
require.Eventually(t, func() bool { return <-leaks }, 10*time.Millisecond, 1*time.Second, "expected a reference leak")
287278
// Expect the label name contents to have been replaced with a taint word.
288279
// Keep this check last, because we need to extend the lifespan of leakingLabelName
289280
// to avoid Go optimizing the leak away.
290281
require.Contains(t, leakingLabelName, "KAEL")
291282

292283
// Now let's check a non-leak doesn't get falsely detected.
293-
dataNoLeak, err := src.Marshal()
294-
require.NoError(t, err)
295-
var reqNoLeak WriteRequest
296-
err = Unmarshal(dataNoLeak, &reqNoLeak)
297-
require.NoError(t, err)
298-
leaks = NextRefLeakChannel(t.Context())
299-
detectedAddr, ok = recvLeak()
300-
require.False(t, ok, "unexpected a reference leak %x", detectedAddr)
284+
func() {
285+
var reqNoLeak WriteRequest
286+
err = Unmarshal(buf, &reqNoLeak)
287+
require.NoError(t, err)
288+
289+
leaks = internal.NextRefLeakCheck(t.Context(), reqNoLeak.Buffer.ReadOnlyData())
290+
reqNoLeak.FreeBuffer()
291+
}()
292+
require.Eventually(t, func() bool { return !<-leaks }, 10*time.Millisecond, 1*time.Second, "expected no reference leaks")
301293
}

pkg/mimirpb/internal/refleaks.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"reflect"
6+
"testing"
7+
)
8+
9+
type RefLeakCheck struct {
10+
Addr uintptr
11+
Leaked bool
12+
}
13+
14+
var RefLeakChecks chan RefLeakCheck
15+
16+
func init() {
17+
if testing.Testing() {
18+
RefLeakChecks = make(chan RefLeakCheck)
19+
}
20+
}
21+
22+
// NextRefLeakCheck is documented in testutil.NextRefLeakCheck.
23+
func NextRefLeakCheck(ctx context.Context, object any) <-chan bool {
24+
if !testing.Testing() {
25+
panic("NextRefLeakCheck can only be used in tests")
26+
}
27+
var addr uintptr
28+
switch o := reflect.ValueOf(object); o.Type().Kind() {
29+
case reflect.Pointer, reflect.Slice, reflect.String:
30+
addr = uintptr(o.UnsafePointer())
31+
}
32+
33+
ch := make(chan bool, 1)
34+
go func() {
35+
defer close(ch)
36+
for {
37+
select {
38+
case leak := <-RefLeakChecks:
39+
if leak.Addr == addr {
40+
ch <- leak.Leaked
41+
return
42+
}
43+
case <-ctx.Done():
44+
}
45+
}
46+
}()
47+
return ch
48+
}

pkg/mimirpb/testutil/refleaks.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package testutil
2+
3+
import (
4+
"context"
5+
6+
"github.com/grafana/mimir/pkg/mimirpb/internal"
7+
)
8+
9+
// NextRefLeakChannel returns a channel where the next check for reference leaks
10+
// to the given object will be reported, whether a leak is detected
11+
// or not. It replaces the default behavior of panicking when a leak is detected.
12+
//
13+
// The object must be identified with a reference value: a pointer, a slice, or
14+
// a string.
15+
//
16+
// Canceling the context immediately closes the channel and restores the default
17+
// behavior.
18+
//
19+
// Only works when [testing.Testing].
20+
func NextRefLeakCheck(ctx context.Context, object any) <-chan bool {
21+
return internal.NextRefLeakCheck(ctx, object)
22+
}

0 commit comments

Comments
 (0)