Skip to content

Commit fb07628

Browse files
committed
ddl: refine ddl log and add more useful message in log (#3223)
ref #3178
1 parent 19562fd commit fb07628

File tree

8 files changed

+205
-130
lines changed

8 files changed

+205
-130
lines changed

downstreamadapter/dispatcher/basic_dispatcher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
554554
pendingEvent := d.blockEventStatus.getEvent()
555555
if pendingEvent == nil && action.CommitTs > d.GetResolvedTs() {
556556
// we have not received the block event, and the action is for the future event, so just ignore
557-
log.Info("pending event is nil, and the action's commit is larger than dispatchers resolvedTs",
557+
log.Debug("pending event is nil, and the action's commit is larger than dispatchers resolvedTs",
558558
zap.Uint64("resolvedTs", d.GetResolvedTs()),
559559
zap.Uint64("actionCommitTs", action.CommitTs),
560560
zap.Stringer("dispatcher", d.id))
@@ -592,7 +592,7 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
592592
} else {
593593
ts, ok := d.blockEventStatus.getEventCommitTs()
594594
if ok && action.CommitTs > ts {
595-
log.Info("pending event's commitTs is smaller than the action's commitTs, just ignore it",
595+
log.Debug("pending event's commitTs is smaller than the action's commitTs, just ignore it",
596596
zap.Uint64("pendingEventCommitTs", ts),
597597
zap.Uint64("actionCommitTs", action.CommitTs),
598598
zap.Stringer("dispatcher", d.id))

downstreamadapter/dispatcher/helper.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,21 +261,23 @@ type ResendTask struct {
261261
taskHandle *threadpool.TaskHandle
262262
}
263263

264+
const resendTimeInterval = 10 * time.Second
265+
264266
func newResendTask(message *heartbeatpb.TableSpanBlockStatus, dispatcher Dispatcher, callback func()) *ResendTask {
265267
taskScheduler := GetDispatcherTaskScheduler()
266268
t := &ResendTask{
267269
message: message,
268270
dispatcher: dispatcher,
269271
callback: callback,
270272
}
271-
t.taskHandle = taskScheduler.Submit(t, time.Now().Add(50*time.Millisecond))
273+
t.taskHandle = taskScheduler.Submit(t, time.Now().Add(resendTimeInterval))
272274
return t
273275
}
274276

275277
func (t *ResendTask) Execute() time.Time {
276278
log.Debug("resend task", zap.Any("message", t.message), zap.Any("dispatcherID", t.dispatcher.GetId()))
277279
t.dispatcher.GetBlockStatusesChan() <- t.message
278-
return time.Now().Add(10 * time.Second)
280+
return time.Now().Add(resendTimeInterval)
279281
}
280282

281283
func (t *ResendTask) Cancel() {

maintainer/barrier.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ func (b *Barrier) handleEventDone(changefeedID common.ChangeFeedID, dispatcherID
347347
key := getEventKey(status.State.BlockTs, status.State.IsSyncPoint)
348348
event, ok := b.blockedEvents.Get(key)
349349
if !ok {
350-
log.Info("No block event found, ignore the event done message",
350+
log.Debug("No block event found, ignore the event done message",
351351
zap.String("changefeed", changefeedID.Name()),
352352
zap.String("dispatcher", dispatcherID.String()),
353353
zap.Uint64("commitTs", status.State.BlockTs),
@@ -392,7 +392,7 @@ func (b *Barrier) handleBlockState(changefeedID common.ChangeFeedID,
392392
}
393393
if event.selected.Load() {
394394
// the event already in the selected state, ignore the block event just sent ack
395-
log.Warn("the block event already selected, ignore the block event",
395+
log.Debug("the block event already selected, ignore the block event",
396396
zap.String("changefeed", changefeedID.Name()),
397397
zap.String("dispatcher", dispatcherID.String()),
398398
zap.Uint64("commitTs", blockState.BlockTs),

maintainer/barrier_event.go

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func NewBlockEvent(cfID common.ChangeFeedID,
106106
if dynamicSplitEnabled {
107107
event.rangeChecker = range_checker.NewTableSpanRangeChecker(spanController.GetkeyspaceID(), status.BlockTables.TableIDs)
108108
} else {
109-
event.rangeChecker = range_checker.NewTableCountChecker(len(status.BlockTables.TableIDs))
109+
event.rangeChecker = range_checker.NewTableCountChecker(status.BlockTables.TableIDs)
110110
}
111111
}
112112
}
@@ -120,33 +120,31 @@ func NewBlockEvent(cfID common.ChangeFeedID,
120120
}
121121

122122
func (be *BarrierEvent) createRangeCheckerForTypeAll() {
123+
reps := be.spanController.GetAllTasks()
124+
tbls := make([]int64, 0, len(reps))
125+
for _, rep := range reps {
126+
tbls = append(tbls, rep.Span.TableID)
127+
}
123128
if be.dynamicSplitEnabled {
124-
reps := be.spanController.GetAllTasks()
125-
tbls := make([]int64, 0, len(reps))
126-
for _, rep := range reps {
127-
tbls = append(tbls, rep.Span.TableID)
128-
}
129-
tbls = append(tbls, common.DDLSpanTableID)
130129
be.rangeChecker = range_checker.NewTableSpanRangeChecker(be.spanController.GetkeyspaceID(), tbls)
131130
} else {
132-
be.rangeChecker = range_checker.NewTableCountChecker(be.spanController.TaskSize())
131+
be.rangeChecker = range_checker.NewTableCountChecker(tbls)
133132
}
134133
log.Info("create range checker for block event", zap.Any("influcenceType", be.blockedDispatchers.InfluenceType), zap.Any("commitTs", be.commitTs))
135134
}
136135

137136
func (be *BarrierEvent) createRangeCheckerForTypeDB() {
138-
if be.dynamicSplitEnabled {
139-
reps := be.spanController.GetTasksBySchemaID(be.blockedDispatchers.SchemaID)
140-
tbls := make([]int64, 0, len(reps))
141-
for _, rep := range reps {
142-
tbls = append(tbls, rep.Span.TableID)
143-
}
137+
reps := be.spanController.GetTasksBySchemaID(be.blockedDispatchers.SchemaID)
138+
tbls := make([]int64, 0, len(reps))
139+
for _, rep := range reps {
140+
tbls = append(tbls, rep.Span.TableID)
141+
}
144142

145-
tbls = append(tbls, common.DDLSpanTableID)
143+
tbls = append(tbls, common.DDLSpanTableID)
144+
if be.dynamicSplitEnabled {
146145
be.rangeChecker = range_checker.NewTableSpanRangeChecker(be.spanController.GetkeyspaceID(), tbls)
147146
} else {
148-
be.rangeChecker = range_checker.NewTableCountChecker(
149-
be.spanController.GetTaskSizeBySchemaID(be.blockedDispatchers.SchemaID) + 1 /*table trigger event dispatcher*/)
147+
be.rangeChecker = range_checker.NewTableCountChecker(tbls)
150148
}
151149
log.Info("create range checker for block event", zap.Any("influcenceType", be.blockedDispatchers.InfluenceType), zap.Any("commitTs", be.commitTs))
152150
}
@@ -363,7 +361,7 @@ func (be *BarrierEvent) allDispatcherReported() bool {
363361
if be.dynamicSplitEnabled {
364362
be.rangeChecker = range_checker.NewTableSpanRangeChecker(be.spanController.GetkeyspaceID(), be.blockedDispatchers.TableIDs)
365363
} else {
366-
be.rangeChecker = range_checker.NewTableCountChecker(len(be.blockedDispatchers.TableIDs))
364+
be.rangeChecker = range_checker.NewTableCountChecker(be.blockedDispatchers.TableIDs)
367365
}
368366
case heartbeatpb.InfluenceType_DB:
369367
be.createRangeCheckerForTypeDB()
@@ -515,7 +513,7 @@ func (be *BarrierEvent) checkBlockedDispatchers() {
515513
}
516514

517515
func (be *BarrierEvent) resend(mode int64) []*messaging.TargetMessage {
518-
if time.Since(be.lastResendTime) < 10*time.Second {
516+
if time.Since(be.lastResendTime) < time.Second {
519517
return nil
520518
}
521519
var msgs []*messaging.TargetMessage

maintainer/barrier_event_test.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,23 @@
1313

1414
package maintainer
1515

16-
/*
16+
import (
17+
"testing"
18+
"time"
19+
20+
"github.com/pingcap/ticdc/heartbeatpb"
21+
"github.com/pingcap/ticdc/maintainer/operator"
22+
"github.com/pingcap/ticdc/maintainer/replica"
23+
"github.com/pingcap/ticdc/maintainer/span"
24+
"github.com/pingcap/ticdc/maintainer/testutil"
25+
"github.com/pingcap/ticdc/pkg/common"
26+
appcontext "github.com/pingcap/ticdc/pkg/common/context"
27+
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
28+
"github.com/pingcap/ticdc/pkg/node"
29+
"github.com/pingcap/ticdc/server/watcher"
30+
"github.com/stretchr/testify/require"
31+
)
32+
1733
func TestScheduleEvent(t *testing.T) {
1834
testutil.SetUpTestServices()
1935
tableTriggerEventDispatcherID := common.NewDispatcherID()
@@ -209,4 +225,3 @@ func TestUpdateSchemaID(t *testing.T) {
209225
require.Len(t, spanController.GetTasksBySchemaID(2), 1)
210226
require.Equal(t, spanController.GetTasksByTableID(1)[0].GetSchemaID(), int64(2))
211227
}
212-
*/

0 commit comments

Comments
 (0)