44package queue
55
66import (
7- "os"
87 "strconv"
98 "sync"
9+ "sync/atomic"
1010 "testing"
1111 "time"
1212
@@ -16,10 +16,7 @@ import (
1616)
1717
1818func TestPersistableChannelUniqueQueue (t * testing.T ) {
19- if os .Getenv ("CI" ) != "" {
20- t .Skip ("Skipping because test is flaky on CI" )
21- }
22-
19+ // Create a temporary directory for the queue
2320 tmpDir := t .TempDir ()
2421 _ = log .NewLogger (1000 , "console" , "console" , `{"level":"warn","stacktracelevel":"NONE","stderr":true}` )
2522
@@ -100,7 +97,7 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
10097 executedInitial := map [string ][]string {}
10198 hasInitial := map [string ][]string {}
10299
103- fillQueue := func (name string , done chan struct {} ) {
100+ fillQueue := func (name string , done chan int64 ) {
104101 t .Run ("Initial Filling: " + name , func (t * testing.T ) {
105102 lock := sync.Mutex {}
106103
@@ -157,33 +154,39 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
157154 assert .Equal (t , 101 , len (executedInitial [name ])+ len (hasInitial [name ]))
158155 mapLock .Unlock ()
159156 })
157+ mapLock .Lock ()
158+ count := int64 (len (hasInitial [name ]))
159+ mapLock .Unlock ()
160+ done <- count
160161 close (done )
161162 }
162163
163- doneA := make (chan struct {} )
164- doneB := make (chan struct {} )
164+ hasQueueAChan := make (chan int64 )
165+ hasQueueBChan := make (chan int64 )
165166
166- go fillQueue ("QueueA" , doneA )
167- go fillQueue ("QueueB" , doneB )
167+ go fillQueue ("QueueA" , hasQueueAChan )
168+ go fillQueue ("QueueB" , hasQueueBChan )
168169
169- <- doneA
170- <- doneB
170+ hasA := <- hasQueueAChan
171+ hasB := <- hasQueueBChan
171172
172173 executedEmpty := map [string ][]string {}
173174 hasEmpty := map [string ][]string {}
174- emptyQueue := func (name string , done chan struct {}) {
175+ emptyQueue := func (name string , numInQueue int64 , done chan struct {}) {
175176 t .Run ("Empty Queue: " + name , func (t * testing.T ) {
176177 lock := sync.Mutex {}
177178 stop := make (chan struct {})
178179
179180 // collect the tasks that have been executed
181+ atomicCount := int64 (0 )
180182 handle := func (data ... Data ) []Data {
181183 lock .Lock ()
182184 for _ , datum := range data {
183185 mapLock .Lock ()
184186 executedEmpty [name ] = append (executedEmpty [name ], datum .(string ))
185187 mapLock .Unlock ()
186- if datum .(string ) == "final" {
188+ count := atomic .AddInt64 (& atomicCount , 1 )
189+ if count >= numInQueue {
187190 close (stop )
188191 }
189192 }
@@ -217,11 +220,11 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
217220 close (done )
218221 }
219222
220- doneA = make (chan struct {})
221- doneB = make (chan struct {})
223+ doneA : = make (chan struct {})
224+ doneB : = make (chan struct {})
222225
223- go emptyQueue ("QueueA" , doneA )
224- go emptyQueue ("QueueB" , doneB )
226+ go emptyQueue ("QueueA" , hasA , doneA )
227+ go emptyQueue ("QueueB" , hasB , doneB )
225228
226229 <- doneA
227230 <- doneB
@@ -237,20 +240,20 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
237240 hasEmpty = map [string ][]string {}
238241 mapLock .Unlock ()
239242
240- doneA = make (chan struct {} )
241- doneB = make (chan struct {} )
243+ hasQueueAChan = make (chan int64 )
244+ hasQueueBChan = make (chan int64 )
242245
243- go fillQueue ("QueueA" , doneA )
244- go fillQueue ("QueueB" , doneB )
246+ go fillQueue ("QueueA" , hasQueueAChan )
247+ go fillQueue ("QueueB" , hasQueueBChan )
245248
246- <- doneA
247- <- doneB
249+ hasA = <- hasQueueAChan
250+ hasB = <- hasQueueBChan
248251
249252 doneA = make (chan struct {})
250253 doneB = make (chan struct {})
251254
252- go emptyQueue ("QueueA" , doneA )
253- go emptyQueue ("QueueB" , doneB )
255+ go emptyQueue ("QueueA" , hasA , doneA )
256+ go emptyQueue ("QueueB" , hasB , doneB )
254257
255258 <- doneA
256259 <- doneB
0 commit comments