Skip to content

Commit 2684d76

Browse files
authored
Fixing flaky-tests in matching (#7213)
Multiple tests in matching were flaky because of race-condition: - we had to wait for engine.Stop() to finish since it calls finilizers we're checking with mocks - but we were calling it in goroutine w/o waiting Instead, it is closer now to how code runs it - runMembershipChangeLoop() is in goroutine, .Stop is called synchronously. Also renamed `subscribeToMembershipChanges` to `runMembershipChangeLoop` to emphasize it is blocking a loop, not just "subscription". In addition, removed time.Sleep - so now tests run for few ms, instead of 3+ seconds. Removed `TestMembershipSubscriptionShutdown` since it was 1:1 copying `TestSubscriptionAndShutdown`.
1 parent 46dd3f5 commit 2684d76

File tree

3 files changed

+57
-89
lines changed

3 files changed

+57
-89
lines changed

service/matching/handler/engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func NewEngine(
157157
}
158158

159159
e.shutdownCompletion.Add(1)
160-
go e.subscribeToMembershipChanges()
160+
go e.runMembershipChangeLoop()
161161

162162
return e
163163
}

service/matching/handler/membership.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ const subscriptionBufferSize = 1000
4444
// which host is the real owner of the tasklist.
4545
//
4646
// This is not the main shutdown process, its just an optimization.
47-
func (e *matchingEngineImpl) subscribeToMembershipChanges() {
47+
func (e *matchingEngineImpl) runMembershipChangeLoop() {
4848
defer func() {
4949
if r := recover(); r != nil {
5050
e.logger.Error("matching membership watcher changes caused a panic, recovering", tag.Dynamic("recovered-panic", r))

service/matching/handler/membership_test.go

Lines changed: 55 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -152,31 +152,7 @@ func TestGetTaskListManager_OwnerShip(t *testing.T) {
152152
}
153153
}
154154

155-
func TestMembershipSubscriptionShutdown(t *testing.T) {
156-
assert.NotPanics(t, func() {
157-
ctrl := gomock.NewController(t)
158-
m := membership.NewMockResolver(ctrl)
159-
160-
m.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).Times(1)
161-
162-
e := matchingEngineImpl{
163-
membershipResolver: m,
164-
config: &config.Config{
165-
EnableTasklistOwnershipGuard: func(opts ...dynamicproperties.FilterOption) bool { return true },
166-
},
167-
shutdown: make(chan struct{}),
168-
logger: log.NewNoop(),
169-
}
170-
171-
go func() {
172-
time.Sleep(time.Second)
173-
close(e.shutdown)
174-
}()
175-
e.subscribeToMembershipChanges()
176-
})
177-
}
178-
179-
func TestMembershipSubscriptionPanicHandling(t *testing.T) {
155+
func TestMembershipSubscriptionRecoversAfterPanic(t *testing.T) {
180156
assert.NotPanics(t, func() {
181157
ctrl := gomock.NewController(t)
182158

@@ -185,7 +161,7 @@ func TestMembershipSubscriptionPanicHandling(t *testing.T) {
185161
panic("a panic has occurred")
186162
})
187163

188-
e := matchingEngineImpl{
164+
engine := matchingEngineImpl{
189165
membershipResolver: r.MembershipResolver,
190166
config: &config.Config{
191167
EnableTasklistOwnershipGuard: func(opts ...dynamicproperties.FilterOption) bool { return true },
@@ -194,22 +170,22 @@ func TestMembershipSubscriptionPanicHandling(t *testing.T) {
194170
shutdown: make(chan struct{}),
195171
}
196172

197-
e.subscribeToMembershipChanges()
173+
engine.runMembershipChangeLoop()
198174
})
199175
}
200176

201177
func TestSubscriptionAndShutdown(t *testing.T) {
202178
ctrl := gomock.NewController(t)
203-
m := membership.NewMockResolver(ctrl)
179+
mockResolver := membership.NewMockResolver(ctrl)
204180

205-
shutdownWG := &sync.WaitGroup{}
181+
shutdownWG := sync.WaitGroup{}
206182
shutdownWG.Add(1)
207183

208184
mockDomainCache := cache.NewMockDomainCache(ctrl)
209185

210-
e := matchingEngineImpl{
211-
shutdownCompletion: shutdownWG,
212-
membershipResolver: m,
186+
engine := matchingEngineImpl{
187+
shutdownCompletion: &shutdownWG,
188+
membershipResolver: mockResolver,
213189
config: &config.Config{
214190
EnableTasklistOwnershipGuard: func(opts ...dynamicproperties.FilterOption) bool { return true },
215191
},
@@ -218,40 +194,31 @@ func TestSubscriptionAndShutdown(t *testing.T) {
218194
domainCache: mockDomainCache,
219195
}
220196

221-
// anytimes here because this is quite a racy test and the actual assertions for the unsubscription logic will be separated out
222-
m.EXPECT().WhoAmI().Return(membership.NewDetailedHostInfo("host2", "host2", nil), nil).AnyTimes()
223-
m.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).Do(
224-
func(service string, name string, inc chan<- *membership.ChangedEvent) {
225-
m := membership.ChangedEvent{
226-
HostsAdded: nil,
227-
HostsUpdated: nil,
228-
HostsRemoved: []string{"host123"},
229-
}
230-
inc <- &m
231-
})
197+
mockResolver.EXPECT().WhoAmI().Return(membership.NewDetailedHostInfo("host2", "host2", nil), nil).AnyTimes()
198+
mockResolver.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any())
232199
mockDomainCache.EXPECT().UnregisterDomainChangeCallback(service.Matching).Times(1)
233200

234-
go func() {
235-
// then call stop so the test can finish
236-
time.Sleep(time.Second)
237-
e.Stop()
238-
}()
201+
go engine.runMembershipChangeLoop()
239202

240-
e.subscribeToMembershipChanges()
203+
engine.Stop()
204+
assert.True(t, common.AwaitWaitGroup(&shutdownWG, 10*time.Second), "runMembershipChangeLoop has to be shut down")
241205
}
242206

243207
func TestSubscriptionAndErrorReturned(t *testing.T) {
244208
ctrl := gomock.NewController(t)
245-
m := membership.NewMockResolver(ctrl)
209+
mockResolver := membership.NewMockResolver(ctrl)
246210

247211
mockDomainCache := cache.NewMockDomainCache(ctrl)
248212

249213
shutdownWG := sync.WaitGroup{}
250214
shutdownWG.Add(1)
251215

252-
e := matchingEngineImpl{
216+
membershipChangeHandledWG := sync.WaitGroup{}
217+
membershipChangeHandledWG.Add(1)
218+
219+
engine := matchingEngineImpl{
253220
shutdownCompletion: &shutdownWG,
254-
membershipResolver: m,
221+
membershipResolver: mockResolver,
255222
config: &config.Config{
256223
EnableTasklistOwnershipGuard: func(opts ...dynamicproperties.FilterOption) bool { return true },
257224
},
@@ -261,9 +228,13 @@ func TestSubscriptionAndErrorReturned(t *testing.T) {
261228
}
262229

263230
// this should trigger the error case on a membership event
264-
m.EXPECT().WhoAmI().Return(membership.HostInfo{}, assert.AnError).AnyTimes()
231+
// unfortunately, this is purely for code-coverage, no checks are involved
232+
mockResolver.EXPECT().WhoAmI().DoAndReturn(func() (membership.HostInfo, error) {
233+
membershipChangeHandledWG.Done()
234+
return membership.HostInfo{}, errors.New("failure")
235+
}).MinTimes(1)
265236

266-
m.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).Do(
237+
mockResolver.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).Do(
267238
func(service string, name string, inc chan<- *membership.ChangedEvent) {
268239
m := membership.ChangedEvent{
269240
HostsAdded: nil,
@@ -275,29 +246,33 @@ func TestSubscriptionAndErrorReturned(t *testing.T) {
275246

276247
mockDomainCache.EXPECT().UnregisterDomainChangeCallback(service.Matching).Times(1)
277248

278-
go func() {
279-
// then call stop so the test can finish
280-
time.Sleep(time.Second)
281-
e.Stop()
282-
}()
249+
go engine.runMembershipChangeLoop()
283250

284-
e.subscribeToMembershipChanges()
251+
assert.True(t,
252+
common.AwaitWaitGroup(&membershipChangeHandledWG, 10*time.Second),
253+
"membership event is not handled",
254+
)
255+
256+
engine.Stop()
257+
assert.True(t,
258+
common.AwaitWaitGroup(&shutdownWG, 10*time.Second),
259+
"runMembershipChangeLoop has to be shut down",
260+
)
285261
}
286262

287263
func TestSubscribeToMembershipChangesQuitsIfSubscribeFails(t *testing.T) {
288264
ctrl := gomock.NewController(t)
289-
m := membership.NewMockResolver(ctrl)
290-
265+
mockResolver := membership.NewMockResolver(ctrl)
291266
mockDomainCache := cache.NewMockDomainCache(ctrl)
292267

293268
logger, logs := testlogger.NewObserved(t)
294269

295270
shutdownWG := sync.WaitGroup{}
296271
shutdownWG.Add(1)
297272

298-
e := matchingEngineImpl{
273+
engine := matchingEngineImpl{
299274
shutdownCompletion: &shutdownWG,
300-
membershipResolver: m,
275+
membershipResolver: mockResolver,
301276
config: &config.Config{
302277
EnableTasklistOwnershipGuard: func(opts ...dynamicproperties.FilterOption) bool { return true },
303278
},
@@ -306,49 +281,42 @@ func TestSubscribeToMembershipChangesQuitsIfSubscribeFails(t *testing.T) {
306281
domainCache: mockDomainCache,
307282
}
308283

309-
// this should trigger the error case on a membership event
310-
m.EXPECT().WhoAmI().Return(membership.HostInfo{}, assert.AnError).AnyTimes()
311-
312-
m.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).
313-
Return(errors.New("matching-engine is already subscribed to updates"))
284+
mockResolver.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).
285+
Return(errors.New("failed to subscribe"))
314286

315287
mockDomainCache.EXPECT().UnregisterDomainChangeCallback(service.Matching).AnyTimes()
316288

317-
go func() {
318-
// then call stop so the test can finish
319-
time.Sleep(time.Second)
320-
e.Stop()
321-
}()
322-
323-
e.subscribeToMembershipChanges()
324-
// check we emitted error-message
325-
filteredLogs := logs.FilterMessage("Failed to subscribe to membership updates")
326-
assert.Equal(t, 1, filteredLogs.Len(), "error-message should be produced")
289+
go engine.runMembershipChangeLoop()
290+
// we do not stop `engine` here - it has to shut down after failing to Subscribe
327291

328292
assert.True(
329293
t,
330294
common.AwaitWaitGroup(&shutdownWG, 10*time.Second),
331-
"subscribeToMembershipChanges should immediately shut down because of critical error",
295+
"runMembershipChangeLoop should immediately shut down because of critical error",
332296
)
297+
298+
// check we emitted error-message
299+
filteredLogs := logs.FilterMessage("Failed to subscribe to membership updates")
300+
assert.Equal(t, 1, filteredLogs.Len(), "error-message should be produced")
333301
}
334302

335303
func TestGetTasklistManagerShutdownScenario(t *testing.T) {
336304
ctrl := gomock.NewController(t)
337-
m := membership.NewMockResolver(ctrl)
305+
mockResolver := membership.NewMockResolver(ctrl)
338306

339307
mockDomainCache := cache.NewMockDomainCache(ctrl)
340308

341309
self := membership.NewDetailedHostInfo("self", "self", nil)
342310

343-
m.EXPECT().WhoAmI().Return(self, nil).AnyTimes()
311+
mockResolver.EXPECT().WhoAmI().Return(self, nil).AnyTimes()
344312
mockDomainCache.EXPECT().UnregisterDomainChangeCallback(service.Matching).Times(1)
345313

346314
shutdownWG := sync.WaitGroup{}
347315
shutdownWG.Add(0)
348316

349-
e := matchingEngineImpl{
317+
engine := matchingEngineImpl{
350318
shutdownCompletion: &shutdownWG,
351-
membershipResolver: m,
319+
membershipResolver: mockResolver,
352320
config: &config.Config{
353321
EnableTasklistOwnershipGuard: func(opts ...dynamicproperties.FilterOption) bool { return true },
354322
},
@@ -357,11 +325,11 @@ func TestGetTasklistManagerShutdownScenario(t *testing.T) {
357325
domainCache: mockDomainCache,
358326
}
359327

360-
// set this engine to be shutting down so as to trigger the tasklistGetTasklistByID guard
361-
e.Stop()
328+
// set this engine to be shutting down to trigger the tasklistGetTasklistByID guard
329+
engine.Stop()
362330

363331
tl, _ := tasklist.NewIdentifier("domainid", "tl", 0)
364-
res, err := e.getTaskListManager(tl, types.TaskListKindNormal)
332+
res, err := engine.getTaskListManager(tl, types.TaskListKindNormal)
365333
assertErr := &cadence_errors.TaskListNotOwnedByHostError{}
366334
assert.ErrorAs(t, err, &assertErr)
367335
assert.Nil(t, res)

0 commit comments

Comments
 (0)