Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
92ee0e1
dispatcher,event,cloudstorage: add DML two-stage ack
3AceShowHand Feb 24, 2026
01bd19a
dispatcher,event,cloudstorage: fix enqueue callback semantics
3AceShowHand Feb 25, 2026
c113666
2 small changes
3AceShowHand Feb 25, 2026
6a1be2d
fix test
3AceShowHand Feb 25, 2026
2f8b51c
downstreamadapter,event: simplify sink wake callback flow
3AceShowHand Feb 26, 2026
71f7771
Merge branch 'master' into storage-sink-two-stage-ack
3AceShowHand Feb 26, 2026
841178b
some small changes
3AceShowHand Feb 26, 2026
81bbfce
downstreamadapter,dispatcher: refine sink wake callbacks
3AceShowHand Feb 26, 2026
3ded4e0
fix code
3AceShowHand Feb 26, 2026
cdf2a98
fix
3AceShowHand Feb 26, 2026
460ac1e
fix
3AceShowHand Feb 26, 2026
05f0792
cloudstorage: drain affected dispatchers before ddl write
3AceShowHand Feb 27, 2026
0b76095
add pass block event to the sink interface
3AceShowHand Feb 27, 2026
2cef2fe
Add more comment
3AceShowHand Feb 28, 2026
8e51ebb
rename methods
3AceShowHand Mar 2, 2026
c8bb040
fix the code
3AceShowHand Mar 2, 2026
229673c
fix the code
3AceShowHand Mar 2, 2026
442a724
fix a lot of code
3AceShowHand Mar 2, 2026
9993792
fix
3AceShowHand Mar 2, 2026
a619c8f
adjust comments
3AceShowHand Mar 3, 2026
da1038b
update comments
3AceShowHand Mar 3, 2026
7954404
Merge branch 'master' into storage-sink-two-stage-ack
3AceShowHand Mar 3, 2026
d52a1a1
add more logs
3AceShowHand Mar 4, 2026
839d01b
fix code
3AceShowHand Mar 4, 2026
08edc5a
add more log to the consumer
3AceShowHand Mar 4, 2026
d488388
introduce drain phase
3AceShowHand Mar 4, 2026
e83390a
only affect the storage sink
3AceShowHand Mar 4, 2026
517a7a9
add a lot of code
3AceShowHand Mar 4, 2026
c99534e
fix barrier
3AceShowHand Mar 5, 2026
2ed0030
update test
3AceShowHand Mar 5, 2026
cbbaf16
fix
3AceShowHand Mar 5, 2026
7dee7c8
Merge remote-tracking branch 'refs/remotes/origin/storage-sink-two-st…
3AceShowHand Mar 5, 2026
e20b493
fix make fmt
3AceShowHand Mar 5, 2026
b104191
fix
3AceShowHand Mar 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,10 @@ func (d *BasicDispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeC
// When we handle events, we don't have any previous events still in sink.
//
// wakeCallback is used to wake the dynamic stream to handle the next batch events.
// It will be called when all the events are flushed to downstream successfully.
// It is registered on enqueue stage, and sink implementations are responsible for
// deciding when enqueue is considered complete (some sinks trigger enqueue via
// PostFlush after downstream flush completes).
// Checkpoint still advances only after PostFlush callbacks are completed.
func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) bool {
if d.GetRemovingStatus() {
log.Warn("dispatcher is removing", zap.Any("id", d.id))
Expand Down Expand Up @@ -622,13 +625,8 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC

block = true
dml.ReplicatingTs = d.creationPDTs
dml.AddPostFlushFunc(func() {
// Considering dml event in sink may be written to downstream not in order,
// thus, we use tableProgress.Empty() to ensure these events are flushed to downstream completely
// and wake dynamic stream to handle the next events.
if d.tableProgress.Empty() {
dmlWakeOnce.Do(wakeCallback)
}
dml.AddPostEnqueueFunc(func() {
dmlWakeOnce.Do(wakeCallback)
})
dmlEvents = append(dmlEvents, dml)
case commonEvent.TypeDDLEvent:
Expand Down
70 changes: 70 additions & 0 deletions downstreamadapter/dispatcher/event_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package dispatcher

import (
"fmt"
"math"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -776,6 +777,75 @@ func TestBatchDMLEventsPartialFlush(t *testing.T) {
require.Equal(t, 0, len(mockSink.GetDMLs()))
}

// TestDMLWakeCallbackBySinkType verifies when the dispatcher wake callback
// fires for a blocking DML batch, per sink type: the MySQL sink wakes the
// dynamic stream only after the events are flushed downstream, while the
// cloud storage sink wakes it as soon as the events are enqueued.
func TestDMLWakeCallbackBySinkType(t *testing.T) {
	helper := commonEvent.NewEventTestHelper(t)
	defer helper.Close()

	helper.Tk().MustExec("use test")
	ddlJob := helper.DDL2Job("create table t(id int primary key, v int)")
	require.NotNil(t, ddlJob)

	// newDML builds a two-row insert event carrying the given commit ts.
	newDML := func(commitTs uint64) *commonEvent.DMLEvent {
		ev := helper.DML2Event(
			"test",
			"t",
			fmt.Sprintf("insert into t values(%d, %d)", commitTs, commitTs),
			fmt.Sprintf("insert into t values(%d, %d)", commitTs+1000, commitTs+1000),
		)
		require.NotNil(t, ev)
		ev.CommitTs = commitTs
		return ev
	}

	cases := []struct {
		name          string
		sinkType      common.SinkType
		commitTs      uint64
		wakeOnEnqueue bool
	}{
		{name: "mysql wake after flush", sinkType: common.MysqlSinkType, commitTs: 20, wakeOnEnqueue: false},
		{name: "cloudstorage wake on enqueue", sinkType: common.CloudStorageSinkType, commitTs: 21, wakeOnEnqueue: true},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			mockSink := sink.NewMockSink(tc.sinkType)
			tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID())
			require.NoError(t, err)
			d := newDispatcherForTest(mockSink, tableSpan)

			var woken atomic.Bool
			serverID := node.NewID()
			events := []DispatcherEvent{NewDispatcherEvent(&serverID, newDML(tc.commitTs))}
			blocked := d.HandleEvents(events, func() {
				woken.Store(true)
			})

			// A DML batch always blocks the stream until the wake callback runs.
			require.True(t, blocked)
			// Enqueue-ack sinks (cloud storage) have woken already; flush-ack
			// sinks (MySQL) must not have woken yet.
			require.Equal(t, tc.wakeOnEnqueue, woken.Load())
			require.Len(t, mockSink.GetDMLs(), 1)

			if !tc.wakeOnEnqueue {
				// Flushing drains the mock sink and must trigger the wake callback.
				mockSink.FlushDMLs()
				require.True(t, woken.Load())
				require.Len(t, mockSink.GetDMLs(), 0)
			}
		})
	}
}

// TestDispatcherSplittableCheck tests that a split table dispatcher with enableSplittableCheck=true
// correctly reports an error when receiving a DDL that breaks splittable
func TestDispatcherSplittableCheck(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/sink/cloudstorage/defragmenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (d *defragmenter) dispatchFragToDMLWorker(frag eventFragment) {
d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table))
workerID := d.hasher.Sum32() % uint32(len(d.outputChs))
d.outputChs[workerID].In() <- frag
frag.event.PostEnqueue()
d.lastDispatchedSeq = frag.seqNumber
}

Expand Down
7 changes: 2 additions & 5 deletions downstreamadapter/sink/cloudstorage/dml_writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,8 @@ func (d *dmlWriters) AddDMLEvent(event *commonEvent.DMLEvent) {
DispatcherID: event.GetDispatcherID(),
}
seq := atomic.AddUint64(&d.lastSeqNum, 1)
_ = d.statistics.RecordBatchExecution(func() (int, int64, error) {
// emit a TxnCallbackableEvent encoupled with a sequence number starting from one.
d.msgCh.Push(newEventFragment(seq, tbl, event))
return int(event.Len()), event.GetSize(), nil
})
// emit a TxnCallbackableEvent coupled with a sequence number starting from one.
d.msgCh.Push(newEventFragment(seq, tbl, event))
}

