@@ -81,11 +81,11 @@ type DispatcherManager struct {
8181
8282 // tableTriggerEventDispatcher is a special dispatcher, that is responsible for handling ddl and checkpoint events.
8383 tableTriggerEventDispatcher * dispatcher.EventDispatcher
84- // redoTableTriggerEventDispatcher is a special redo dispatcher, that is responsible for handling ddl and checkpoint events.
85- redoTableTriggerEventDispatcher * dispatcher.RedoDispatcher
84+ // tableTriggerRedoDispatcher is a special redo dispatcher, that is responsible for handling ddl and checkpoint events.
85+ tableTriggerRedoDispatcher * dispatcher.RedoDispatcher
8686 // dispatcherMap restore all the dispatchers in the DispatcherManager, including table trigger event dispatcher
8787 dispatcherMap * DispatcherMap [* dispatcher.EventDispatcher ]
88- // redoDispatcherMap restore all the redo dispatchers in the DispatcherManager, including redo table trigger event dispatcher
88+ // redoDispatcherMap restore all the redo dispatchers in the DispatcherManager, including table trigger redo dispatcher
8989 redoDispatcherMap * DispatcherMap [* dispatcher.RedoDispatcher ]
9090 // schemaIDToDispatchers is shared in the DispatcherManager,
9191 // it store all the infos about schemaID->Dispatchers
@@ -140,9 +140,9 @@ type DispatcherManager struct {
140140 metricResolvedTs prometheus.Gauge
141141 metricResolvedTsLag prometheus.Gauge
142142
143- metricRedoTableTriggerEventDispatcherCount prometheus.Gauge
144- metricRedoEventDispatcherCount prometheus.Gauge
145- metricRedoCreateDispatcherDuration prometheus.Observer
143+ metricTableTriggerRedoDispatcherCount prometheus.Gauge
144+ metricRedoEventDispatcherCount prometheus.Gauge
145+ metricRedoCreateDispatcherDuration prometheus.Observer
146146}
147147
148148// return actual startTs of the table trigger event dispatcher
@@ -152,11 +152,11 @@ func NewDispatcherManager(
152152 changefeedID common.ChangeFeedID ,
153153 cfConfig * config.ChangefeedConfig ,
154154 tableTriggerEventDispatcherID ,
155- redoTableTriggerEventDispatcherID * heartbeatpb.DispatcherID ,
155+ tableTriggerRedoDispatcherID * heartbeatpb.DispatcherID ,
156156 startTs uint64 ,
157157 maintainerID node.ID ,
158158 newChangefeed bool ,
159- ) (* DispatcherManager , uint64 , error ) {
159+ ) (* DispatcherManager , error ) {
160160 failpoint .Inject ("NewDispatcherManagerDelay" , nil )
161161
162162 ctx , cancel := context .WithCancel (context .Background ())
@@ -193,9 +193,9 @@ func NewDispatcherManager(
193193 metricResolvedTs : metrics .DispatcherManagerResolvedTsGauge .WithLabelValues (changefeedID .Keyspace (), changefeedID .Name ()),
194194 metricResolvedTsLag : metrics .DispatcherManagerResolvedTsLagGauge .WithLabelValues (changefeedID .Keyspace (), changefeedID .Name ()),
195195
196- metricRedoTableTriggerEventDispatcherCount : metrics .TableTriggerEventDispatcherGauge .WithLabelValues (changefeedID .Keyspace (), changefeedID .Name (), "redoDispatcher" ),
197- metricRedoEventDispatcherCount : metrics .EventDispatcherGauge .WithLabelValues (changefeedID .Keyspace (), changefeedID .Name (), "redoDispatcher" ),
198- metricRedoCreateDispatcherDuration : metrics .CreateDispatcherDuration .WithLabelValues (changefeedID .Keyspace (), changefeedID .Name (), "redoDispatcher" ),
196+ metricTableTriggerRedoDispatcherCount : metrics .TableTriggerEventDispatcherGauge .WithLabelValues (changefeedID .Keyspace (), changefeedID .Name (), "redoDispatcher" ),
197+ metricRedoEventDispatcherCount : metrics .EventDispatcherGauge .WithLabelValues (changefeedID .Keyspace (), changefeedID .Name (), "redoDispatcher" ),
198+ metricRedoCreateDispatcherDuration : metrics .CreateDispatcherDuration .WithLabelValues (changefeedID .Keyspace (), changefeedID .Name (), "redoDispatcher" ),
199199 }
200200
201201 // Set the epoch and maintainerID of the event dispatcher manager
@@ -215,7 +215,7 @@ func NewDispatcherManager(
215215 var err error
216216 manager .sink , err = sink .New (ctx , manager .config , manager .changefeedID )
217217 if err != nil {
218- return nil , 0 , errors .Trace (err )
218+ return nil , errors .Trace (err )
219219 }
220220
221221 // Determine outputRawChangeEvent based on sink type
@@ -247,20 +247,19 @@ func NewDispatcherManager(
247247 // which is responsible for communication with the maintainer.
248248 err = appcontext.GetService [* HeartBeatCollector ](appcontext .HeartbeatCollector ).RegisterDispatcherManager (manager )
249249 if err != nil {
250- return nil , 0 , errors .Trace (err )
250+ return nil , errors .Trace (err )
251251 }
252252
253- var tableTriggerStartTs uint64 = 0
254253 // init table trigger event dispatcher when tableTriggerEventDispatcherID is not nil
255254 if tableTriggerEventDispatcherID != nil {
256- tableTriggerStartTs , err = manager .NewTableTriggerEventDispatcher (tableTriggerEventDispatcherID , startTs , newChangefeed )
255+ err = manager .NewTableTriggerEventDispatcher (tableTriggerEventDispatcherID , startTs , newChangefeed )
257256 if err != nil {
258- return nil , 0 , errors .Trace (err )
257+ return nil , errors .Trace (err )
259258 }
260259 }
261- err = initRedoComponet (ctx , manager , changefeedID , redoTableTriggerEventDispatcherID , startTs , newChangefeed )
260+ err = initRedoComponet (ctx , manager , changefeedID , tableTriggerRedoDispatcherID , startTs , newChangefeed )
262261 if err != nil {
263- return nil , 0 , errors .Trace (err )
262+ return nil , errors .Trace (err )
264263 }
265264
266265 manager .wg .Add (1 )
@@ -295,17 +294,16 @@ func NewDispatcherManager(
295294 zap .Stringer ("changefeedID" , changefeedID ),
296295 zap .Stringer ("maintainerID" , maintainerID ),
297296 zap .Uint64 ("startTs" , startTs ),
298- zap .Uint64 ("tableTriggerStartTs" , tableTriggerStartTs ),
299297 zap .Uint64 ("sinkQuota" , manager .sinkQuota ),
300298 zap .Uint64 ("redoQuota" , manager .redoQuota ),
301299 zap .Bool ("redoEnable" , manager .RedoEnable ),
302300 zap .Bool ("outputRawChangeEvent" , manager .sharedInfo .IsOutputRawChangeEvent ()),
303301 zap .String ("filterConfig" , filterCfg .String ()),
304302 )
305- return manager , tableTriggerStartTs , nil
303+ return manager , nil
306304}
307305
308- func (e * DispatcherManager ) NewTableTriggerEventDispatcher (id * heartbeatpb.DispatcherID , startTs uint64 , newChangefeed bool ) ( uint64 , error ) {
306+ func (e * DispatcherManager ) NewTableTriggerEventDispatcher (id * heartbeatpb.DispatcherID , startTs uint64 , newChangefeed bool ) error {
309307 if e .GetTableTriggerEventDispatcher () != nil {
310308 log .Error ("table trigger event dispatcher existed!" )
311309 }
@@ -319,14 +317,14 @@ func (e *DispatcherManager) NewTableTriggerEventDispatcher(id *heartbeatpb.Dispa
319317 }
320318 err := e .newEventDispatchers (infos , newChangefeed )
321319 if err != nil {
322- return 0 , errors .Trace (err )
320+ return errors .Trace (err )
323321 }
324322 log .Info ("table trigger event dispatcher created" ,
325323 zap .Stringer ("changefeedID" , e .changefeedID ),
326324 zap .Stringer ("dispatcherID" , e .GetTableTriggerEventDispatcher ().GetId ()),
327325 zap .Uint64 ("startTs" , e .GetTableTriggerEventDispatcher ().GetStartTs ()),
328326 )
329- return e . GetTableTriggerEventDispatcher (). GetStartTs (), nil
327+ return nil
330328}
331329
332330func (e * DispatcherManager ) InitalizeTableTriggerEventDispatcher (schemaInfo []* heartbeatpb.SchemaInfo ) error {
@@ -766,7 +764,7 @@ func (e *DispatcherManager) aggregateDispatcherHeartbeats(needCompleteStatus boo
766764 // Fill the missing watermarks with the final aggregated values to avoid
767765 // reporting an uninitialized checkpoint.
768766 for _ , id := range redoDispatchersWithoutWatermark {
769- eventServiceDispatcherHeartbeat .Append (event .NewDispatcherProgress (id , message .Watermark .CheckpointTs ))
767+ eventServiceDispatcherHeartbeat .Append (event .NewDispatcherProgress (id , message .RedoWatermark .CheckpointTs ))
770768 }
771769 for _ , id := range eventDispatchersWithoutWatermark {
772770 eventServiceDispatcherHeartbeat .Append (event .NewDispatcherProgress (id , message .Watermark .CheckpointTs ))
0 commit comments