11package engine
22
33import (
4+ "context"
45 "fmt"
56 "sync"
67 "testing"
7- "time "
8+ "testing/synctest "
89
9- "github.com/stretchr/testify/require "
10+ "github.com/stretchr/testify/assert "
1011 "go.uber.org/atomic"
1112)
1213
@@ -17,8 +18,8 @@ func TestNotifier_PassByValue(t *testing.T) {
1718
1819 var sent sync.WaitGroup
1920 sent .Add (1 )
20- go func (n Notifier ) {
21- notifier .Notify ()
21+ go func (passedByValue Notifier ) {
22+ passedByValue .Notify ()
2223 sent .Done ()
2324 }(notifier )
2425 sent .Wait ()
@@ -38,7 +39,7 @@ func TestNotifier_NoNotificationsInitialization(t *testing.T) {
3839 select {
3940 case <- notifier .Channel ():
4041 t .Fail ()
41- default : //expected
42+ default : // expected
4243 }
4344}
4445
@@ -52,12 +53,10 @@ func TestNotifier_ManyNotifications(t *testing.T) {
5253 notifier := NewNotifier ()
5354
5455 var counter sync.WaitGroup
55- for i := 0 ; i < 10 ; i ++ {
56- counter .Add (1 )
57- go func () {
56+ for range 10 {
57+ counter .Go (func () {
5858 notifier .Notify ()
59- counter .Done ()
60- }()
59+ })
6160 }
6261 counter .Wait ()
6362
@@ -67,141 +66,132 @@ func TestNotifier_ManyNotifications(t *testing.T) {
6766 select {
6867 case <- c : // expected
6968 default :
70- t .Fail ( )
69+ t .Error ( "expected one notification to be available" )
7170 }
7271
73- // attempt to consume first notification
72+ // attempt to consume second notification
7473 // expect that no notification is available
7574 select {
7675 case <- c :
77- t .Fail ( )
78- default : //expected
76+ t .Error ( "expected only one notification to be available" )
77+ default : // expected
7978 }
8079}
8180
82- // TestNotifier_ManyConsumers spans many worker routines and
83- // sends just as many notifications with small delays. We require that
84- // all workers eventually get a notification.
81+ // TestNotifier_ManyConsumers spans many worker routines and sends just as many notifications.
82+ // We require that all workers eventually get a notification.
8583func TestNotifier_ManyConsumers (t * testing.T ) {
8684 singleTestRun := func (t * testing.T ) {
8785 t .Parallel ()
88- notifier := NewNotifier ()
89- c := notifier .Channel ()
90-
91- // spawn 100 worker routines to each wait for a notification
92- var startingWorkers sync.WaitGroup
93- pendingWorkers := atomic .NewInt32 (100 )
94- for i := 0 ; i < 100 ; i ++ {
95- startingWorkers .Add (1 )
96- go func () {
97- startingWorkers .Done ()
98- <- c
99- pendingWorkers .Dec ()
100- }()
101- }
102- startingWorkers .Wait ()
103-
104- // send 100 notifications, with small delays
105- for i := 0 ; i < 100 ; i ++ {
106- notifier .Notify ()
107- time .Sleep (100 * time .Millisecond )
108- }
86+ synctest .Test (t , func (t * testing.T ) {
87+ notifier := NewNotifier ()
88+ c := notifier .Channel ()
89+
90+ // spawn 100 worker routines to each wait for a notification
91+ pendingWorkers := atomic .NewInt32 (100 )
92+ for range 100 {
93+ go func () {
94+ <- c
95+ pendingWorkers .Dec ()
96+ }()
97+ }
98+
99+ // wait until all workers are blocked on the notifier channel
100+ synctest .Wait ()
101+
102+ for range 100 {
103+ notifier .Notify ()
104+
105+ // wait for the previous notification to be consumed to ensure that the producer
106+ // won't drop any notifications.
107+ // NOTE: this is necessary because golang channels do not provide atomic consistency.
108+ // Specifically, it means that when we send a notification while workers are waiting
109+ // it is not guaranteed that a worker will atomically receive that notification. In
110+ // other words, the channel might behave as if there was no worker waiting and de-
111+ // duplicating notifications. For details, see
112+ // https://www.notion.so/flowfoundation/Golang-Channel-Consistency-19a1aee12324817699b1ff162921d8fc
113+ synctest .Wait ()
114+ }
115+
116+ // wait until all workers are done
117+ synctest .Wait ()
109118
110- // require that all workers got a notification
111- if ! conditionEventuallySatisfied (func () bool { return pendingWorkers .Load () == 0 }, 3 * time .Second , 100 * time .Millisecond ) {
112- require .Fail (t , "timed out" , "still awaiting %d workers to get notification" , pendingWorkers .Load ())
113- }
119+ // require that all workers got a notification
120+ assert .Equal (t , int32 (0 ), pendingWorkers .Load ())
121+ })
114122 }
115123
116- for r := 0 ; r < 100 ; r ++ {
124+ for r := range 100 {
117125 t .Run (fmt .Sprintf ("run %d" , r ), singleTestRun )
118126 }
119127}
120128
121129// TestNotifier_AllWorkProcessed spans many routines pushing work and fewer
122- // routines consuming work. We require that all worker is eventually processed.
130+ // routines consuming work. We require that all work is eventually processed.
123131func TestNotifier_AllWorkProcessed (t * testing.T ) {
124132 singleTestRun := func (t * testing.T ) {
125133 t .Parallel ()
126- notifier := NewNotifier ()
127-
128- totalWork := int32 (100 )
129- pendingWorkQueue := make (chan struct {}, totalWork )
130- scheduledWork := atomic .NewInt32 (0 )
131- consumedWork := atomic .NewInt32 (0 )
132-
133- // starts the consuming first, because if we starts the production first instead, then
134- // we might finish pushing all jobs, before any of our consumer has started listening
135- // to the queue.
136- var consumersAllReady sync.WaitGroup
137- consumersAllReady .Add (5 )
138-
139- // 5 routines consuming work
140- for i := 0 ; i < 5 ; i ++ {
141- go func () {
142- consumersAllReady .Done ()
143- for consumedWork .Load () < totalWork {
144- <- notifier .Channel ()
145- L:
134+ synctest .Test (t , func (t * testing.T ) {
135+ ctx , cancel := context .WithCancel (context .Background ())
136+
137+ notifier := NewNotifier ()
138+
139+ producerCount := int32 (10 ) // number of producers
140+ producerJobs := int32 (10 ) // number of tasks that each producer will queue up
141+ pendingWorkQueue := make (chan struct {}, producerCount * producerJobs )
142+ consumedWork := atomic .NewInt32 (0 )
143+
144+ // start the consumers first, otherwise we might finish pushing all jobs, before any of
145+ // our consumer has started listening to the queue.
146+
147+ processAllPending := func () {
148+ for {
149+ select {
150+ case <- pendingWorkQueue :
151+ consumedWork .Inc ()
152+ default :
153+ return
154+ }
155+ }
156+ }
157+
158+ // 5 routines consuming work
159+ for range 5 {
160+ go func () {
146161 for {
147162 select {
148- case <- pendingWorkQueue :
149- consumedWork . Inc ()
150- default :
151- break L
163+ case <- ctx . Done () :
164+ return
165+ case <- notifier . Channel () :
166+ processAllPending ()
152167 }
153168 }
154- }
155- }()
156- }
157-
158- // wait long enough for all consumer to be ready for new notification.
159- consumersAllReady .Wait ()
160-
161- var workersAllReady sync.WaitGroup
162- workersAllReady .Add (10 )
163-
164- // 10 routines pushing work
165- for i := 0 ; i < 10 ; i ++ {
166- go func () {
167- workersAllReady .Done ()
168- for scheduledWork .Inc () <= totalWork {
169- pendingWorkQueue <- struct {}{}
170- notifier .Notify ()
171- }
172- }()
173- }
174-
175- // wait long enough for all workers to be started.
176- workersAllReady .Wait ()
169+ }()
170+ }
177171
178- // require that all work is eventually consumed
179- if ! conditionEventuallySatisfied (func () bool { return consumedWork .Load () == totalWork }, 3 * time .Second , 100 * time .Millisecond ) {
180- require .Fail (t , "timed out" , "only consumed %d units of work but expecting %d" , consumedWork .Load (), totalWork )
181- }
182- }
172+ // wait for all consumers to block on the notifier channel
173+ synctest .Wait ()
183174
184- for r := 0 ; r < 100 ; r ++ {
185- t .Run (fmt .Sprintf ("run %d" , r ), singleTestRun )
186- }
187- }
175+ for range producerCount {
176+ go func () {
177+ for range producerJobs {
178+ pendingWorkQueue <- struct {}{}
179+ notifier .Notify ()
180+ }
181+ }()
182+ }
188183
189- func conditionEventuallySatisfied (condition func () bool , waitFor time.Duration , tick time.Duration ) bool {
190- done := make (chan struct {})
184+ // wait for all producers and consumers to block. at this point, all jobs should be consumed.
185+ synctest .Wait ()
186+ assert .Equal (t , producerCount * producerJobs , consumedWork .Load ())
191187
192- go func () {
193- for range time .Tick (tick ) {
194- if condition () {
195- close (done )
196- return
197- }
198- }
199- }()
188+ // shutdown blocked consumers and wait for them to complete
189+ cancel ()
190+ synctest .Wait ()
191+ })
192+ }
200193
201- select {
202- case <- time .After (waitFor ):
203- return false
204- case <- done :
205- return true
194+ for r := range 100 {
195+ t .Run (fmt .Sprintf ("run %d" , r ), singleTestRun )
206196 }
207197}
0 commit comments