Skip to content

Commit 07cff18

Browse files
authored
Merge pull request #405 fix flaky test TestWriterReconnector_Write_QueueLimit
2 parents 3bf7318 + 661ba2c commit 07cff18

File tree

2 files changed

+65
-36
lines changed

2 files changed

+65
-36
lines changed

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -263,47 +263,48 @@ func TestWriterImpl_WriteCodecs(t *testing.T) {
263263
}
264264

265265
func TestWriterReconnector_Write_QueueLimit(t *testing.T) {
266-
ctx := xtest.Context(t)
267-
w := newWriterReconnectorStopped(newWriterReconnectorConfig(
268-
WithAutoSetSeqNo(false),
269-
WithMaxQueueLen(2),
270-
))
271-
w.firstConnectionHandled.Store(true)
272-
273-
waitStartQueueWait := func() {
274-
xtest.SpinWaitCondition(t, nil, func() bool {
275-
if w.semaphore.TryAcquire(1) {
276-
w.semaphore.Release(1)
277-
return false
278-
}
279-
return true
280-
})
281-
}
266+
xtest.TestManyTimes(t, func(t testing.TB) {
267+
ctx := xtest.Context(t)
268+
w := newWriterReconnectorStopped(newWriterReconnectorConfig(
269+
WithAutoSetSeqNo(false),
270+
WithMaxQueueLen(2),
271+
))
272+
w.firstConnectionHandled.Store(true)
282273

283-
err := w.Write(ctx, newTestMessages(1, 2))
284-
require.NoError(t, err)
274+
waitStartQueueWait := func(targetWaiters int) {
275+
xtest.SpinWaitCondition(t, nil, func() bool {
276+
res := getWaitersCount(w.semaphore) == targetWaiters
277+
return res
278+
})
279+
}
285280

286-
ctxNoQueueSpace, ctxNoQueueSpaceCancel := context.WithCancel(ctx)
281+
err := w.Write(ctx, newTestMessages(1, 2))
282+
require.NoError(t, err)
287283

288-
go func() {
289-
waitStartQueueWait()
290-
ctxNoQueueSpaceCancel()
291-
}()
292-
err = w.Write(ctxNoQueueSpace, newTestMessages(3))
293-
require.ErrorIs(t, err, PublicErrQueueIsFull)
284+
ctxNoQueueSpace, ctxNoQueueSpaceCancel := context.WithCancel(ctx)
294285

295-
go func() {
296-
waitStartQueueWait()
297-
ackErr := w.queue.AcksReceived([]rawtopicwriter.WriteAck{
298-
{
299-
SeqNo: 1,
300-
},
301-
})
302-
require.NoError(t, ackErr)
303-
}()
286+
go func() {
287+
waitStartQueueWait(1)
288+
ctxNoQueueSpaceCancel()
289+
}()
290+
err = w.Write(ctxNoQueueSpace, newTestMessages(3))
291+
if !errors.Is(err, PublicErrQueueIsFull) {
292+
require.ErrorIs(t, err, PublicErrQueueIsFull)
293+
}
294+
295+
go func() {
296+
waitStartQueueWait(1)
297+
ackErr := w.queue.AcksReceived([]rawtopicwriter.WriteAck{
298+
{
299+
SeqNo: 1,
300+
},
301+
})
302+
require.NoError(t, ackErr)
303+
}()
304304

305-
err = w.Write(ctx, newTestMessages(3))
306-
require.NoError(t, err)
305+
err = w.Write(ctx, newTestMessages(3))
306+
require.NoError(t, err)
307+
})
307308
}
308309

309310
func TestEnv(t *testing.T) {
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package topicwriterinternal
2+
3+
import (
4+
"container/list"
5+
"reflect"
6+
"runtime"
7+
"sync"
8+
"unsafe"
9+
10+
"golang.org/x/sync/semaphore"
11+
)
12+
13+
func getWaitersCount(sem *semaphore.Weighted) int {
14+
defer runtime.KeepAlive(sem)
15+
16+
semVal := reflect.ValueOf(sem).Elem()
17+
mutexField := semVal.FieldByName("mu")
18+
19+
mutexAddr := unsafe.Pointer(mutexField.UnsafeAddr())
20+
mutex := (*sync.Mutex)(mutexAddr)
21+
mutex.Lock()
22+
defer mutex.Unlock()
23+
24+
waitersField := semVal.FieldByName("waiters")
25+
waitersPointer := unsafe.Pointer(waitersField.UnsafeAddr())
26+
waiters := (*list.List)(waitersPointer)
27+
return waiters.Len()
28+
}

0 commit comments

Comments
 (0)