Skip to content

Commit b7d1b2a

Browse files
authored
Fix a PANIC in coordinator. (pingcap#1107)
close pingcap#1140
1 parent 416281d commit b7d1b2a

File tree

24 files changed

+242
-43
lines changed

24 files changed

+242
-43
lines changed

api/v2/coordinator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
func (h *OpenAPIV2) ResignOwner(c *gin.Context) {
3030
o, _ := h.server.GetCoordinator()
3131
if o != nil {
32-
o.AsyncStop()
32+
o.Stop()
3333
}
3434

3535
c.JSON(getStatus(c), &EmptyResponse{})

cmd/cdc/server/server.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,16 @@ func (o *options) run(cmd *cobra.Command) error {
112112
log.Info("TiCDC(new arch) server created",
113113
zap.Strings("pd", o.pdEndpoints), zap.Stringer("config", o.serverConfig))
114114

115+
// shutdown is used to notify the server to shutdown (by closing the context) when receiving the signal, and exit the process.
116+
shutdown := func() <-chan struct{} {
117+
done := make(chan struct{})
118+
cancel()
119+
close(done)
120+
return done
121+
}
122+
// Gracefully shutdown the server when receiving the signal, and exit the process.
123+
util.InitSignalHandling(shutdown, cancel)
124+
115125
err = svr.Run(ctx)
116126
if err != nil && !errors.Is(errors.Cause(err), context.Canceled) {
117127
log.Warn("cdc server exits with error", zap.Error(err))

coordinator/coordinator.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@ func New(node *node.Info,
123123
mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessages)
124124

125125
c.taskScheduler = threadpool.NewThreadPoolDefault()
126-
c.closed.Store(false)
127126

128127
controller := NewController(
129128
c.version,
@@ -147,18 +146,41 @@ func New(node *node.Info,
147146
log.Info("Coordinator changed, and I am not the coordinator, stop myself",
148147
zap.String("selfID", string(c.nodeInfo.ID)),
149148
zap.String("newCoordinatorID", newCoordinatorID))
150-
c.AsyncStop()
149+
c.Stop()
151150
}
152151
})
153152

154153
return c
155154
}
156155

157-
func (c *coordinator) recvMessages(_ context.Context, msg *messaging.TargetMessage) error {
156+
func (c *coordinator) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error {
158157
if c.closed.Load() {
159158
return nil
160159
}
161-
c.eventCh.In() <- &Event{message: msg}
160+
161+
defer func() {
162+
if r := recover(); r != nil {
163+
// There is a chance that:
164+
// 1. Just before the c.eventCh is closed, the recvMessages is called
165+
// 2. Then the goroutine (call it g1) that calls recvMessages is scheduled out by the runtime, and the msg is in flight
166+
// 3. The c.eventCh is closed by another goroutine (call it g2)
167+
// 4. g1 is scheduled back by runtime, and the msg is sent to the closed channel
168+
// 5. g1 panics
169+
// To avoid the panic, we have two choices:
170+
// 1. Use a mutex to protect this function, but it will reduce the throughput
171+
// 2. Recover the panic, and log the error
172+
// We choose the second option here.
173+
log.Error("panic in recvMessages", zap.Any("msg", msg), zap.Any("panic", r))
174+
}
175+
}()
176+
177+
select {
178+
case <-ctx.Done():
179+
return ctx.Err()
180+
default:
181+
c.eventCh.In() <- &Event{message: msg}
182+
}
183+
162184
return nil
163185
}
164186

@@ -385,13 +407,14 @@ func (c *coordinator) GetChangefeed(ctx context.Context, changefeedDisplayName c
385407
return c.controller.GetChangefeed(ctx, changefeedDisplayName)
386408
}
387409

388-
func (c *coordinator) AsyncStop() {
410+
func (c *coordinator) Stop() {
389411
if c.closed.CompareAndSwap(false, true) {
390412
c.mc.DeRegisterHandler(messaging.CoordinatorTopic)
391413
c.controller.Stop()
392414
c.taskScheduler.Stop()
393-
c.eventCh.CloseAndDrain()
394415
c.cancel()
416+
// close eventCh after cancel, to avoid sending to or receiving from the channel after it is closed
417+
c.eventCh.CloseAndDrain()
395418
}
396419
}
397420

coordinator/coordinator_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"net/http"
2323
"net/http/pprof"
2424
"strconv"
25+
"sync"
2526
"testing"
2627
"time"
2728

@@ -477,6 +478,132 @@ func TestBootstrapWithUnStoppedChangefeed(t *testing.T) {
477478
}, waitTime, time.Millisecond*5)
478479
}
479480

481+
func TestConcurrentStopAndSendEvents(t *testing.T) {
482+
// Setup context
483+
ctx, cancel := context.WithCancel(context.Background())
484+
defer cancel()
485+
486+
// Initialize node info
487+
info := node.NewInfo("127.0.0.1:28600", "")
488+
etcdClient := newMockEtcdClient(string(info.ID))
489+
nodeManager := watcher.NewNodeManager(nil, etcdClient)
490+
appcontext.SetService(watcher.NodeManagerName, nodeManager)
491+
nodeManager.GetAliveNodes()[info.ID] = info
492+
493+
// Initialize message center
494+
mc := messaging.NewMessageCenter(ctx, info.ID, 0, config.NewDefaultMessageCenterConfig(), nil)
495+
mc.Run(ctx)
496+
defer mc.Close()
497+
appcontext.SetService(appcontext.MessageCenter, mc)
498+
499+
// Initialize backend
500+
ctrl := gomock.NewController(t)
501+
defer ctrl.Finish()
502+
backend := mock_changefeed.NewMockBackend(ctrl)
503+
backend.EXPECT().GetAllChangefeeds(gomock.Any()).Return(map[common.ChangeFeedID]*changefeed.ChangefeedMetaWrapper{}, nil).AnyTimes()
504+
505+
// Create coordinator
506+
cr := New(info, &mockPdClient{}, pdutil.NewClock4Test(), backend, "test-gc-service", 100, 10000, time.Millisecond*10)
507+
co := cr.(*coordinator)
508+
509+
// Number of goroutines for each operation
510+
const (
511+
sendEventGoroutines = 10
512+
stopGoroutines = 5
513+
eventsPerGoroutine = 100
514+
)
515+
516+
var wg sync.WaitGroup
517+
wg.Add(sendEventGoroutines + stopGoroutines)
518+
519+
// Start the coordinator
520+
ctxRun, cancelRun := context.WithCancel(ctx)
521+
go func() {
522+
err := cr.Run(ctxRun)
523+
if err != nil && err != context.Canceled {
524+
t.Errorf("Coordinator Run returned unexpected error: %v", err)
525+
}
526+
}()
527+
528+
// Give coordinator some time to initialize
529+
time.Sleep(100 * time.Millisecond)
530+
531+
// Start goroutines to send events
532+
for i := 0; i < sendEventGoroutines; i++ {
533+
go func(id int) {
534+
defer wg.Done()
535+
defer func() {
536+
// Recover from potential panics
537+
if r := recover(); r != nil {
538+
t.Errorf("Panic in send event goroutine %d: %v", id, r)
539+
}
540+
}()
541+
542+
for j := 0; j < eventsPerGoroutine; j++ {
543+
// Try to send an event
544+
if co.closed.Load() {
545+
// Coordinator is already closed, stop sending
546+
return
547+
}
548+
549+
msg := &messaging.TargetMessage{
550+
Topic: messaging.CoordinatorTopic,
551+
Type: messaging.TypeMaintainerHeartbeatRequest,
552+
}
553+
554+
// Use recvMessages to send event to channel
555+
err := co.recvMessages(ctx, msg)
556+
if err != nil && err != context.Canceled {
557+
t.Logf("Failed to send event in goroutine %d: %v", id, err)
558+
}
559+
560+
// Small sleep to increase chance of race conditions
561+
time.Sleep(time.Millisecond)
562+
}
563+
}(i)
564+
}
565+
566+
// Start goroutines to stop the coordinator
567+
for i := 0; i < stopGoroutines; i++ {
568+
go func(id int) {
569+
defer wg.Done()
570+
// Small delay to ensure some events are sent first
571+
time.Sleep(time.Duration(10+id*5) * time.Millisecond)
572+
co.Stop()
573+
}(i)
574+
}
575+
576+
// Wait for all goroutines to complete
577+
wg.Wait()
578+
579+
// Cancel the context to ensure the coordinator stops
580+
cancelRun()
581+
582+
// Give some time for the coordinator to fully stop
583+
time.Sleep(100 * time.Millisecond)
584+
585+
// Verify that the coordinator is closed
586+
require.True(t, co.closed.Load())
587+
588+
// Verify that event channel is closed
589+
select {
590+
case _, ok := <-co.eventCh.Out():
591+
require.False(t, ok, "Event channel should be closed")
592+
default:
593+
// Channel might be already drained, which is fine
594+
}
595+
596+
// Try sending another event - should not panic but may return error
597+
msg := &messaging.TargetMessage{
598+
Topic: messaging.CoordinatorTopic,
599+
Type: messaging.TypeMaintainerHeartbeatRequest,
600+
}
601+
602+
err := co.recvMessages(ctx, msg)
603+
require.NoError(t, err)
604+
require.True(t, co.closed.Load())
605+
}
606+
480607
type maintainNode struct {
481608
cancel context.CancelFunc
482609
mc messaging.MessageCenter

downstreamadapter/dispatchermanager/heartbeat_collector.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ
212212
}
213213

214214
func (c *HeartBeatCollector) Close() {
215+
log.Info("heartbeat collector is closing")
215216
c.mc.DeRegisterHandler(messaging.HeartbeatCollectorTopic)
216217
c.cancel()
217218
c.wg.Wait()

downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,9 @@ func (m *DispatcherOrchestrator) sendResponse(to node.ID, topic string, msg mess
273273
}
274274

275275
func (m *DispatcherOrchestrator) Close() {
276+
log.Info("dispatcher orchestrator is closing")
276277
m.mc.DeRegisterHandler(messaging.DispatcherManagerManagerTopic)
278+
log.Info("dispatcher orchestrator closed")
277279
}
278280

279281
// handleDispatcherError creates and sends an error response for create dispatcher-related failures

downstreamadapter/eventcollector/event_collector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,9 +161,9 @@ func (c *EventCollector) Run(ctx context.Context) {
161161
}
162162

163163
func (c *EventCollector) Close() {
164+
log.Info("event collector is closing")
164165
c.cancel()
165166
c.ds.Close()
166-
167167
c.changefeedIDMap.Range(func(key, value any) bool {
168168
cfID := value.(common.ChangeFeedID)
169169
// Remove metrics for the changefeed.

pkg/messaging/message_center.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ func (mc *messageCenter) ReceiveCmd() (*TargetMessage, error) {
278278

279279
// Close stops the grpc server and stops all the connections to the remote targets.
280280
func (mc *messageCenter) Close() {
281+
log.Info("message center is closing", zap.Stringer("id", mc.id))
281282
mc.remoteTargets.RLock()
282283
defer mc.remoteTargets.RUnlock()
283284

pkg/messaging/proto/message.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ message Message {
1414
int32 type = 5;
1515
// topic is the destination of the message, it is used to route the message to the correct handler.
1616
string topic = 6;
17-
// TODO, change to real types
1817
repeated bytes payload = 7;
1918
}
2019

pkg/messaging/target.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,36 +116,36 @@ func (s *remoteMessageTarget) Epoch() uint64 {
116116
func (s *remoteMessageTarget) sendEvent(msg ...*TargetMessage) error {
117117
if !s.eventSender.ready.Load() {
118118
s.connectionNotfoundErrorCounter.Inc()
119-
return AppError{Type: ErrorTypeConnectionNotFound, Reason: "Stream has not been initialized"}
119+
return AppError{Type: ErrorTypeConnectionNotFound, Reason: fmt.Sprintf("Stream has not been initialized, target: %s", s.targetId)}
120120
}
121121
select {
122122
case <-s.ctx.Done():
123123
s.connectionNotfoundErrorCounter.Inc()
124-
return AppError{Type: ErrorTypeConnectionNotFound, Reason: "Stream has been closed"}
124+
return AppError{Type: ErrorTypeConnectionNotFound, Reason: fmt.Sprintf("Stream has been closed, target: %s", s.targetId)}
125125
case s.sendEventCh <- s.newMessage(msg...):
126126
s.sendEventCounter.Add(float64(len(msg)))
127127
return nil
128128
default:
129129
s.congestedEventErrorCounter.Inc()
130-
return AppError{Type: ErrorTypeMessageCongested, Reason: "Send event message is congested"}
130+
return AppError{Type: ErrorTypeMessageCongested, Reason: fmt.Sprintf("Send event message is congested, target: %s", s.targetId)}
131131
}
132132
}
133133

134134
func (s *remoteMessageTarget) sendCommand(msg ...*TargetMessage) error {
135135
if !s.commandSender.ready.Load() {
136136
s.connectionNotfoundErrorCounter.Inc()
137-
return AppError{Type: ErrorTypeConnectionNotFound, Reason: "Stream has not been initialized"}
137+
return AppError{Type: ErrorTypeConnectionNotFound, Reason: fmt.Sprintf("Stream has not been initialized, target: %s", s.targetId)}
138138
}
139139
select {
140140
case <-s.ctx.Done():
141141
s.connectionNotfoundErrorCounter.Inc()
142-
return AppError{Type: ErrorTypeConnectionNotFound, Reason: "Stream has been closed"}
142+
return AppError{Type: ErrorTypeConnectionNotFound, Reason: fmt.Sprintf("Stream has been closed, target: %s", s.targetId)}
143143
case s.sendCmdCh <- s.newMessage(msg...):
144144
s.sendCmdCounter.Add(float64(len(msg)))
145145
return nil
146146
default:
147147
s.congestedCmdErrorCounter.Inc()
148-
return AppError{Type: ErrorTypeMessageCongested, Reason: "Send command message is congested"}
148+
return AppError{Type: ErrorTypeMessageCongested, Reason: fmt.Sprintf("Send command message is congested, target: %s", s.targetId)}
149149
}
150150
}
151151

0 commit comments

Comments
 (0)