Skip to content

Commit 45f905c

Browse files
committed
syncpoint: revert syncpoint batch operation (#3235)
close #3234
1 parent fb07628 commit 45f905c

File tree

13 files changed

+96
-656
lines changed

13 files changed

+96
-656
lines changed

downstreamadapter/dispatcher/basic_dispatcher.go

Lines changed: 20 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
493493
syncPoint := event.(*commonEvent.SyncPointEvent)
494494
log.Info("dispatcher receive sync point event",
495495
zap.Stringer("dispatcher", d.id),
496-
zap.Any("commitTsList", syncPoint.GetCommitTsList()),
496+
zap.Uint64("commitTs", syncPoint.GetCommitTs()),
497497
zap.Uint64("seq", event.GetSeq()))
498498

499499
syncPoint.AddPostFlushFunc(func() {
@@ -700,66 +700,26 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) {
700700
} else {
701701
d.blockEventStatus.setBlockEvent(event, heartbeatpb.BlockStage_WAITING)
702702

703-
if event.GetType() == commonEvent.TypeSyncPointEvent {
704-
// deal with multi sync point commit ts in one Sync Point Event
705-
// we only report the latest commitTs as the blockTs.
706-
// If A receive [syncpont1, syncpoint2, syncpoint3]
707-
// B receive [syncpoint1]
708-
// C receive [syncpoint1, syncpoint2]
709-
// then A report syncpoint3, B report syncpoint1, C report syncpoint2
710-
// and barrier find A checkpointTs is exceed syncpoint1 and syncpoint2,
711-
// so will just make B and C pass these syncpoint, to receive the latest syncpoint3
712-
// then make syncpoint3 Write successfully.
713-
714-
// TODO(hyy): we could consider to just use the latest commitTs to represent this batch sync point event,
715-
// instead of obtain a commitTsList in the sync point event.
716-
commitTsList := event.(*commonEvent.SyncPointEvent).GetCommitTsList()
717-
blockTables := event.GetBlockedTables().ToPB()
718-
needDroppedTables := event.GetNeedDroppedTables().ToPB()
719-
needAddedTables := commonEvent.ToTablesPB(event.GetNeedAddedTables())
720-
commitTs := commitTsList[len(commitTsList)-1]
721-
message := &heartbeatpb.TableSpanBlockStatus{
722-
ID: d.id.ToPB(),
723-
State: &heartbeatpb.State{
724-
IsBlocked: true,
725-
BlockTs: commitTs,
726-
BlockTables: blockTables,
727-
NeedDroppedTables: needDroppedTables,
728-
NeedAddedTables: needAddedTables,
729-
UpdatedSchemas: nil,
730-
IsSyncPoint: true,
731-
Stage: heartbeatpb.BlockStage_WAITING,
732-
},
733-
Mode: d.GetMode(),
734-
}
735-
identifier := BlockEventIdentifier{
736-
CommitTs: commitTs,
737-
IsSyncPoint: true,
738-
}
739-
d.resendTaskMap.Set(identifier, newResendTask(message, d, nil))
740-
d.sharedInfo.blockStatusesChan <- message
741-
} else {
742-
message := &heartbeatpb.TableSpanBlockStatus{
743-
ID: d.id.ToPB(),
744-
State: &heartbeatpb.State{
745-
IsBlocked: true,
746-
BlockTs: event.GetCommitTs(),
747-
BlockTables: event.GetBlockedTables().ToPB(),
748-
NeedDroppedTables: event.GetNeedDroppedTables().ToPB(),
749-
NeedAddedTables: commonEvent.ToTablesPB(event.GetNeedAddedTables()),
750-
UpdatedSchemas: commonEvent.ToSchemaIDChangePB(event.GetUpdatedSchemas()), // only exists for rename table and rename tables
751-
IsSyncPoint: false,
752-
Stage: heartbeatpb.BlockStage_WAITING,
753-
},
754-
Mode: d.GetMode(),
755-
}
756-
identifier := BlockEventIdentifier{
757-
CommitTs: event.GetCommitTs(),
758-
IsSyncPoint: false,
759-
}
760-
d.resendTaskMap.Set(identifier, newResendTask(message, d, nil))
761-
d.sharedInfo.blockStatusesChan <- message
703+
message := &heartbeatpb.TableSpanBlockStatus{
704+
ID: d.id.ToPB(),
705+
State: &heartbeatpb.State{
706+
IsBlocked: true,
707+
BlockTs: event.GetCommitTs(),
708+
BlockTables: event.GetBlockedTables().ToPB(),
709+
NeedDroppedTables: event.GetNeedDroppedTables().ToPB(),
710+
NeedAddedTables: commonEvent.ToTablesPB(event.GetNeedAddedTables()),
711+
UpdatedSchemas: commonEvent.ToSchemaIDChangePB(event.GetUpdatedSchemas()),
712+
IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent,
713+
Stage: heartbeatpb.BlockStage_WAITING,
714+
},
715+
Mode: d.GetMode(),
716+
}
717+
identifier := BlockEventIdentifier{
718+
CommitTs: event.GetCommitTs(),
719+
IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent,
762720
}
721+
d.resendTaskMap.Set(identifier, newResendTask(message, d, nil))
722+
d.sharedInfo.blockStatusesChan <- message
763723
}
764724

765725
// dealing with events which update schema ids

downstreamadapter/dispatcher/event_dispatcher_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ func TestDispatcherHandleEvents(t *testing.T) {
338338
// ===== sync point event =====
339339

340340
syncPointEvent := &commonEvent.SyncPointEvent{
341-
CommitTsList: []uint64{6},
341+
CommitTs: 6,
342342
}
343343
block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, syncPointEvent)}, callback)
344344
require.Equal(t, true, block)

downstreamadapter/dispatcher/helper.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,6 @@ func (b *BlockEventStatus) setBlockEvent(event commonEvent.BlockEvent, blockStag
104104
b.blockPendingEvent = event
105105
b.blockStage = blockStage
106106
b.blockCommitTs = event.GetCommitTs()
107-
108-
if event.GetType() == commonEvent.TypeSyncPointEvent {
109-
commitTsList := event.(*commonEvent.SyncPointEvent).GetCommitTsList()
110-
b.blockCommitTs = commitTsList[len(commitTsList)-1]
111-
}
112107
}
113108

114109
func (b *BlockEventStatus) updateBlockStage(blockStage heartbeatpb.BlockStage) {
@@ -131,10 +126,7 @@ func (b *BlockEventStatus) getEventAndStage() (commonEvent.BlockEvent, heartbeat
131126
return b.blockPendingEvent, b.blockStage
132127
}
133128

134-
// actionMatchs checks whether the action is for the current pending ddl event.
135-
// Most of time, the pending event only have one commitTs, so when the commitTs of the action meets the pending event's commitTs, it is enough.
136-
// While if the pending event is a sync point event with multiple commitTs, we only can do the action
137-
// when all the commitTs have been received.
129+
// actionMatchs checks whether the action is for the current pending ddl/sync point event.
138130
func (b *BlockEventStatus) actionMatchs(action *heartbeatpb.DispatcherAction) bool {
139131
b.mutex.Lock()
140132
defer b.mutex.Unlock()

downstreamadapter/dispatcher/table_progress.go

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -63,21 +63,9 @@ func NewTableProgress() *TableProgress {
6363
}
6464
}
6565

66-
func getFinalCommitTs(event commonEvent.Event) uint64 {
67-
commitTs := event.GetCommitTs()
68-
if event.GetType() == commonEvent.TypeSyncPointEvent {
69-
// if the event is a sync point event, we use the last commitTs(the largest commitTs in the event) to calculate the progress.
70-
// because a sync point event with multiple commitTs means there is no ddl / dmls between these commitTs.
71-
// So we can just use the largest commitTs in the sync point event to calculate the progress.
72-
commitTsList := event.(*commonEvent.SyncPointEvent).GetCommitTsList()
73-
commitTs = commitTsList[len(commitTsList)-1]
74-
}
75-
return commitTs
76-
}
77-
7866
// Add inserts a new event into the TableProgress.
7967
func (p *TableProgress) Add(event commonEvent.FlushableEvent) {
80-
commitTs := getFinalCommitTs(event)
68+
commitTs := event.GetCommitTs()
8169
ts := Ts{startTs: event.GetStartTs(), commitTs: commitTs}
8270

8371
p.rwMutex.Lock()
@@ -94,7 +82,7 @@ func (p *TableProgress) Add(event commonEvent.FlushableEvent) {
9482
// Remove deletes an event from the TableProgress.
9583
// Note: Consider implementing batch removal in the future if needed.
9684
func (p *TableProgress) Remove(event commonEvent.FlushableEvent) {
97-
ts := Ts{startTs: event.GetStartTs(), commitTs: getFinalCommitTs(event)}
85+
ts := Ts{startTs: event.GetStartTs(), commitTs: event.GetCommitTs()}
9886
p.rwMutex.Lock()
9987
defer p.rwMutex.Unlock()
10088

@@ -122,7 +110,7 @@ func (p *TableProgress) Pass(event commonEvent.FlushableEvent) {
122110
p.rwMutex.Lock()
123111
defer p.rwMutex.Unlock()
124112

125-
p.maxCommitTs = getFinalCommitTs(event)
113+
p.maxCommitTs = event.GetCommitTs()
126114
}
127115

128116
func (p *TableProgress) Len() int {

downstreamadapter/dispatcher/table_progress_test.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -66,36 +66,34 @@ func TestTableProgress(t *testing.T) {
6666
assert.True(t, isEmpty)
6767
}
6868

69-
// TestSyncPointEventWithMultipleCommitTs tests the behavior when SyncPointEvent contains multiple commitTs
70-
func TestSyncPointEventWithMultipleCommitTs(t *testing.T) {
69+
// TestSyncPointEventCommitTs tests the behavior for SyncPointEvent commitTs
70+
func TestSyncPointEventCommitTs(t *testing.T) {
7171
tp := NewTableProgress()
7272
assert.True(t, tp.Empty())
7373

7474
dispatcherID := common.NewDispatcherID()
7575

76-
// Create a SyncPointEvent with multiple commitTs
76+
// Create a SyncPointEvent
7777
syncPointEvent := &commonEvent.SyncPointEvent{
7878
DispatcherID: dispatcherID,
79-
CommitTsList: []uint64{10, 20, 30, 40},
79+
CommitTs: 40,
8080
}
8181

82-
// Should return the last (largest) commitTs from the list
83-
finalCommitTs := getFinalCommitTs(syncPointEvent)
82+
finalCommitTs := syncPointEvent.CommitTs
8483
assert.Equal(t, uint64(40), finalCommitTs, "getFinalCommitTs should return the largest commitTs")
8584

86-
// Test Add method with SyncPointEvent containing multiple commitTs
85+
// Test Add method with SyncPointEvent
8786
tp.Add(syncPointEvent)
8887
assert.False(t, tp.Empty())
8988

90-
// Verify that maxCommitTs is set to the largest commitTs
9189
assert.Equal(t, uint64(40), tp.maxCommitTs, "maxCommitTs should be set to the largest commitTs")
9290

9391
// Verify GetCheckpointTs behavior
9492
checkpointTs, isEmpty := tp.GetCheckpointTs()
9593
assert.Equal(t, uint64(39), checkpointTs, "checkpointTs should be largest commitTs - 1")
9694
assert.False(t, isEmpty)
9795

98-
// Test Remove method with SyncPointEvent containing multiple commitTs
96+
// Test Remove method
9997
tp.Remove(syncPointEvent)
10098
assert.True(t, tp.Empty(), "TableProgress should be empty after removing the event")
10199

@@ -104,10 +102,10 @@ func TestSyncPointEventWithMultipleCommitTs(t *testing.T) {
104102
assert.Equal(t, uint64(39), checkpointTs, "checkpointTs should remain as maxCommitTs - 1 after removal")
105103
assert.True(t, isEmpty)
106104

107-
// Create a SyncPointEvent with multiple commitTs
105+
// Create another SyncPointEvent
108106
syncPointEvent = &commonEvent.SyncPointEvent{
109107
DispatcherID: dispatcherID,
110-
CommitTsList: []uint64{50, 60},
108+
CommitTs: 60,
111109
}
112110

113111
tp.Add(syncPointEvent)
@@ -125,7 +123,7 @@ func TestSyncPointEventWithMultipleCommitTs(t *testing.T) {
125123

126124
syncPointEvent = &commonEvent.SyncPointEvent{
127125
DispatcherID: dispatcherID,
128-
CommitTsList: []uint64{80},
126+
CommitTs: 80,
129127
}
130128

131129
tp.Pass(syncPointEvent)

downstreamadapter/eventcollector/dispatcher_stat_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,8 +265,8 @@ func TestVerifyEventSequence(t *testing.T) {
265265
lastEventSeq: 3,
266266
event: dispatcher.DispatcherEvent{
267267
Event: &commonEvent.SyncPointEvent{
268-
CommitTsList: []uint64{100},
269-
Seq: 5,
268+
CommitTs: 100,
269+
Seq: 5,
270270
},
271271
},
272272
expectedResult: false,

0 commit comments

Comments
 (0)