Skip to content

Commit 0ad12ea

Browse files
authored
eventcollector, eventservice: add congestion control mechanism (pingcap#1588)
close pingcap#1404
1 parent 8e4744d commit 0ad12ea

38 files changed

+2850
-1444
lines changed

downstreamadapter/dispatcher/basic_dispatcher.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,6 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
317317
for _, dispatcherEvent := range dispatcherEvents {
318318
log.Debug("dispatcher receive all event",
319319
zap.Stringer("dispatcher", d.id), zap.Bool("isRedo", isRedo),
320-
zap.String("eventType", commonEvent.TypeToString(dispatcherEvent.Event.GetType())),
321320
zap.Any("event", dispatcherEvent.Event))
322321
failpoint.Inject("HandleEventsSlowly", func() {
323322
lag := time.Duration(rand.Intn(5000)) * time.Millisecond

downstreamadapter/eventcollector/dispatcher_stat.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -616,13 +616,12 @@ func (d *dispatcherStat) handleHandshakeEvent(event dispatcher.DispatcherEvent)
616616
}
617617

618618
func (d *dispatcherStat) setRemoteCandidates(nodes []string) {
619-
log.Info("set remote candidates",
620-
zap.Strings("nodes", nodes),
621-
zap.Stringer("dispatcherID", d.getDispatcherID()))
622619
if len(nodes) == 0 {
623620
return
624621
}
625622
if d.connState.trySetRemoteCandidates(nodes) {
623+
log.Info("set remote candidates", zap.Stringer("dispatcherID", d.getDispatcherID()),
624+
zap.Int64("tableID", d.target.GetTableSpan().TableID), zap.Strings("nodes", nodes))
626625
candidate := d.connState.getNextRemoteCandidate()
627626
d.registerTo(candidate)
628627
}

downstreamadapter/eventcollector/event_collector.go

Lines changed: 97 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ func (c *EventCollector) Run(ctx context.Context) {
169169
return c.processDSFeedback(ctx)
170170
})
171171

172+
g.Go(func() error {
173+
return c.controlCongestion(ctx)
174+
})
175+
172176
g.Go(func() error {
173177
return c.sendDispatcherRequests(ctx)
174178
})
@@ -219,7 +223,9 @@ func (c *EventCollector) PrepareAddDispatcher(
219223
) {
220224
log.Info("add dispatcher", zap.Stringer("dispatcher", target.GetId()))
221225
defer func() {
222-
log.Info("add dispatcher done", zap.Stringer("dispatcher", target.GetId()), zap.Int("type", target.GetType()))
226+
log.Info("add dispatcher done",
227+
zap.Stringer("dispatcherID", target.GetId()), zap.Int64("tableID", target.GetTableSpan().GetTableID()),
228+
zap.Uint64("startTs", target.GetStartTs()), zap.Int("type", target.GetType()))
223229
}()
224230
metrics.EventCollectorRegisteredDispatcherCount.Inc()
225231

@@ -238,11 +244,12 @@ func (c *EventCollector) PrepareAddDispatcher(
238244

239245
// CommitAddDispatcher notify local event service that the dispatcher is ready to receive events.
240246
func (c *EventCollector) CommitAddDispatcher(target dispatcher.Dispatcher, startTs uint64) {
241-
log.Info("commit add dispatcher", zap.Stringer("dispatcher", target.GetId()), zap.Uint64("startTs", startTs))
247+
log.Info("commit add dispatcher", zap.Stringer("dispatcherID", target.GetId()),
248+
zap.Int64("tableID", target.GetTableSpan().GetTableID()), zap.Uint64("startTs", startTs))
242249
value, ok := c.dispatcherMap.Load(target.GetId())
243250
if !ok {
244251
log.Warn("dispatcher not found when commit add dispatcher",
245-
zap.Stringer("dispatcher", target.GetId()),
252+
zap.Stringer("dispatcherID", target.GetId()), zap.Int64("tableID", target.GetTableSpan().GetTableID()),
246253
zap.Uint64("startTs", startTs))
247254
return
248255
}
@@ -251,9 +258,10 @@ func (c *EventCollector) CommitAddDispatcher(target dispatcher.Dispatcher, start
251258
}
252259

253260
func (c *EventCollector) RemoveDispatcher(target dispatcher.Dispatcher) {
254-
log.Info("remove dispatcher", zap.Stringer("dispatcher", target.GetId()))
261+
log.Info("remove dispatcher", zap.Stringer("dispatcherID", target.GetId()))
255262
defer func() {
256-
log.Info("remove dispatcher done", zap.Stringer("dispatcher", target.GetId()))
263+
log.Info("remove dispatcher done", zap.Stringer("dispatcherID", target.GetId()),
264+
zap.Int64("tableID", target.GetTableSpan().GetTableID()))
257265
}()
258266
isRedo := dispatcher.IsRedoDispatcher(target)
259267
value, ok := c.dispatcherMap.Load(target.GetId())
@@ -512,6 +520,90 @@ func (c *EventCollector) runDispatchMessage(ctx context.Context, inCh <-chan *me
512520
}
513521
}
514522

523+
func (c *EventCollector) controlCongestion(ctx context.Context) error {
524+
ticker := time.NewTicker(time.Second)
525+
defer ticker.Stop()
526+
527+
for {
528+
select {
529+
case <-ctx.Done():
530+
return context.Cause(ctx)
531+
case <-ticker.C:
532+
messages := c.newCongestionControlMessages()
533+
for serverID, m := range messages {
534+
if len(m.GetAvailables()) != 0 {
535+
msg := messaging.NewSingleTargetMessage(serverID, messaging.EventServiceTopic, m)
536+
if err := c.mc.SendCommand(msg); err != nil {
537+
log.Warn("send congestion control message failed", zap.Error(err))
538+
}
539+
}
540+
}
541+
}
542+
}
543+
}
544+
545+
func (c *EventCollector) newCongestionControlMessages() map[node.ID]*event.CongestionControl {
546+
// collect all changefeeds' available memory quota
547+
availables := make(map[common.ChangeFeedID]uint64)
548+
for _, quota := range c.ds.GetMetrics().MemoryControl.AreaMemoryMetrics {
549+
changefeedID, ok := c.changefeedIDMap.Load(quota.Area())
550+
if !ok {
551+
continue
552+
}
553+
availables[changefeedID.(common.ChangeFeedID)] = uint64(quota.AvailableMemory())
554+
}
555+
if len(availables) == 0 {
556+
return nil
557+
}
558+
559+
// calculate each changefeed's available memory quota for each node
560+
// by the proportion of the dispatcher on each node.
561+
// this is not accurate, we should also consider each node's workload distribution.
562+
proportions := make(map[common.ChangeFeedID]map[node.ID]uint64)
563+
c.dispatcherMap.Range(func(k, v interface{}) bool {
564+
stat := v.(*dispatcherStat)
565+
eventServiceID := stat.connState.getEventServiceID()
566+
if eventServiceID == "" {
567+
return true
568+
}
569+
570+
changefeedID := stat.target.GetChangefeedID()
571+
holder, ok := proportions[changefeedID]
572+
if !ok {
573+
holder = make(map[node.ID]uint64)
574+
proportions[changefeedID] = holder
575+
}
576+
holder[eventServiceID]++
577+
return true
578+
})
579+
580+
// group the available memory quota by nodeID
581+
result := make(map[node.ID]*event.CongestionControl)
582+
for changefeedID, total := range availables {
583+
proportion := proportions[changefeedID]
584+
var sum uint64
585+
for _, portion := range proportion {
586+
sum += portion
587+
}
588+
if sum == 0 {
589+
continue
590+
}
591+
592+
for nodeID, portion := range proportion {
593+
ratio := float64(portion) / float64(sum)
594+
quota := uint64(float64(total) * ratio)
595+
596+
m, ok := result[nodeID]
597+
if !ok {
598+
m = event.NewCongestionControl()
599+
result[nodeID] = m
600+
}
601+
m.AddAvailableMemory(changefeedID.ID(), quota)
602+
}
603+
}
604+
return result
605+
}
606+
515607
func (c *EventCollector) updateMetrics(ctx context.Context) error {
516608
ticker := time.NewTicker(5 * time.Second)
517609
defer ticker.Stop()

downstreamadapter/sink/kafka/sink.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,12 +362,17 @@ func (s *sink) sendMessages(ctx context.Context) error {
362362
metricSendMessageDuration := metrics.WorkerSendMessageDuration.WithLabelValues(s.changefeedID.Namespace(), s.changefeedID.Name())
363363
defer metrics.WorkerSendMessageDuration.DeleteLabelValues(s.changefeedID.Namespace(), s.changefeedID.Name())
364364

365+
ticker := time.NewTicker(5 * time.Second)
366+
defer ticker.Stop()
367+
365368
var err error
366369
outCh := s.comp.encoderGroup.Output()
367370
for {
368371
select {
369372
case <-ctx.Done():
370373
return errors.Trace(ctx.Err())
374+
case <-ticker.C:
375+
s.dmlProducer.Heartbeat()
371376
case future, ok := <-outCh:
372377
if !ok {
373378
log.Info("kafka sink encoder's output channel closed",
@@ -457,6 +462,9 @@ func (s *sink) sendCheckpoint(ctx context.Context) error {
457462
metrics.CheckpointTsMessageCount.DeleteLabelValues(s.changefeedID.Namespace(), s.changefeedID.Name())
458463
}()
459464

465+
ticker := time.NewTicker(5 * time.Second)
466+
defer ticker.Stop()
467+
460468
var (
461469
msg *common.Message
462470
partitionNum int32
@@ -466,6 +474,8 @@ func (s *sink) sendCheckpoint(ctx context.Context) error {
466474
select {
467475
case <-ctx.Done():
468476
return errors.Trace(ctx.Err())
477+
case <-ticker.C:
478+
s.ddlProducer.Heartbeat()
469479
case ts, ok := <-s.checkpointChan:
470480
if !ok {
471481
log.Warn("kafka sink checkpoint channel closed",

downstreamadapter/sink/topicmanager/kafka_topic_manager.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ func (m *kafkaTopicManager) GetPartitionNum(
115115
}
116116

117117
func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) {
118+
ticker := time.NewTicker(5 * time.Second)
119+
defer ticker.Stop()
118120
for {
119121
select {
120122
case <-ctx.Done():
@@ -123,6 +125,8 @@ func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) {
123125
zap.String("changefeed", m.changefeedID.Name()),
124126
)
125127
return
128+
case <-ticker.C:
129+
m.admin.Heartbeat()
126130
case <-m.metaRefreshTicker.C:
127131
// We ignore the error here, because the error may be caused by the
128132
// network problem, and we can try to get the metadata next time.

0 commit comments

Comments (0)