Skip to content

Commit 0358c70

Browse files
committed
fix: extend CurioChainSched timeout, isolate ctx cancellation (#575)
1 parent 6b7dbde commit 0358c70

File tree

2 files changed

+248
-18
lines changed

2 files changed

+248
-18
lines changed

lib/chainsched/chain_sched.go

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ import (
1919

2020
var log = logging.Logger("curio/chainsched")
2121

22-
// Notification timeout for chain updates, if we don't get a notification within this time frame
23-
// then something must be wrong so we'll attempt to restart
24-
const notificationTimeout = 60 * time.Second
22+
// Default notification timeout for chain updates. Set to 5 minutes to accommodate the 30-second
23+
// block time, potential consecutive null rounds, and other reasonable upstream delays.
24+
// If we don't get a notification within this time frame, then something may be wrong so we'll
25+
// attempt to restart just in case.
26+
const defaultNotificationTimeout = 5 * time.Minute
2527

2628
type NodeAPI interface {
2729
ChainHead(context.Context) (*types.TipSet, error)
@@ -34,11 +36,21 @@ type CurioChainSched struct {
3436
callbacks []UpdateFunc
3537
lk sync.RWMutex
3638
started bool
39+
40+
notificationTimeout time.Duration
3741
}
3842

3943
func New(api NodeAPI) *CurioChainSched {
4044
return &CurioChainSched{
41-
api: api,
45+
api: api,
46+
notificationTimeout: defaultNotificationTimeout,
47+
}
48+
}
49+
50+
func NewWithNotificationTimeout(api NodeAPI, timeout time.Duration) *CurioChainSched {
51+
return &CurioChainSched{
52+
api: api,
53+
notificationTimeout: timeout,
4254
}
4355
}
4456

@@ -61,23 +73,44 @@ func (s *CurioChainSched) Run(ctx context.Context) {
6173

6274
var (
6375
notificationCh <-chan []*api.HeadChange
76+
notificationCancel context.CancelFunc
6477
err error
6578
gotFirstNotification bool
6679
)
6780

68-
ticker := build.Clock.Ticker(notificationTimeout)
81+
ticker := build.Clock.Ticker(s.notificationTimeout)
6982
defer ticker.Stop()
7083
lastNotif := build.Clock.Now()
7184

85+
// Ensure we clean up any active subscription on exit
86+
defer func() {
87+
if notificationCancel != nil {
88+
notificationCancel()
89+
}
90+
}()
91+
7292
// not fine to panic after this point
7393
for ctx.Err() == nil {
7494
if notificationCh == nil {
75-
notificationCh, err = s.api.ChainNotify(ctx)
95+
// Cancel any existing subscription context
96+
if notificationCancel != nil {
97+
notificationCancel()
98+
}
99+
100+
// Create new context for this subscription
101+
newCtx, newCancel := context.WithCancel(ctx)
102+
103+
notificationCh, err = s.api.ChainNotify(newCtx)
76104
if err != nil {
105+
// Cancel the context we just created since we're not using it
106+
newCancel()
77107
log.Errorw("ChainNotify", "error", err)
78108
build.Clock.Sleep(10 * time.Second) // Retry after 10 second wait
79109
continue
80110
}
111+
112+
// Only update the cancel function if we succeeded
113+
notificationCancel = newCancel
81114
gotFirstNotification = false
82115
log.Info("restarting CurioChainSched with new notification channel")
83116
lastNotif = build.Clock.Now()
@@ -148,8 +181,8 @@ func (s *CurioChainSched) Run(ctx context.Context) {
148181
case <-ticker.C:
149182
since := build.Clock.Since(lastNotif)
150183
log.Debugf("CurioChainSched ticker: %s since last notification", since)
151-
if since > notificationTimeout {
152-
log.Warnf("no notifications received in %s, resubscribing to ChainNotify", notificationTimeout)
184+
if since > s.notificationTimeout {
185+
log.Warnf("no notifications received in %s, resubscribing to ChainNotify", s.notificationTimeout)
153186
notificationCh = nil
154187
}
155188
case <-ctx.Done():

lib/chainsched/chain_sched_test.go

Lines changed: 207 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ func makeMockTipSet(height uint64) *types.TipSet {
6464
func TestAddHandlerConcurrency(t *testing.T) {
6565
api := &mockNodeAPI{
6666
notifCh: make(chan []*api.HeadChange),
67-
head: makeMockTipSet(100),
6867
}
6968

7069
sched := New(api)
@@ -101,7 +100,6 @@ func TestAddHandlerConcurrency(t *testing.T) {
101100
func TestAddHandlerAfterStart(t *testing.T) {
102101
mockAPI := &mockNodeAPI{
103102
notifCh: make(chan []*api.HeadChange),
104-
head: makeMockTipSet(100),
105103
}
106104

107105
sched := New(mockAPI)
@@ -131,7 +129,6 @@ func TestNotificationChannelResubscription(t *testing.T) {
131129
notifCh := make(chan []*api.HeadChange)
132130
mockAPI := &mockNodeAPI{
133131
notifCh: notifCh,
134-
head: makeMockTipSet(100),
135132
}
136133

137134
// Test that closing the notification channel causes resubscription
@@ -185,7 +182,6 @@ func TestCallbackExecution(t *testing.T) {
185182
notifCh := make(chan []*api.HeadChange, 10)
186183
mockAPI := &mockNodeAPI{
187184
notifCh: notifCh,
188-
head: makeMockTipSet(100),
189185
}
190186

191187
sched := New(mockAPI)
@@ -253,7 +249,6 @@ func TestContextCancellation(t *testing.T) {
253249
notifCh := make(chan []*api.HeadChange)
254250
mockAPI := &mockNodeAPI{
255251
notifCh: notifCh,
256-
head: makeMockTipSet(100),
257252
}
258253

259254
sched := New(mockAPI)
@@ -285,29 +280,220 @@ func TestContextCancellation(t *testing.T) {
285280
}
286281
}
287282

283+
func TestSubscriptionContextCancellation(t *testing.T) {
284+
// This test verifies that when resubscribing, the old subscription
285+
// context is properly cancelled
286+
287+
testCtx, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
288+
defer testCancel()
289+
290+
var notifyCalls []context.Context
291+
var mu sync.Mutex
292+
293+
mockAPI := &mockNodeAPI{}
294+
295+
firstCh := make(chan []*api.HeadChange, 1)
296+
firstCallReady := make(chan struct{})
297+
secondCallReady := make(chan struct{})
298+
299+
// Custom ChainNotify that captures contexts
300+
wrappedAPI := &mockNodeAPIWithContext{
301+
mockNodeAPI: mockAPI,
302+
chainNotifyFunc: func(ctx context.Context) (<-chan []*api.HeadChange, error) {
303+
mu.Lock()
304+
notifyCalls = append(notifyCalls, ctx)
305+
callNum := len(notifyCalls)
306+
mu.Unlock()
307+
308+
if callNum == 1 {
309+
defer close(firstCallReady)
310+
// First call - return channel we control below
311+
return firstCh, nil
312+
}
313+
314+
if callNum == 2 {
315+
defer close(secondCallReady)
316+
}
317+
318+
// Subsequent calls - return a properly initialized channel
319+
ch := make(chan []*api.HeadChange, 1)
320+
ch <- []*api.HeadChange{{
321+
Type: store.HCCurrent,
322+
Val: makeMockTipSet(100),
323+
}}
324+
return ch, nil
325+
},
326+
}
327+
328+
sched := New(wrappedAPI)
329+
330+
ctx, cancel := context.WithCancel(testCtx)
331+
defer cancel()
332+
333+
go sched.Run(ctx)
334+
335+
// Wait for first ChainNotify call
336+
select {
337+
case <-firstCallReady:
338+
case <-testCtx.Done():
339+
t.Fatal("Timeout waiting for first ChainNotify call")
340+
}
341+
342+
// Send initial notification
343+
firstCh <- []*api.HeadChange{{
344+
Type: store.HCCurrent,
345+
Val: makeMockTipSet(100),
346+
}}
347+
348+
// Verify we have the first subscription
349+
mu.Lock()
350+
require.Len(t, notifyCalls, 1)
351+
firstCtx := notifyCalls[0]
352+
mu.Unlock()
353+
354+
// Close the channel to trigger resubscription
355+
close(firstCh)
356+
357+
// Wait for second ChainNotify call
358+
select {
359+
case <-secondCallReady:
360+
case <-testCtx.Done():
361+
t.Fatal("Timeout waiting for second ChainNotify call")
362+
}
363+
364+
// Should have multiple calls now
365+
mu.Lock()
366+
require.GreaterOrEqual(t, len(notifyCalls), 2)
367+
mu.Unlock()
368+
369+
// Verify first context was cancelled
370+
select {
371+
case <-firstCtx.Done():
372+
// Good, context was cancelled
373+
default:
374+
t.Fatal("First subscription context was not cancelled on resubscription")
375+
}
376+
}
377+
378+
type mockNodeAPIWithContext struct {
379+
*mockNodeAPI
380+
chainNotifyFunc func(context.Context) (<-chan []*api.HeadChange, error)
381+
}
382+
383+
func (m *mockNodeAPIWithContext) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange, error) {
384+
return m.chainNotifyFunc(ctx)
385+
}
386+
387+
func TestTimeoutResubscription(t *testing.T) {
388+
// This test verifies that the scheduler will resubscribe after
389+
// not receiving notifications for the configured timeout period
390+
391+
testCtx, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
392+
defer testCancel()
393+
394+
firstCallCh := make(chan struct{})
395+
secondCallCh := make(chan struct{})
396+
mu := &sync.Mutex{}
397+
398+
// Create a channel that won't receive new notifications after initial
399+
notifyCh := make(chan []*api.HeadChange, 1)
400+
401+
// Use mockNodeAPIWithContext to have full control over ChainNotify
402+
var callCount int
403+
wrappedAPI := &mockNodeAPIWithContext{
404+
mockNodeAPI: &mockNodeAPI{},
405+
chainNotifyFunc: func(ctx context.Context) (<-chan []*api.HeadChange, error) {
406+
mu.Lock()
407+
callCount++
408+
count := callCount
409+
mu.Unlock()
410+
411+
if count == 1 {
412+
defer close(firstCallCh)
413+
} else if count == 2 {
414+
defer close(secondCallCh)
415+
}
416+
417+
// Always return the same channel for this test
418+
return notifyCh, nil
419+
},
420+
}
421+
422+
// Create scheduler with very short timeout for testing
423+
sched := NewWithNotificationTimeout(wrappedAPI, 200*time.Millisecond)
424+
425+
ctx, cancel := context.WithCancel(testCtx)
426+
defer cancel()
427+
428+
go sched.Run(ctx)
429+
430+
// Wait for first ChainNotify call
431+
select {
432+
case <-firstCallCh:
433+
case <-testCtx.Done():
434+
t.Fatal("Timeout waiting for first ChainNotify call")
435+
}
436+
437+
// Send initial notification
438+
notifyCh <- []*api.HeadChange{{
439+
Type: store.HCCurrent,
440+
Val: makeMockTipSet(100),
441+
}}
442+
443+
// Verify initial call count
444+
mu.Lock()
445+
require.Equal(t, 1, callCount)
446+
mu.Unlock()
447+
448+
// Wait for timeout to trigger resubscription
449+
// The scheduler has a 200ms timeout, so wait for the second call
450+
select {
451+
case <-secondCallCh:
452+
// Good, resubscription happened
453+
case <-testCtx.Done():
454+
t.Fatal("Timeout waiting for resubscription after notification timeout")
455+
}
456+
457+
// Verify we got the second call
458+
mu.Lock()
459+
finalCount := callCount
460+
mu.Unlock()
461+
require.GreaterOrEqual(t, finalCount, 2, "ChainNotify should have been called again after timeout")
462+
}
463+
288464
func TestMultipleChanges(t *testing.T) {
465+
testCtx, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
466+
defer testCancel()
467+
289468
notifCh := make(chan []*api.HeadChange, 10)
290469
mockAPI := &mockNodeAPI{
291470
notifCh: notifCh,
292-
head: makeMockTipSet(100),
293471
}
294472

295473
sched := New(mockAPI)
296474

297475
var callbackMu sync.Mutex
298476
var callCount int
299477
var lastApply *types.TipSet
478+
firstCallDone := make(chan struct{})
479+
secondCallDone := make(chan struct{})
300480

301481
err := sched.AddHandler(func(ctx context.Context, revert, apply *types.TipSet) error {
302482
callbackMu.Lock()
303483
defer callbackMu.Unlock()
304484
callCount++
305485
lastApply = apply
486+
487+
if callCount == 1 {
488+
close(firstCallDone)
489+
} else if callCount == 2 {
490+
close(secondCallDone)
491+
}
306492
return nil
307493
})
308494
require.NoError(t, err)
309495

310-
ctx, cancel := context.WithCancel(context.Background())
496+
ctx, cancel := context.WithCancel(testCtx)
311497
defer cancel()
312498

313499
go sched.Run(ctx)
@@ -320,6 +506,13 @@ func TestMultipleChanges(t *testing.T) {
320506
}
321507
notifCh <- []*api.HeadChange{initialChange}
322508

509+
// Wait for first callback
510+
select {
511+
case <-firstCallDone:
512+
case <-testCtx.Done():
513+
t.Fatal("Timeout waiting for first callback")
514+
}
515+
323516
// Send multiple changes in one notification
324517
ts1 := makeMockTipSet(101)
325518
ts2 := makeMockTipSet(102)
@@ -332,13 +525,17 @@ func TestMultipleChanges(t *testing.T) {
332525
}
333526
notifCh <- changes
334527

335-
// Wait for processing
336-
time.Sleep(200 * time.Millisecond)
528+
// Wait for second callback
529+
select {
530+
case <-secondCallDone:
531+
case <-testCtx.Done():
532+
t.Fatal("Timeout waiting for second callback")
533+
}
337534

338535
callbackMu.Lock()
536+
defer callbackMu.Unlock()
339537
// Should be called with the highest tipset
340538
require.Equal(t, ts3, lastApply)
341539
// Initial current + one call for the batch
342540
require.Equal(t, 2, callCount)
343-
callbackMu.Unlock()
344541
}

0 commit comments

Comments
 (0)