Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
92ee0e1
dispatcher,event,cloudstorage: add DML two-stage ack
3AceShowHand Feb 24, 2026
01bd19a
dispatcher,event,cloudstorage: fix enqueue callback semantics
3AceShowHand Feb 25, 2026
c113666
2 small changes
3AceShowHand Feb 25, 2026
6a1be2d
fix test
3AceShowHand Feb 25, 2026
2f8b51c
downstreamadapter,event: simplify sink wake callback flow
3AceShowHand Feb 26, 2026
71f7771
Merge branch 'master' into storage-sink-two-stage-ack
3AceShowHand Feb 26, 2026
841178b
some small changes
3AceShowHand Feb 26, 2026
81bbfce
downstreamadapter,dispatcher: refine sink wake callbacks
3AceShowHand Feb 26, 2026
3ded4e0
fix code
3AceShowHand Feb 26, 2026
cdf2a98
fix
3AceShowHand Feb 26, 2026
460ac1e
fix
3AceShowHand Feb 26, 2026
05f0792
cloudstorage: drain affected dispatchers before ddl write
3AceShowHand Feb 27, 2026
0b76095
add pass block event to the sink interface
3AceShowHand Feb 27, 2026
2cef2fe
Add more comment
3AceShowHand Feb 28, 2026
8e51ebb
rename methods
3AceShowHand Mar 2, 2026
c8bb040
fix the code
3AceShowHand Mar 2, 2026
229673c
fix the code
3AceShowHand Mar 2, 2026
442a724
fix a lot of code
3AceShowHand Mar 2, 2026
9993792
fix
3AceShowHand Mar 2, 2026
a619c8f
adjust comments
3AceShowHand Mar 3, 2026
da1038b
update comments
3AceShowHand Mar 3, 2026
7954404
Merge branch 'master' into storage-sink-two-stage-ack
3AceShowHand Mar 3, 2026
d52a1a1
add more logs
3AceShowHand Mar 4, 2026
839d01b
fix code
3AceShowHand Mar 4, 2026
08edc5a
add more log to the consumer
3AceShowHand Mar 4, 2026
d488388
introduce drain phase
3AceShowHand Mar 4, 2026
e83390a
only affect the storage sink
3AceShowHand Mar 4, 2026
517a7a9
add a lot of code
3AceShowHand Mar 4, 2026
c99534e
fix barrier
3AceShowHand Mar 5, 2026
2ed0030
update test
3AceShowHand Mar 5, 2026
cbbaf16
fix
3AceShowHand Mar 5, 2026
7dee7c8
Merge remote-tracking branch 'refs/remotes/origin/storage-sink-two-st…
3AceShowHand Mar 5, 2026
e20b493
fix make fmt
3AceShowHand Mar 5, 2026
b104191
fix
3AceShowHand Mar 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,8 @@ func (d *BasicDispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeC
// When we handle events, we don't have any previous events still in sink.
//
// wakeCallback is used to wake the dynamic stream to handle the next batch events.
// It will be called when all the events are flushed to downstream successfully.
// It is triggered after DML events are enqueued to sink pipeline, while checkpoint
// still advances only after PostFlush callbacks are completed.
func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) bool {
if d.GetRemovingStatus() {
log.Warn("dispatcher is removing", zap.Any("id", d.id))
Expand Down Expand Up @@ -622,13 +623,8 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC

block = true
dml.ReplicatingTs = d.creationPDTs
dml.AddPostFlushFunc(func() {
// Considering dml event in sink may be written to downstream not in order,
// thus, we use tableProgress.Empty() to ensure these events are flushed to downstream completely
// and wake dynamic stream to handle the next events.
if d.tableProgress.Empty() {
dmlWakeOnce.Do(wakeCallback)
}
dml.AddPostEnqueueFunc(func() {
dmlWakeOnce.Do(wakeCallback)
})
dmlEvents = append(dmlEvents, dml)
case commonEvent.TypeDDLEvent:
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/sink/cloudstorage/defragmenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (d *defragmenter) dispatchFragToDMLWorker(frag eventFragment) {
d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table))
workerID := d.hasher.Sum32() % uint32(len(d.outputChs))
d.outputChs[workerID].In() <- frag
frag.event.PostEnqueue()
d.lastDispatchedSeq = frag.seqNumber
}

Expand Down
7 changes: 2 additions & 5 deletions downstreamadapter/sink/cloudstorage/dml_writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,8 @@ func (d *dmlWriters) AddDMLEvent(event *commonEvent.DMLEvent) {
DispatcherID: event.GetDispatcherID(),
}
seq := atomic.AddUint64(&d.lastSeqNum, 1)
_ = d.statistics.RecordBatchExecution(func() (int, int64, error) {
// emit a TxnCallbackableEvent encoupled with a sequence number starting from one.
d.msgCh.Push(newEventFragment(seq, tbl, event))
return int(event.Len()), event.GetSize(), nil
})
// emit a TxnCallbackableEvent coupled with a sequence number starting from one.
d.msgCh.Push(newEventFragment(seq, tbl, event))
}

func (d *dmlWriters) close() {
Expand Down
41 changes: 41 additions & 0 deletions downstreamadapter/sink/cloudstorage/dml_writers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,47 @@ func getTableFiles(t *testing.T, tableDir string) []string {
return fileNames
}

func TestAddDMLEventDoesNotCallPostEnqueueBeforePipelineRun(t *testing.T) {
	uri := fmt.Sprintf("file:///%s?protocol=csv", t.TempDir())
	sinkURI, err := url.Parse(uri)
	require.NoError(t, err)

	replicaConfig := config.GetDefaultReplicaConfig()
	require.NoError(t, replicaConfig.ValidateAndAdjust(sinkURI))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	appcontext.SetService(appcontext.DefaultPDClock, pdutil.NewClock4Test())

	storageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
	require.NoError(t, err)

	tableInfo := &common.TableInfo{
		TableName: common.TableName{
			Schema:  "test",
			Table:   "t_enqueue",
			TableID: 100,
		},
	}
	dml := commonEvent.NewDMLEvent(common.NewDispatcherID(), tableInfo.TableName.TableID, 1, 1, tableInfo)
	dml.TableInfoVersion = 1
	dml.Length = 1
	dml.ApproximateSize = 1

	var enqueueHits int64
	dml.AddPostEnqueueFunc(func() {
		atomic.AddInt64(&enqueueHits, 1)
	})

	// sink.Run was never started, so AddDMLEvent may only accept/buffer the
	// event; the enqueue stage of the write pipeline must not fire yet.
	storageSink.AddDMLEvent(dml)
	require.Equal(t, int64(0), atomic.LoadInt64(&enqueueHits))
}

func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
parentDir := t.TempDir()

Expand Down
5 changes: 5 additions & 0 deletions pkg/common/event/active_active.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package event

import (
"sync/atomic"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/errors"
Expand Down Expand Up @@ -443,7 +445,10 @@ func newFilteredDMLEvent(
newEvent.Seq = source.Seq
newEvent.Epoch = source.Epoch
newEvent.ReplicatingTs = source.ReplicatingTs
newEvent.PostTxnEnqueued = source.PostTxnEnqueued
newEvent.PostTxnFlushed = source.PostTxnFlushed
newEvent.postEnqueueCalled = atomic.LoadUint32(&source.postEnqueueCalled)
source.PostTxnEnqueued = nil
source.PostTxnFlushed = nil

newEvent.SetRows(rows)
Expand Down
31 changes: 31 additions & 0 deletions pkg/common/event/active_active_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package event

import (
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -316,6 +317,36 @@ func TestFilterDMLEventSoftDeleteTableMissingColumnReportsError(t *testing.T) {
require.Contains(t, handledErr.Error(), SoftDeleteTimeColumn)
}

func TestFilterDMLEventKeepsPostEnqueueCallbacksOnFilteredEvent(t *testing.T) {
	tableInfo := newTestTableInfo(t, true, true)
	deleteTs := newTimestampValue(time.Date(2025, time.March, 10, 0, 0, 0, 0, time.UTC))
	source := newDMLEventForTest(t, tableInfo,
		[]commonpkg.RowType{commonpkg.RowTypeUpdate},
		[][]interface{}{
			{int64(1), nil},
			{int64(1), deleteTs},
		})

	var enqueueHits, flushHits int64
	source.AddPostEnqueueFunc(func() { atomic.AddInt64(&enqueueHits, 1) })
	source.AddPostFlushFunc(func() { atomic.AddInt64(&flushHits, 1) })

	filtered, skip := FilterDMLEvent(source, false, nil)
	require.False(t, skip)
	require.NotNil(t, filtered)
	require.NotEqual(t, source, filtered)

	// The callbacks must have migrated to the filtered event: triggering both
	// stages on it runs each callback exactly once.
	filtered.PostEnqueue()
	filtered.PostFlush()
	require.Equal(t, int64(1), atomic.LoadInt64(&enqueueHits))
	require.Equal(t, int64(1), atomic.LoadInt64(&flushHits))
}

func newTestTableInfo(t *testing.T, activeActive, softDelete bool) *commonpkg.TableInfo {
idCol := newTestColumn(1, "id", mysql.TypeLong, mysql.PriKeyFlag)
cols := []*model.ColumnInfo{idCol}
Expand Down
51 changes: 49 additions & 2 deletions pkg/common/event/dml_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"encoding/binary"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -392,9 +393,18 @@ type DMLEvent struct {

// The following fields are set and used by dispatcher.
ReplicatingTs uint64 `json:"replicating_ts"`
// PostTxnFlushed is the functions to be executed after the transaction is flushed.
// It is set and used by dispatcher.
// PostTxnEnqueued contains callbacks executed when the txn is accepted by
// sink's internal pipeline (enqueue stage).
//
// Note: enqueue means "handed over to sink workers", not "durably written
// to downstream".
PostTxnEnqueued []func() `json:"-"`
// PostTxnFlushed contains callbacks executed when the txn is fully flushed to
// downstream (flush stage), which is stronger than enqueue and is used by
// checkpoint related logic.
PostTxnFlushed []func() `json:"-"`
// postEnqueueCalled ensures PostTxnEnqueued callbacks are triggered at most once.
postEnqueueCalled uint32 `json:"-"`

// eventSize is the size of the event in bytes. It is set when it's unmarshaled.
eventSize int64 `json:"-"`
Expand Down Expand Up @@ -630,12 +640,31 @@ func (t *DMLEvent) GetStartTs() common.Ts {
return t.StartTs
}

// PostFlush signals that the transaction has been fully written to the
// downstream system.
//
// The enqueue stage is driven first so enqueue callbacks never run after
// flush callbacks; this also covers sinks that call PostFlush without ever
// invoking PostEnqueue explicitly.
func (t *DMLEvent) PostFlush() {
	// Preserve ordering: enqueue callbacks fire before any flush callback.
	t.PostEnqueue()
	for idx := range t.PostTxnFlushed {
		t.PostTxnFlushed[idx]()
	}
}

// PostEnqueue signals that the transaction has been handed over to the sink's
// internal pipeline.
//
// Acceptance by the pipeline does not imply the data reached downstream. The
// CAS guard makes this method idempotent: enqueue callbacks are executed at
// most once no matter how many times it is called.
func (t *DMLEvent) PostEnqueue() {
	// Only the first caller wins the CAS and runs the callbacks.
	if atomic.CompareAndSwapUint32(&t.postEnqueueCalled, 0, 1) {
		for idx := range t.PostTxnEnqueued {
			t.PostTxnEnqueued[idx]()
		}
	}
}

// GetSeq returns the sequence number assigned to this event.
func (t *DMLEvent) GetSeq() uint64 {
	return t.Seq
}
Expand All @@ -644,18 +673,36 @@ func (t *DMLEvent) GetEpoch() uint64 {
return t.Epoch
}

// PushFrontFlushFunc inserts a flush callback at the head of the list so it
// executes ahead of every previously registered flush callback.
func (t *DMLEvent) PushFrontFlushFunc(f func()) {
	// Build a fresh slice (as the original append-literal form did) so callers
	// holding the old slice are unaffected.
	merged := make([]func(), 0, len(t.PostTxnFlushed)+1)
	merged = append(merged, f)
	merged = append(merged, t.PostTxnFlushed...)
	t.PostTxnFlushed = merged
}

// ClearPostFlushFunc removes all registered flush callbacks.
// The slice is truncated in place, keeping its backing storage for reuse.
func (t *DMLEvent) ClearPostFlushFunc() {
	t.PostTxnFlushed = t.PostTxnFlushed[:0]
}

// AddPostFlushFunc registers a callback that runs at flush stage, i.e. after
// the transaction is fully flushed to downstream (see PostFlush).
//
// Use this when the callback depends on downstream persistence semantics,
// e.g. checkpoint advancement.
func (t *DMLEvent) AddPostFlushFunc(f func()) {
	t.PostTxnFlushed = append(t.PostTxnFlushed, f)
}

// ClearPostEnqueueFunc removes all registered enqueue callbacks.
// The slice is truncated in place, keeping its backing storage for reuse.
func (t *DMLEvent) ClearPostEnqueueFunc() {
	t.PostTxnEnqueued = t.PostTxnEnqueued[:0]
}

// AddPostEnqueueFunc registers a callback that runs at enqueue stage, i.e.
// when the txn is accepted by the sink's internal pipeline (see PostEnqueue).
//
// Use this when only sink acceptance is required. For sinks with no explicit
// enqueue signal, this callback is triggered via PostFlush, which drives the
// enqueue stage first.
func (t *DMLEvent) AddPostEnqueueFunc(f func()) {
	t.PostTxnEnqueued = append(t.PostTxnEnqueued, f)
}

// Rewind reset the offset to 0, So that the next GetNextRow will return the first row
func (t *DMLEvent) Rewind() {
t.offset = 0
Expand Down
67 changes: 67 additions & 0 deletions pkg/common/event/dml_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package event

import (
"encoding/binary"
"sync"
"sync/atomic"
"testing"

"github.com/pingcap/ticdc/pkg/common"
Expand Down Expand Up @@ -342,3 +344,68 @@ func TestBatchDMLEventHeaderValidation(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "incomplete data")
}

func TestDMLEventPostEnqueueFuncs(t *testing.T) {
	t.Parallel()

	var fired int64
	bump := func() {
		atomic.AddInt64(&fired, 1)
	}

	ev := &DMLEvent{}
	ev.AddPostEnqueueFunc(bump)
	ev.AddPostEnqueueFunc(bump)

	// The second call must be a no-op thanks to the idempotency guard, so both
	// callbacks run exactly once in total.
	ev.PostEnqueue()
	ev.PostEnqueue()

	require.Equal(t, int64(2), atomic.LoadInt64(&fired))
}

func TestDMLEventPostFlushTriggersPostEnqueueOnce(t *testing.T) {
	t.Parallel()

	var enqueueHits, flushHits int64
	ev := &DMLEvent{}
	ev.AddPostEnqueueFunc(func() { atomic.AddInt64(&enqueueHits, 1) })
	ev.AddPostFlushFunc(func() { atomic.AddInt64(&flushHits, 1) })

	// Flush twice: the enqueue callback fires only on the first flush, while
	// the flush callback fires on every flush.
	for i := 0; i < 2; i++ {
		ev.PostFlush()
	}

	require.Equal(t, int64(1), atomic.LoadInt64(&enqueueHits))
	require.Equal(t, int64(2), atomic.LoadInt64(&flushHits))
}

func TestDMLEventPostEnqueueConcurrentWithPostFlush(t *testing.T) {
	t.Parallel()

	var hits int64
	ev := &DMLEvent{}
	ev.AddPostEnqueueFunc(func() {
		atomic.AddInt64(&hits, 1)
	})

	const iterations = 256
	var wg sync.WaitGroup
	wg.Add(2 * iterations)
	for i := 0; i < iterations; i++ {
		go func() {
			defer wg.Done()
			ev.PostEnqueue()
		}()
		go func() {
			defer wg.Done()
			ev.PostFlush()
		}()
	}
	wg.Wait()

	// No matter how PostEnqueue and PostFlush interleave across goroutines,
	// the enqueue callback must run exactly once.
	require.Equal(t, int64(1), atomic.LoadInt64(&hits))
}
2 changes: 1 addition & 1 deletion pkg/sink/cloudstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const (
// `0 0 2 * * ?` means 2:00:00 AM every day
defaultFileCleanupCronSpec = "0 0 2 * * *"

defaultEnableTableAcrossNodes = true
defaultEnableTableAcrossNodes = false
)

type urlConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/cloudstorage/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestConfigApply(t *testing.T) {
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.NoError(t, err)
cfg := NewConfig()
err = cfg.Apply(context.TODO(), sinkURI, replicaConfig.Sink, true)
err = cfg.Apply(context.TODO(), sinkURI, replicaConfig.Sink, false)
require.Nil(t, err)
require.Equal(t, expected, cfg)
}
Expand Down
Loading