Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
ee8cb6b
remove batch configuration from the stream level, and add size policy
3AceShowHand Jan 13, 2026
480045b
add batch capacity interface to the sink
3AceShowHand Jan 13, 2026
d6a3906
dispatcher expose sink method
3AceShowHand Jan 13, 2026
ea4c359
fix some dynamic stream
3AceShowHand Jan 13, 2026
64e5e27
add events
3AceShowHand Jan 13, 2026
64c783f
add more events
3AceShowHand Jan 13, 2026
a60ca8b
fix more code
3AceShowHand Jan 14, 2026
c60f623
adjust the code
3AceShowHand Jan 14, 2026
3611ef8
support config
3AceShowHand Jan 14, 2026
50a85fe
Adjust the batcher
3AceShowHand Jan 15, 2026
a794eae
adjust some batch configs
3AceShowHand Jan 15, 2026
3b6d851
add batch count and batch bytes configuration
3AceShowHand Jan 15, 2026
387ad33
log puller should work just like before, no behavior change
3AceShowHand Jan 15, 2026
f26babf
let the dispatcher manager determine batch counts and bytes
3AceShowHand Jan 15, 2026
d9b544c
remove get sink from the dispatcher service interface
3AceShowHand Jan 15, 2026
f0f949a
adjust code
3AceShowHand Jan 15, 2026
d66a3e4
adjust code
3AceShowHand Jan 16, 2026
18e7fa6
Add area level batch metrics
3AceShowHand Jan 16, 2026
a424789
adjust set
3AceShowHand Jan 16, 2026
7df217a
clear the event buf after used
3AceShowHand Jan 16, 2026
b26fd7a
adjust isFull logic
3AceShowHand Jan 16, 2026
ae12398
add more test
3AceShowHand Jan 16, 2026
91126f6
add more code
3AceShowHand Feb 3, 2026
db6738b
fix all code
3AceShowHand Feb 26, 2026
9a71b75
add more
3AceShowHand Feb 26, 2026
0e5c177
add more code
3AceShowHand Feb 27, 2026
d48f5ad
add more code
3AceShowHand Feb 27, 2026
2e09daa
Merge branch 'master' into batch-type-ds
3AceShowHand Mar 2, 2026
3a7d793
fix test
3AceShowHand Mar 2, 2026
f71af9a
fix a lot of code
3AceShowHand Mar 2, 2026
ebff397
add more code
3AceShowHand Mar 2, 2026
30e4f93
fix
3AceShowHand Mar 2, 2026
a829ae9
fix a lot of code
3AceShowHand Mar 3, 2026
7d93e10
add more
3AceShowHand Mar 3, 2026
e603509
Add more code
3AceShowHand Mar 4, 2026
2f1ddc8
Merge branch 'master' into batch-type-ds
3AceShowHand Mar 4, 2026
b7f7aea
fix format
3AceShowHand Mar 4, 2026
55ce609
adjust a lot of code
3AceShowHand Mar 4, 2026
bc53688
add mock sink
3AceShowHand Mar 4, 2026
dd35ba6
add more code
3AceShowHand Mar 4, 2026
2ba3dd8
add a lot of changes
3AceShowHand Mar 4, 2026
c051d99
add a lot of code
3AceShowHand Mar 4, 2026
5b47bd6
Adjust code a lot
3AceShowHand Mar 4, 2026
438d3aa
update test
3AceShowHand Mar 5, 2026
9aada3d
Merge branch 'master' into batch-type-ds
3AceShowHand Mar 6, 2026
cec71b2
Merge branch 'master' into batch-type-ds
3AceShowHand Mar 6, 2026
df3d505
fix the code
3AceShowHand Mar 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,16 @@ func (d *JSONDuration) UnmarshalJSON(b []byte) error {

// ReplicaConfig is a duplicate of config.ReplicaConfig
type ReplicaConfig struct {
MemoryQuota *uint64 `json:"memory_quota,omitempty"`
CaseSensitive *bool `json:"case_sensitive,omitempty"`
ForceReplicate *bool `json:"force_replicate,omitempty"`
IgnoreIneligibleTable *bool `json:"ignore_ineligible_table,omitempty"`
CheckGCSafePoint *bool `json:"check_gc_safe_point,omitempty"`
EnableSyncPoint *bool `json:"enable_sync_point,omitempty"`
EnableTableMonitor *bool `json:"enable_table_monitor,omitempty"`
BDRMode *bool `json:"bdr_mode,omitempty"`
MemoryQuota *uint64 `json:"memory_quota,omitempty"`
EventCollectorBatchCount *int `json:"event_collector_batch_count,omitempty"`
EventCollectorBatchBytes *int `json:"event_collector_batch_bytes,omitempty"`
CaseSensitive *bool `json:"case_sensitive,omitempty"`
ForceReplicate *bool `json:"force_replicate,omitempty"`
IgnoreIneligibleTable *bool `json:"ignore_ineligible_table,omitempty"`
CheckGCSafePoint *bool `json:"check_gc_safe_point,omitempty"`
EnableSyncPoint *bool `json:"enable_sync_point,omitempty"`
EnableTableMonitor *bool `json:"enable_table_monitor,omitempty"`
BDRMode *bool `json:"bdr_mode,omitempty"`
// EnableActiveActive enables active-active replication mode on top of BDR.
// It requires BDRMode to be true and is only supported by TiDB and storage sinks.
EnableActiveActive *bool `json:"enable_active_active,omitempty"`
Expand Down Expand Up @@ -246,6 +248,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
if c.MemoryQuota != nil {
res.MemoryQuota = c.MemoryQuota
}
if c.EventCollectorBatchCount != nil {
res.EventCollectorBatchCount = c.EventCollectorBatchCount
}
if c.EventCollectorBatchBytes != nil {
res.EventCollectorBatchBytes = c.EventCollectorBatchBytes
}
if c.CaseSensitive != nil {
res.CaseSensitive = c.CaseSensitive
}
Expand Down Expand Up @@ -653,15 +661,17 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
cloned := c.Clone()

res := &ReplicaConfig{
MemoryQuota: cloned.MemoryQuota,
CaseSensitive: cloned.CaseSensitive,
ForceReplicate: cloned.ForceReplicate,
IgnoreIneligibleTable: cloned.IgnoreIneligibleTable,
CheckGCSafePoint: cloned.CheckGCSafePoint,
EnableSyncPoint: cloned.EnableSyncPoint,
EnableTableMonitor: cloned.EnableTableMonitor,
BDRMode: cloned.BDRMode,
EnableActiveActive: cloned.EnableActiveActive,
MemoryQuota: cloned.MemoryQuota,
EventCollectorBatchCount: cloned.EventCollectorBatchCount,
EventCollectorBatchBytes: cloned.EventCollectorBatchBytes,
CaseSensitive: cloned.CaseSensitive,
ForceReplicate: cloned.ForceReplicate,
IgnoreIneligibleTable: cloned.IgnoreIneligibleTable,
CheckGCSafePoint: cloned.CheckGCSafePoint,
EnableSyncPoint: cloned.EnableSyncPoint,
EnableTableMonitor: cloned.EnableTableMonitor,
BDRMode: cloned.BDRMode,
EnableActiveActive: cloned.EnableActiveActive,
}

if cloned.SyncPointInterval != nil {
Expand Down
39 changes: 39 additions & 0 deletions api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,42 @@ func TestReplicaConfigConversion(t *testing.T) {
require.Equal(t, "correctness", *apiCfgBack.Integrity.IntegrityCheckLevel)
require.Equal(t, "eventual", *apiCfgBack.Consistent.Level)
}

// TestReplicaConfigConversionBatchFields verifies that the event-collector
// batch fields round-trip between the API-level and internal replica configs:
// explicit values survive both conversion directions, absent API values fall
// back to the internal defaults, and nil internal values map back to nil.
func TestReplicaConfigConversionBatchFields(t *testing.T) {
	t.Parallel()

	const (
		batchCount = 4096
		batchBytes = 2048
	)

	// Explicit values must survive API -> internal conversion.
	apiCfg := &ReplicaConfig{
		EventCollectorBatchCount: util.AddressOf(batchCount),
		EventCollectorBatchBytes: util.AddressOf(batchBytes),
	}
	internalCfg := apiCfg.ToInternalReplicaConfig()
	require.Equal(t, batchCount, util.GetOrZero(internalCfg.EventCollectorBatchCount))
	require.Equal(t, batchBytes, util.GetOrZero(internalCfg.EventCollectorBatchBytes))

	// ... and the reverse internal -> API conversion.
	roundTripped := ToAPIReplicaConfig(internalCfg)
	require.NotNil(t, roundTripped.EventCollectorBatchCount)
	require.NotNil(t, roundTripped.EventCollectorBatchBytes)
	require.Equal(t, batchCount, *roundTripped.EventCollectorBatchCount)
	require.Equal(t, batchBytes, *roundTripped.EventCollectorBatchBytes)

	// Unset API fields fall back to the internal defaults.
	fromEmpty := (&ReplicaConfig{}).ToInternalReplicaConfig()
	defaults := config.GetDefaultReplicaConfig()
	require.Equal(
		t,
		util.GetOrZero(defaults.EventCollectorBatchCount),
		util.GetOrZero(fromEmpty.EventCollectorBatchCount),
	)
	require.Equal(
		t,
		util.GetOrZero(defaults.EventCollectorBatchBytes),
		util.GetOrZero(fromEmpty.EventCollectorBatchBytes),
	)

	// Nil internal fields stay nil on the API side.
	withoutBatch := config.GetDefaultReplicaConfig()
	withoutBatch.EventCollectorBatchCount = nil
	withoutBatch.EventCollectorBatchBytes = nil
	apiWithoutBatch := ToAPIReplicaConfig(withoutBatch)
	require.Nil(t, apiWithoutBatch.EventCollectorBatchCount)
	require.Nil(t, apiWithoutBatch.EventCollectorBatchBytes)
}
66 changes: 25 additions & 41 deletions cmd/kafka-consumer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
"testing"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/golang/mock/gomock"
"github.com/pingcap/ticdc/cmd/util"
"github.com/pingcap/ticdc/downstreamadapter/sink"
"github.com/pingcap/ticdc/downstreamadapter/sink/eventrouter"
sinkmock "github.com/pingcap/ticdc/downstreamadapter/sink/mock"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
Expand All @@ -30,40 +31,23 @@ import (
"github.com/stretchr/testify/require"
)

// recordingSink is a minimal sink.Sink implementation that records which DDLs are executed.
//
// It lets unit tests validate consumer-side DDL flushing behavior without requiring a real downstream.
type recordingSink struct {
ddls []string
}
func newMockSink(t *testing.T) (*sinkmock.MockSink, *[]string) {
t.Helper()

var _ sink.Sink = (*recordingSink)(nil)

func (s *recordingSink) SinkType() common.SinkType { return common.MysqlSinkType }
func (s *recordingSink) IsNormal() bool { return true }
func (s *recordingSink) AddDMLEvent(_ *commonEvent.DMLEvent) {
}
ctrl := gomock.NewController(t)
s := sinkmock.NewMockSink(ctrl)
ddls := make([]string, 0)

func (s *recordingSink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error {
return nil
}

func (s *recordingSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
if ddl, ok := event.(*commonEvent.DDLEvent); ok {
s.ddls = append(s.ddls, ddl.Query)
}
return nil
}

func (s *recordingSink) AddCheckpointTs(_ uint64) {
}

func (s *recordingSink) SetTableSchemaStore(_ *commonEvent.TableSchemaStore) {
}
s.EXPECT().AddDMLEvent(gomock.Any()).AnyTimes()
s.EXPECT().WriteBlockEvent(gomock.Any()).DoAndReturn(func(event commonEvent.BlockEvent) error {
if ddl, ok := event.(*commonEvent.DDLEvent); ok {
ddls = append(ddls, ddl.Query)
}
return nil
}).AnyTimes()

func (s *recordingSink) Close(_ bool) {
return s, &ddls
}
func (s *recordingSink) Run(_ context.Context) error { return nil }

func TestWriterWrite_executesIndependentCreateTableWithoutWatermark(t *testing.T) {
// Scenario: In some integration tests the upstream intentionally pauses dispatcher creation, which can
Expand All @@ -75,7 +59,7 @@ func TestWriterWrite_executesIndependentCreateTableWithoutWatermark(t *testing.T
// 2) Call writer.Write and expect the DDL is executed to advance downstream schema even without the
// watermark catching up.
ctx := context.Background()
s := &recordingSink{}
s, ddls := newMockSink(t)
w := &writer{
progresses: []*partitionProgress{
{partition: 0, watermark: 0},
Expand All @@ -100,7 +84,7 @@ func TestWriterWrite_executesIndependentCreateTableWithoutWatermark(t *testing.T

w.Write(ctx, codecCommon.MessageTypeDDL)

require.Equal(t, []string{"CREATE TABLE `test`.`t` (`id` INT PRIMARY KEY)"}, s.ddls)
require.Equal(t, []string{"CREATE TABLE `test`.`t` (`id` INT PRIMARY KEY)"}, *ddls)
require.Empty(t, w.ddlList)
}

Expand All @@ -113,7 +97,7 @@ func TestWriterWrite_preservesOrderWhenBlockedDDLNotReady(t *testing.T) {
// 2) Call writer.Write and expect nothing executes.
// 3) Advance watermark beyond the first DDL and expect both execute in order.
ctx := context.Background()
s := &recordingSink{}
s, ddls := newMockSink(t)
p := &partitionProgress{partition: 0, watermark: 0}
w := &writer{
progresses: []*partitionProgress{p},
Expand Down Expand Up @@ -145,15 +129,15 @@ func TestWriterWrite_preservesOrderWhenBlockedDDLNotReady(t *testing.T) {
}

w.Write(ctx, codecCommon.MessageTypeDDL)
require.Empty(t, s.ddls)
require.Empty(t, *ddls)
require.Len(t, w.ddlList, 2)

p.watermark = 200
w.Write(ctx, codecCommon.MessageTypeDDL)
require.Equal(t, []string{
"ALTER TABLE `test`.`t` ADD COLUMN `c2` INT",
"CREATE TABLE `test`.`t2` (`id` INT PRIMARY KEY)",
}, s.ddls)
}, *ddls)
require.Empty(t, w.ddlList)
}

Expand All @@ -166,7 +150,7 @@ func TestWriterWrite_doesNotBypassWatermarkForCreateTableLike(t *testing.T) {
// 2) Call writer.Write and expect the DDL is NOT executed.
// 3) Advance watermark beyond the DDL commitTs and expect the DDL executes.
ctx := context.Background()
s := &recordingSink{}
s, ddls := newMockSink(t)
p := &partitionProgress{partition: 0, watermark: 0}
w := &writer{
progresses: []*partitionProgress{p},
Expand All @@ -191,12 +175,12 @@ func TestWriterWrite_doesNotBypassWatermarkForCreateTableLike(t *testing.T) {
}

w.Write(ctx, codecCommon.MessageTypeDDL)
require.Empty(t, s.ddls)
require.Empty(t, *ddls)
require.Len(t, w.ddlList, 1)

p.watermark = 200
w.Write(ctx, codecCommon.MessageTypeDDL)
require.Equal(t, []string{"CREATE TABLE `test`.`t2` LIKE `test`.`t1`"}, s.ddls)
require.Equal(t, []string{"CREATE TABLE `test`.`t2` LIKE `test`.`t1`"}, *ddls)
require.Empty(t, w.ddlList)
}

Expand All @@ -210,7 +194,7 @@ func TestWriterWrite_handlesOutOfOrderDDLsByCommitTs(t *testing.T) {
// 2) Call writer.Write and expect all DDLs with commitTs <= watermark execute (in commit-ts order),
// and only the truly "future" DDL remains pending.
ctx := context.Background()
s := &recordingSink{}
s, ddls := newMockSink(t)
p := &partitionProgress{partition: 0, watermark: 944040962}
w := &writer{
progresses: []*partitionProgress{p},
Expand Down Expand Up @@ -279,7 +263,7 @@ func TestWriterWrite_handlesOutOfOrderDDLsByCommitTs(t *testing.T) {
"ALTER TABLE `common_1`.`add_and_drop_columns` ADD COLUMN `col1` INT NULL, ADD COLUMN `col2` INT NULL, ADD COLUMN `col3` INT NULL",
"ALTER TABLE `common_1`.`add_and_drop_columns` DROP COLUMN `col1`, DROP COLUMN `col2`",
"CREATE DATABASE `common`",
}, s.ddls)
}, *ddls)
require.Len(t, w.ddlList, 1)
require.Equal(t, "CREATE TABLE `common_1`.`a` (`a` BIGINT PRIMARY KEY,`b` INT)", w.ddlList[0].Query)
}
Expand Down
Loading
Loading