Skip to content

Commit 661ba2c

Browse files
committed
check start waiters with unsafe semaphore inspection.
1 parent e498f10 commit 661ba2c

File tree

2 files changed

+35
-5
lines changed

2 files changed

+35
-5
lines changed

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -271,9 +271,11 @@ func TestWriterReconnector_Write_QueueLimit(t *testing.T) {
271271
))
272272
w.firstConnectionHandled.Store(true)
273273

274-
waitStartQueueWait := func() {
275-
// TODO: fix with reflection and unsafe for count internal waiters
276-
time.Sleep(time.Second / 10)
274+
waitStartQueueWait := func(targetWaiters int) {
275+
xtest.SpinWaitCondition(t, nil, func() bool {
276+
res := getWaitersCount(w.semaphore) == targetWaiters
277+
return res
278+
})
277279
}
278280

279281
err := w.Write(ctx, newTestMessages(1, 2))
@@ -282,7 +284,7 @@ func TestWriterReconnector_Write_QueueLimit(t *testing.T) {
282284
ctxNoQueueSpace, ctxNoQueueSpaceCancel := context.WithCancel(ctx)
283285

284286
go func() {
285-
waitStartQueueWait()
287+
waitStartQueueWait(1)
286288
ctxNoQueueSpaceCancel()
287289
}()
288290
err = w.Write(ctxNoQueueSpace, newTestMessages(3))
@@ -291,7 +293,7 @@ func TestWriterReconnector_Write_QueueLimit(t *testing.T) {
291293
}
292294

293295
go func() {
294-
waitStartQueueWait()
296+
waitStartQueueWait(1)
295297
ackErr := w.queue.AcksReceived([]rawtopicwriter.WriteAck{
296298
{
297299
SeqNo: 1,
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package topicwriterinternal
2+
3+
import (
4+
"container/list"
5+
"reflect"
6+
"runtime"
7+
"sync"
8+
"unsafe"
9+
10+
"golang.org/x/sync/semaphore"
11+
)
12+
13+
func getWaitersCount(sem *semaphore.Weighted) int {
14+
defer runtime.KeepAlive(sem)
15+
16+
semVal := reflect.ValueOf(sem).Elem()
17+
mutexField := semVal.FieldByName("mu")
18+
19+
mutexAddr := unsafe.Pointer(mutexField.UnsafeAddr())
20+
mutex := (*sync.Mutex)(mutexAddr)
21+
mutex.Lock()
22+
defer mutex.Unlock()
23+
24+
waitersField := semVal.FieldByName("waiters")
25+
waitersPointer := unsafe.Pointer(waitersField.UnsafeAddr())
26+
waiters := (*list.List)(waitersPointer)
27+
return waiters.Len()
28+
}

0 commit comments

Comments
 (0)