func (d *dmlWriters) close() {
Expand Down
41 changes: 41 additions & 0 deletions downstreamadapter/sink/cloudstorage/dml_writers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,47 @@ func getTableFiles(t *testing.T, tableDir string) []string {
return fileNames
}

// TestAddDMLEventDoesNotCallPostEnqueueBeforePipelineRun checks that handing a
// DML event to the cloud storage sink via AddDMLEvent alone does not fire its
// PostEnqueue callbacks: an event only counts as enqueued once the write
// pipeline (started by sink.Run, deliberately not started here) dispatches it.
func TestAddDMLEventDoesNotCallPostEnqueueBeforePipelineRun(t *testing.T) {
	storageURI, err := url.Parse(fmt.Sprintf("file:///%s?protocol=csv", t.TempDir()))
	require.NoError(t, err)

	cfg := config.GetDefaultReplicaConfig()
	require.NoError(t, cfg.ValidateAndAdjust(storageURI))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	appcontext.SetService(appcontext.DefaultPDClock, pdutil.NewClock4Test())

	cloudStorageSink, err := newSinkForTest(ctx, cfg, storageURI, nil)
	require.NoError(t, err)

	info := &common.TableInfo{
		TableName: common.TableName{
			Schema:  "test",
			Table:   "t_enqueue",
			TableID: 100,
		},
	}
	dml := commonEvent.NewDMLEvent(common.NewDispatcherID(), info.TableName.TableID, 1, 1, info)
	dml.TableInfoVersion = 1
	dml.Length = 1
	dml.ApproximateSize = 1

	var enqueueCalls int64
	dml.AddPostEnqueueFunc(func() {
		atomic.AddInt64(&enqueueCalls, 1)
	})

	// Run was never started, so the pipeline cannot have dispatched the event;
	// the PostEnqueue callback therefore must not have fired.
	cloudStorageSink.AddDMLEvent(dml)
	require.Equal(t, int64(0), atomic.LoadInt64(&enqueueCalls))
}

func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
parentDir := t.TempDir()

Expand Down
30 changes: 30 additions & 0 deletions downstreamadapter/sink/helper/row_callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package helper

import (
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"go.uber.org/atomic"
)

// NewTxnPostFlushRowCallback builds a per-row completion callback for event.
// Each invocation atomically counts one finished row; the invocation that
// brings the count up to totalCount fires the txn-level PostFlush exactly once.
// NOTE(review): if totalCount is 0 the returned callback can never trigger
// PostFlush — callers are expected to pass the event's actual row count.
func NewTxnPostFlushRowCallback(event *commonEvent.DMLEvent, totalCount uint64) func() {
	var finishedRows atomic.Uint64
	return func() {
		if finishedRows.Add(1) == totalCount {
			event.PostFlush()
		}
	}
}
42 changes: 42 additions & 0 deletions downstreamadapter/sink/helper/row_callback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package helper

import (
"testing"

commonType "github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/stretchr/testify/require"
)

// TestTxnPostFlushRowCallback verifies that the row-level callback fires the
// txn-level PostFlush exactly once, and only after it has been invoked the
// declared number of times (3 here); extra invocations must not re-fire it.
func TestTxnPostFlushRowCallback(t *testing.T) {
	event := commonEvent.NewDMLEvent(commonType.NewDispatcherID(), 1, 1, 1, nil)

	flushCount := 0
	event.AddPostFlushFunc(func() {
		flushCount++
	})

	onRowDone := NewTxnPostFlushRowCallback(event, 3)

	// The first two rows complete without triggering the txn-level flush.
	for i := 0; i < 2; i++ {
		onRowDone()
	}
	require.Equal(t, 0, flushCount)

	// The third (final) row triggers PostFlush exactly once.
	onRowDone()
	require.Equal(t, 1, flushCount)

	// A spurious extra invocation must not fire PostFlush again.
	onRowDone()
	require.Equal(t, 1, flushCount)
}
14 changes: 1 addition & 13 deletions downstreamadapter/sink/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,20 +219,8 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error {

partitionGenerator := s.comp.eventRouter.GetPartitionGenerator(schema, table)
selector := s.comp.columnSelector.Get(schema, table)
toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() {
var calledCount atomic.Uint64
// The callback of the last row will trigger the callback of the txn.
return func() {
if calledCount.Inc() == totalCount {
for _, callback := range postTxnFlushed {
callback()
}
}
}
}

rowsCount := uint64(event.Len())
rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)
rowCallback := helper.NewTxnPostFlushRowCallback(event, rowsCount)

for {
row, ok := event.GetNextRow()
Expand Down
4 changes: 4 additions & 0 deletions downstreamadapter/sink/mock_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (s *mockSink) AddDMLEvent(event *commonEvent.DMLEvent) {
s.mu.Lock()
defer s.mu.Unlock()
s.dmls = append(s.dmls, event)
// CloudStorage sink wakes dispatcher on enqueue stage.
if s.sinkType == common.CloudStorageSinkType {
event.PostEnqueue()
}
}

func (s *mockSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
Expand Down
14 changes: 1 addition & 13 deletions downstreamadapter/sink/pulsar/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,20 +329,8 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error {

partitionGenerator := s.comp.eventRouter.GetPartitionGenerator(schema, table)
selector := s.comp.columnSelector.Get(schema, table)
toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() {
var calledCount atomic.Uint64
// The callback of the last row will trigger the callback of the txn.
return func() {
if calledCount.Inc() == totalCount {
for _, callback := range postTxnFlushed {
callback()
}
}
}
}

rowsCount := uint64(event.Len())
rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)
rowCallback := helper.NewTxnPostFlushRowCallback(event, rowsCount)

for {
row, ok := event.GetNextRow()
Expand Down
14 changes: 2 additions & 12 deletions downstreamadapter/sink/redo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/downstreamadapter/sink/helper"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
Expand Down Expand Up @@ -128,19 +129,8 @@ func (s *Sink) WriteBlockEvent(event commonEvent.BlockEvent) error {

func (s *Sink) AddDMLEvent(event *commonEvent.DMLEvent) {
_ = s.statistics.RecordBatchExecution(func() (int, int64, error) {
toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() {
var calledCount atomic.Uint64
// The callback of the last row will trigger the callback of the txn.
return func() {
if calledCount.Inc() == totalCount {
for _, callback := range postTxnFlushed {
callback()
}
}
}
}
rowsCount := uint64(event.Len())
rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)
rowCallback := helper.NewTxnPostFlushRowCallback(event, rowsCount)

for {
row, ok := event.GetNextRow()
Expand Down
5 changes: 5 additions & 0 deletions pkg/common/event/active_active.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package event

import (
"sync/atomic"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/errors"
Expand Down Expand Up @@ -443,7 +445,10 @@ func newFilteredDMLEvent(
newEvent.Seq = source.Seq
newEvent.Epoch = source.Epoch
newEvent.ReplicatingTs = source.ReplicatingTs
newEvent.PostTxnEnqueued = source.PostTxnEnqueued
newEvent.PostTxnFlushed = source.PostTxnFlushed
newEvent.postEnqueueCalled = atomic.LoadUint32(&source.postEnqueueCalled)
source.PostTxnEnqueued = nil
source.PostTxnFlushed = nil

newEvent.SetRows(rows)
Expand Down
Loading