@@ -263,47 +263,46 @@ func TestWriterImpl_WriteCodecs(t *testing.T) {
263263}
264264
265265func TestWriterReconnector_Write_QueueLimit (t * testing.T ) {
266- ctx := xtest .Context (t )
267- w := newWriterReconnectorStopped (newWriterReconnectorConfig (
268- WithAutoSetSeqNo (false ),
269- WithMaxQueueLen (2 ),
270- ))
271- w .firstConnectionHandled .Store (true )
272-
273- waitStartQueueWait := func () {
274- xtest .SpinWaitCondition (t , nil , func () bool {
275- if w .semaphore .TryAcquire (1 ) {
276- w .semaphore .Release (1 )
277- return false
278- }
279- return true
280- })
281- }
266+ xtest .TestManyTimes (t , func (t testing.TB ) {
267+ ctx := xtest .Context (t )
268+ w := newWriterReconnectorStopped (newWriterReconnectorConfig (
269+ WithAutoSetSeqNo (false ),
270+ WithMaxQueueLen (2 ),
271+ ))
272+ w .firstConnectionHandled .Store (true )
282273
283- err := w .Write (ctx , newTestMessages (1 , 2 ))
284- require .NoError (t , err )
274+ waitStartQueueWait := func () {
275+ // TODO: fix with reflection and unsafe for count internal waiters
276+ time .Sleep (time .Second / 10 )
277+ }
285278
286- ctxNoQueueSpace , ctxNoQueueSpaceCancel := context .WithCancel (ctx )
279+ err := w .Write (ctx , newTestMessages (1 , 2 ))
280+ require .NoError (t , err )
287281
288- go func () {
289- waitStartQueueWait ()
290- ctxNoQueueSpaceCancel ()
291- }()
292- err = w .Write (ctxNoQueueSpace , newTestMessages (3 ))
293- require .ErrorIs (t , err , PublicErrQueueIsFull )
282+ ctxNoQueueSpace , ctxNoQueueSpaceCancel := context .WithCancel (ctx )
294283
295- go func () {
296- waitStartQueueWait ()
297- ackErr := w .queue .AcksReceived ([]rawtopicwriter.WriteAck {
298- {
299- SeqNo : 1 ,
300- },
301- })
302- require .NoError (t , ackErr )
303- }()
284+ go func () {
285+ waitStartQueueWait ()
286+ ctxNoQueueSpaceCancel ()
287+ }()
288+ err = w .Write (ctxNoQueueSpace , newTestMessages (3 ))
289+ if ! errors .Is (err , PublicErrQueueIsFull ) {
290+ require .ErrorIs (t , err , PublicErrQueueIsFull )
291+ }
292+
293+ go func () {
294+ waitStartQueueWait ()
295+ ackErr := w .queue .AcksReceived ([]rawtopicwriter.WriteAck {
296+ {
297+ SeqNo : 1 ,
298+ },
299+ })
300+ require .NoError (t , ackErr )
301+ }()
304302
305- err = w .Write (ctx , newTestMessages (3 ))
306- require .NoError (t , err )
303+ err = w .Write (ctx , newTestMessages (3 ))
304+ require .NoError (t , err )
305+ })
307306}
308307
309308func TestEnv (t * testing.T ) {
0 commit comments