Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
92ee0e1
dispatcher,event,cloudstorage: add DML two-stage ack
3AceShowHand Feb 24, 2026
01bd19a
dispatcher,event,cloudstorage: fix enqueue callback semantics
3AceShowHand Feb 25, 2026
c113666
2 small changes
3AceShowHand Feb 25, 2026
6a1be2d
fix test
3AceShowHand Feb 25, 2026
2f8b51c
downstreamadapter,event: simplify sink wake callback flow
3AceShowHand Feb 26, 2026
71f7771
Merge branch 'master' into storage-sink-two-stage-ack
3AceShowHand Feb 26, 2026
841178b
some small changes
3AceShowHand Feb 26, 2026
81bbfce
downstreamadapter,dispatcher: refine sink wake callbacks
3AceShowHand Feb 26, 2026
3ded4e0
fix code
3AceShowHand Feb 26, 2026
cdf2a98
fix
3AceShowHand Feb 26, 2026
460ac1e
fix
3AceShowHand Feb 26, 2026
05f0792
cloudstorage: drain affected dispatchers before ddl write
3AceShowHand Feb 27, 2026
0b76095
add pass block event to the sink interface
3AceShowHand Feb 27, 2026
2cef2fe
Add more comment
3AceShowHand Feb 28, 2026
8e51ebb
rename methods
3AceShowHand Mar 2, 2026
c8bb040
fix the code
3AceShowHand Mar 2, 2026
229673c
fix the code
3AceShowHand Mar 2, 2026
442a724
fix a lot of code
3AceShowHand Mar 2, 2026
9993792
fix
3AceShowHand Mar 2, 2026
a619c8f
adjust comments
3AceShowHand Mar 3, 2026
da1038b
update comments
3AceShowHand Mar 3, 2026
7954404
Merge branch 'master' into storage-sink-two-stage-ack
3AceShowHand Mar 3, 2026
d52a1a1
add more logs
3AceShowHand Mar 4, 2026
839d01b
fix code
3AceShowHand Mar 4, 2026
08edc5a
add more log to the consumer
3AceShowHand Mar 4, 2026
d488388
introduce drain phase
3AceShowHand Mar 4, 2026
e83390a
only affect the storage sink
3AceShowHand Mar 4, 2026
517a7a9
add a lot of code
3AceShowHand Mar 4, 2026
c99534e
fix barrier
3AceShowHand Mar 5, 2026
2ed0030
update test
3AceShowHand Mar 5, 2026
cbbaf16
fix
3AceShowHand Mar 5, 2026
7dee7c8
Merge remote-tracking branch 'refs/remotes/origin/storage-sink-two-st…
3AceShowHand Mar 5, 2026
e20b493
fix make fmt
3AceShowHand Mar 5, 2026
b104191
fix
3AceShowHand Mar 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,8 @@ func (d *BasicDispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeC
// When we handle events, we don't have any previous events still in sink.
//
// wakeCallback is used to wake the dynamic stream to handle the next batch events.
// It will be called when all the events are flushed to downstream successfully.
// It is triggered after DML events are enqueued to sink pipeline, while checkpoint
// still advances only after PostFlush callbacks are completed.
func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) bool {
if d.GetRemovingStatus() {
log.Warn("dispatcher is removing", zap.Any("id", d.id))
Expand Down Expand Up @@ -622,13 +623,8 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC

block = true
dml.ReplicatingTs = d.creationPDTs
dml.AddPostFlushFunc(func() {
// Considering dml event in sink may be written to downstream not in order,
// thus, we use tableProgress.Empty() to ensure these events are flushed to downstream completely
// and wake dynamic stream to handle the next events.
if d.tableProgress.Empty() {
dmlWakeOnce.Do(wakeCallback)
}
dml.AddPostEnqueueFunc(func() {
dmlWakeOnce.Do(wakeCallback)
})
dmlEvents = append(dmlEvents, dml)
case commonEvent.TypeDDLEvent:
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/sink/cloudstorage/dml_writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (d *dmlWriters) AddDMLEvent(event *commonEvent.DMLEvent) {
_ = d.statistics.RecordBatchExecution(func() (int, int64, error) {
// emit a TxnCallbackableEvent encoupled with a sequence number starting from one.
d.msgCh.Push(newEventFragment(seq, tbl, event))
event.PostEnqueue()
return int(event.Len()), event.GetSize(), nil
})
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/common/event/dml_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,14 @@ type DMLEvent struct {

// The following fields are set and used by dispatcher.
ReplicatingTs uint64 `json:"replicating_ts"`
// PostTxnEnqueued is the functions to be executed after the transaction is
// enqueued into sink internal pipeline.
PostTxnEnqueued []func() `json:"-"`
// PostTxnFlushed is the functions to be executed after the transaction is flushed.
// It is set and used by dispatcher.
PostTxnFlushed []func() `json:"-"`
// postEnqueueCalled ensures PostTxnEnqueued callbacks are triggered at most once.
postEnqueueCalled bool `json:"-"`

// eventSize is the size of the event in bytes. It is set when it's unmarshaled.
eventSize int64 `json:"-"`
Expand Down Expand Up @@ -631,11 +636,22 @@ func (t *DMLEvent) GetStartTs() common.Ts {
}

func (t *DMLEvent) PostFlush() {
t.PostEnqueue()
for _, f := range t.PostTxnFlushed {
f()
}
}

func (t *DMLEvent) PostEnqueue() {
if t.postEnqueueCalled {
return
}
t.postEnqueueCalled = true
for _, f := range t.PostTxnEnqueued {
f()
}
}

func (t *DMLEvent) GetSeq() uint64 {
return t.Seq
}
Expand All @@ -656,6 +672,14 @@ func (t *DMLEvent) AddPostFlushFunc(f func()) {
t.PostTxnFlushed = append(t.PostTxnFlushed, f)
}

func (t *DMLEvent) ClearPostEnqueueFunc() {
t.PostTxnEnqueued = t.PostTxnEnqueued[:0]
}

func (t *DMLEvent) AddPostEnqueueFunc(f func()) {
t.PostTxnEnqueued = append(t.PostTxnEnqueued, f)
}

// Rewind reset the offset to 0, So that the next GetNextRow will return the first row
func (t *DMLEvent) Rewind() {
t.offset = 0
Expand Down
39 changes: 39 additions & 0 deletions pkg/common/event/dml_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package event

import (
"encoding/binary"
"sync/atomic"
"testing"

"github.com/pingcap/ticdc/pkg/common"
Expand Down Expand Up @@ -342,3 +343,41 @@ func TestBatchDMLEventHeaderValidation(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "incomplete data")
}

func TestDMLEventPostEnqueueFuncs(t *testing.T) {
t.Parallel()

event := &DMLEvent{}
var called int64
event.AddPostEnqueueFunc(func() {
atomic.AddInt64(&called, 1)
})
event.AddPostEnqueueFunc(func() {
atomic.AddInt64(&called, 1)
})

event.PostEnqueue()
event.PostEnqueue()

require.Equal(t, int64(2), atomic.LoadInt64(&called))
}

func TestDMLEventPostFlushTriggersPostEnqueueOnce(t *testing.T) {
t.Parallel()

event := &DMLEvent{}
var enqueueCalled int64
var flushCalled int64
event.AddPostEnqueueFunc(func() {
atomic.AddInt64(&enqueueCalled, 1)
})
event.AddPostFlushFunc(func() {
atomic.AddInt64(&flushCalled, 1)
})

event.PostFlush()
event.PostFlush()

require.Equal(t, int64(1), atomic.LoadInt64(&enqueueCalled))
require.Equal(t, int64(2), atomic.LoadInt64(&flushCalled))
}
Loading