Skip to content

Commit 2b59b72

Browse files
authored
Fix executor coordinator graceful shutdown. (#747)
* Add test. * Graceful shutdown for executor coordinator.
1 parent d88c4d1 commit 2b59b72

File tree

2 files changed

+96
-12
lines changed

2 files changed

+96
-12
lines changed

executor/executor_coordinator.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,10 @@ func (ec *Coordinator) Start(_ context.Context) error {
9494
})
9595

9696
// Start worker goroutines
97-
ec.wg.Add(ec.workerCount)
9897
for i := 0; i < ec.workerCount; i++ {
99-
go func() {
100-
defer ec.wg.Done()
98+
ec.wg.Go(func() {
10199
ec.handleMessage(c)
102-
}()
100+
})
103101
}
104102

105103
ec.lggr.Infow("Coordinator started")
@@ -116,12 +114,12 @@ func (ec *Coordinator) Close() error {
116114
ec.cancel()
117115
}
118116

119-
// Close channel to signal workers to stop
120-
close(ec.workerPoolTasks)
121-
122117
// Wait for all goroutines to finish
123118
ec.wg.Wait()
124119

120+
// It is safe to close the channel once all goroutines have stopped.
121+
close(ec.workerPoolTasks)
122+
125123
// Update running state to reflect in healthcheck and readiness
126124
ec.running.Store(false)
127125

@@ -204,6 +202,7 @@ func (ec *Coordinator) runProcessingLoop(ctx context.Context) {
204202
for {
205203
select {
206204
case <-ctx.Done():
205+
ec.lggr.Infow("Processing loop exiting")
207206
return
208207
case <-ticker.C:
209208
currentTime := ec.timeProvider.GetTime()
@@ -217,8 +216,13 @@ func (ec *Coordinator) runProcessingLoop(ctx context.Context) {
217216
)
218217
for _, payload := range readyMessages {
219218
ec.inFlightAdd(payload.MessageID)
220-
// If the channel is full, we will block here, but messages will continue to be accumulate in the heap.
221-
ec.workerPoolTasks <- payload
219+
// If the channel is full, we will block here, but messages will continue to accumulate in the heap.
220+
select {
221+
case ec.workerPoolTasks <- payload:
222+
case <-ctx.Done():
223+
ec.lggr.Infow("Processing loop dropping payload to exit")
224+
continue
225+
}
222226
}
223227
case <-reportingTicker.C:
224228
ec.monitoring.Metrics().RecordMessageHeapSize(ctx, int64(ec.delayedMessageHeap.Len()))
@@ -230,12 +234,12 @@ func (ec *Coordinator) handleMessage(ctx context.Context) {
230234
for {
231235
select {
232236
case <-ctx.Done():
237+
ec.lggr.Infow("Message handler exiting")
233238
return
234239
case payload, ok := <-ec.workerPoolTasks:
235-
if !ok {
236-
return
240+
if ok {
241+
ec.processPayload(ctx, payload)
237242
}
238-
ec.processPayload(ctx, payload)
239243
}
240244
}
241245
}

executor/executor_coordinator_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,3 +559,83 @@ func TestDuplicateMessageIDFromStreamWhenAlreadyInHeap_IsSkippedByHeapAndHandleM
559559
time.Sleep(2 * time.Second)
560560
require.True(t, mock.AssertExpectationsForObjects(t, mockExecutor))
561561
}
562+
563+
// TestGracefulShutdown tests that when Close() is called while a message is being processed, the processing loop will
564+
// shut down gracefully. This state is simulated by blocking the HandleMessage() call until Close() is called, and then
565+
// asserting that we logged the message about dropping a payload to exit.
566+
func TestGracefulShutdown(t *testing.T) {
567+
lggr, hook := logger.TestObserved(t, zapcore.InfoLevel)
568+
currentTime := time.Now().UTC()
569+
mockTimeProvider := mocks.NewMockTimeProvider(t)
570+
mockTimeProvider.EXPECT().GetTime().Return(currentTime).Maybe()
571+
572+
seqNum := uint64(0)
573+
messageGenerator := func() common.MessageWithMetadata {
574+
seqNum++
575+
return common.MessageWithMetadata{
576+
Message: protocol.Message{
577+
DestChainSelector: 1,
578+
SourceChainSelector: 2,
579+
SequenceNumber: protocol.SequenceNumber(seqNum),
580+
},
581+
Metadata: common.MessageMetadata{
582+
IngestionTimestamp: currentTime,
583+
},
584+
}
585+
}
586+
587+
results := make(chan common.MessageWithMetadata, 1)
588+
results <- messageGenerator()
589+
messageSubscriber := mocks.NewMockMessageSubscriber(t)
590+
messageSubscriber.EXPECT().Start(mock.Anything).Return(results, nil, nil)
591+
592+
unblockHandle := make(chan struct{})
593+
mockExecutor := mocks.NewMockExecutor(t)
594+
mockExecutor.EXPECT().Start(mock.Anything).Return(nil)
595+
mockExecutor.EXPECT().CheckValidMessage(mock.Anything, mock.Anything).Return(nil).Maybe()
596+
mockExecutor.EXPECT().HandleMessage(mock.Anything, mock.Anything).Run(func(context.Context, protocol.Message) {
597+
<-unblockHandle
598+
}).Return(false, nil).Maybe()
599+
600+
leaderElector := mocks.NewMockLeaderElector(t)
601+
leaderElector.EXPECT().GetReadyTimestamp(mock.Anything, mock.Anything, mock.Anything).Return(currentTime).Maybe()
602+
leaderElector.EXPECT().GetRetryDelay(mock.Anything).Return(time.Second).Maybe()
603+
604+
ec, err := executor.NewCoordinator(
605+
lggr,
606+
mockExecutor,
607+
messageSubscriber,
608+
leaderElector,
609+
monitoring.NewNoopExecutorMonitoring(),
610+
8*time.Hour,
611+
mockTimeProvider,
612+
1,
613+
)
614+
require.NoError(t, err)
615+
require.NotNil(t, ec)
616+
617+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
618+
defer cancel()
619+
620+
require.NoError(t, ec.Start(ctx))
621+
time.Sleep(1 * time.Second)
622+
623+
// Block close for 1 second to ensure the processing loop is forced to drop a payload.
624+
go func() {
625+
time.Sleep(1 * time.Second)
626+
close(unblockHandle)
627+
}()
628+
require.NoError(t, ec.Close())
629+
630+
// Assert that we logged the message about dropping a payload.
631+
found := func() bool {
632+
for _, entry := range hook.All() {
633+
entryStr := fmt.Sprintf("%+v", entry)
634+
if strings.Contains(entryStr, "Processing loop dropping payload to exit") {
635+
return true
636+
}
637+
}
638+
return false
639+
}
640+
require.Eventuallyf(t, found, 2*time.Second, 100*time.Millisecond, "executor coordinator did not stop in time")
641+
}

0 commit comments

Comments
 (0)