Skip to content

Commit 505aa93

Browse files
hongyunyantenfyzhong
authored andcommitted
mysqlSink: Support Batch DMLs for table without pk but have uk (#3295)
ref #1549
1 parent 13b1128 commit 505aa93

File tree

7 files changed

+164
-17
lines changed

7 files changed

+164
-17
lines changed

pkg/common/table_info.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,22 @@ func (ti *TableInfo) IsHandleKey(colID int64) bool {
512512
return ok
513513
}
514514

515+
// GetOrderedHandleKeyColumnIDs returns the ordered handle-key column IDs.
516+
// It prefers the primary key columns (PKIndex) when available; otherwise,
517+
// it falls back to the not-null unique key selected during schema init.
518+
func (ti *TableInfo) GetOrderedHandleKeyColumnIDs() []int64 {
519+
if len(ti.columnSchema.PKIndex) > 0 {
520+
return ti.columnSchema.PKIndex
521+
}
522+
if len(ti.columnSchema.HandleColID) == 0 {
523+
return nil
524+
}
525+
if len(ti.columnSchema.HandleColID) == 1 && ti.columnSchema.HandleColID[0] == -1 {
526+
return nil
527+
}
528+
return ti.columnSchema.HandleColID
529+
}
530+
515531
func (ti *TableInfo) ToTiDBTableInfo() *model.TableInfo {
516532
return &model.TableInfo{
517533
ID: ti.TableName.TableID,

pkg/common/table_info_helper.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -363,16 +363,16 @@ type columnSchema struct {
363363
// create table t (a int primary key, b int unique key);
364364
// Every element in first dimension is a index, and the second dimension is the columns offset
365365
IndexColumns [][]int64 `json:"index_columns"`
366-
367366
// PKIndex store the colID of the columns in row changed events for primary key
368367
PKIndex []int64 `json:"pk_index"`
369-
370368
// The following 3 fields, should only be used to decode datum from the raw value bytes, do not abuse those field.
371369
// RowColInfos extend the model.ColumnInfo with some extra information
372370
// it's the same length and order with the model.TableInfo.Columns
373371
RowColInfos []rowcodec.ColInfo `json:"row_col_infos"`
374372
RowColFieldTps map[int64]*datumTypes.FieldType `json:"row_col_field_tps"`
375-
// only for new row format decoder
373+
// if the table has pk, the handle col id is the pk col id
374+
// else if the table has not null unique key, the handle col id is the unique key col id
375+
// else the handle col id is -1
376376
HandleColID []int64 `json:"handle_col_id"`
377377
// RowColFieldTpsSlice is used to decode chunk ∂ raw value bytes
378378
RowColFieldTpsSlice []*datumTypes.FieldType `json:"row_col_field_tps_slice"`
@@ -640,13 +640,18 @@ func (s *columnSchema) initIndexColumns() {
640640
}
641641
}
642642
}
643+
643644
if handleIndexOffset < 0 {
644645
return
645646
}
646647

648+
// set handle key with not null unique key
647649
selectCols := s.Indices[handleIndexOffset].Columns
650+
s.HandleColID = make([]int64, 0, len(selectCols))
648651
for _, col := range selectCols {
649-
s.HandleKeyIDs[s.Columns[col.Offset].ID] = struct{}{}
652+
colID := s.Columns[col.Offset].ID
653+
s.HandleKeyIDs[colID] = struct{}{}
654+
s.HandleColID = append(s.HandleColID, colID)
650655
}
651656
}
652657

pkg/common/table_info_helper_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,65 @@ func TestColumnIndex(t *testing.T) {
174174
require.Equal(t, tableInfo.GetPKIndex(), []int64{101})
175175
}
176176

177+
func TestGetOrderedHandleKeyColumnIDs(t *testing.T) {
178+
t.Run("pk is handle", func(t *testing.T) {
179+
columns := []*model.ColumnInfo{
180+
newColumnInfo(101, "a", mysql.TypeLong, mysql.PriKeyFlag),
181+
newColumnInfo(102, "b", mysql.TypeLong, 0),
182+
}
183+
tableInfo := WrapTableInfo("test", &model.TableInfo{
184+
PKIsHandle: true,
185+
Columns: columns,
186+
})
187+
require.Equal(t, []int64{101}, tableInfo.GetOrderedHandleKeyColumnIDs())
188+
})
189+
190+
t.Run("composite primary key", func(t *testing.T) {
191+
columns := []*model.ColumnInfo{
192+
newColumnInfo(201, "a", mysql.TypeLong, mysql.PriKeyFlag),
193+
newColumnInfo(202, "b", mysql.TypeLong, mysql.PriKeyFlag),
194+
}
195+
tableInfo := WrapTableInfo("test", &model.TableInfo{
196+
Columns: columns,
197+
Indices: []*model.IndexInfo{
198+
newIndexInfo("pk", []*model.IndexColumn{{Offset: 0}, {Offset: 1}}, true, true),
199+
},
200+
})
201+
require.Equal(t, []int64{201, 202}, tableInfo.GetOrderedHandleKeyColumnIDs())
202+
})
203+
204+
t.Run("unique key fallback", func(t *testing.T) {
205+
columns := []*model.ColumnInfo{
206+
newColumnInfo(301, "a", mysql.TypeLong, 0),
207+
newColumnInfo(302, "b", mysql.TypeLong, mysql.NotNullFlag),
208+
newColumnInfo(303, "c", mysql.TypeLong, mysql.NotNullFlag),
209+
newColumnInfo(304, "d", mysql.TypeLong, mysql.NotNullFlag),
210+
newColumnInfo(305, "e", mysql.TypeLong, 0),
211+
}
212+
tableInfo := WrapTableInfo("test", &model.TableInfo{
213+
Columns: columns,
214+
Indices: []*model.IndexInfo{
215+
newIndexInfo("uk_long", []*model.IndexColumn{{Offset: 1}, {Offset: 2}}, false, true),
216+
newIndexInfo("uk_short", []*model.IndexColumn{{Offset: 3}}, false, true),
217+
newIndexInfo("uk_nullable", []*model.IndexColumn{{Offset: 4}}, false, true),
218+
},
219+
})
220+
require.True(t, tableInfo.IsHandleKey(304))
221+
require.Equal(t, []int64{304}, tableInfo.GetOrderedHandleKeyColumnIDs())
222+
})
223+
224+
t.Run("no handle key", func(t *testing.T) {
225+
columns := []*model.ColumnInfo{
226+
newColumnInfo(401, "a", mysql.TypeLong, 0),
227+
newColumnInfo(402, "b", mysql.TypeLong, 0),
228+
}
229+
tableInfo := WrapTableInfo("test", &model.TableInfo{
230+
Columns: columns,
231+
})
232+
require.Nil(t, tableInfo.GetOrderedHandleKeyColumnIDs())
233+
})
234+
}
235+
177236
func TestIndexByName(t *testing.T) {
178237
tableInfo := WrapTableInfo("test", &model.TableInfo{
179238
Indices: nil,

pkg/sink/mysql/mysql_writer_dml.go

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (w *Writer) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs, err
108108
for _, sortedEventGroups := range eventsGroupSortedByUpdateTs {
109109
for _, eventsInGroup := range sortedEventGroups {
110110
tableInfo := eventsInGroup[0].TableInfo
111-
if !w.shouldGenBatchSQL(tableInfo.HasPrimaryKey(), tableInfo.HasVirtualColumns(), eventsInGroup) {
111+
if !w.shouldGenBatchSQL(tableInfo.HasPKOrNotNullUK, tableInfo.HasVirtualColumns(), eventsInGroup) {
112112
queryList, argsList = w.generateNormalSQLs(eventsInGroup)
113113
} else {
114114
queryList, argsList = w.generateBatchSQL(eventsInGroup)
@@ -128,16 +128,16 @@ func (w *Writer) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs, err
128128
// shouldGenBatchSQL determines whether batch SQL generation should be used based on table properties and events.
129129
// Batch SQL generation is used when:
130130
// 1. BatchDMLEnable = true, and rows > 1
131-
// 2. The table has a primary key
131+
// 2. The table has a pk or not null unique key
132132
// 3. The table doesn't have virtual columns
133133
// 4. There's more than one row in the group
134134
// 5. All events have the same safe mode status
135-
func (w *Writer) shouldGenBatchSQL(hasPK bool, hasVirtualCols bool, events []*commonEvent.DMLEvent) bool {
135+
func (w *Writer) shouldGenBatchSQL(hasPKOrNotNullUK bool, hasVirtualCols bool, events []*commonEvent.DMLEvent) bool {
136136
if !w.cfg.BatchDMLEnable {
137137
return false
138138
}
139139

140-
if !hasPK || hasVirtualCols {
140+
if !hasPKOrNotNullUK || hasVirtualCols {
141141
return false
142142
}
143143
if len(events) == 1 && events[0].Len() == 1 {
@@ -266,6 +266,23 @@ func (w *Writer) generateSQLForSingleEvent(event *commonEvent.DMLEvent, inDataSa
266266
return w.batchSingleTxnDmls(rowLists, tableInfo, inDataSafeMode)
267267
}
268268

269+
func (w *Writer) generateBatchSQLsPerEvent(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) {
270+
var (
271+
queries []string
272+
args [][]interface{}
273+
)
274+
for _, event := range events {
275+
if event.Len() == 0 {
276+
continue
277+
}
278+
inSafeMode := w.cfg.SafeMode || w.isInErrorCausedSafeMode || event.CommitTs < event.ReplicatingTs
279+
sqls, vals := w.generateSQLForSingleEvent(event, inSafeMode)
280+
queries = append(queries, sqls...)
281+
args = append(args, vals...)
282+
}
283+
return queries, args
284+
}
285+
269286
func (w *Writer) generateBatchSQLInUnSafeMode(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) {
270287
tableInfo := events[0].TableInfo
271288
type RowChangeWithKeys struct {
@@ -329,7 +346,7 @@ func (w *Writer) generateBatchSQLInUnSafeMode(events []*commonEvent.DMLEvent) ([
329346
rowKey := rowLists[i].RowKeys
330347
if nextRowType == common.RowTypeInsert {
331348
if compareKeys(rowKey, rowLists[j].RowKeys) {
332-
sql, values := w.generateNormalSQLs(events)
349+
sql, values := w.generateBatchSQLsPerEvent(events)
333350
log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Int("writerID", w.id))
334351
log.Panic("Here are two invalid rows with the same row type and keys", zap.Any("Events", events), zap.Any("i", i), zap.Any("j", j), zap.Int("writerID", w.id))
335352
}
@@ -368,7 +385,7 @@ func (w *Writer) generateBatchSQLInUnSafeMode(events []*commonEvent.DMLEvent) ([
368385
}
369386
if nextRowType == common.RowTypeInsert {
370387
if compareKeys(rowKey, rowLists[j].RowKeys) {
371-
sql, values := w.generateNormalSQLs(events)
388+
sql, values := w.generateBatchSQLsPerEvent(events)
372389
log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Int("writerID", w.id))
373390
log.Panic("Here are two invalid rows with the same row type and keys", zap.Any("Events", events), zap.Any("i", i), zap.Any("j", j), zap.Int("writerID", w.id))
374391
}
@@ -454,8 +471,8 @@ func (w *Writer) generateBatchSQLInSafeMode(events []*commonEvent.DMLEvent) ([]s
454471
if !compareKeys(hashToKeyMap[hashValue], keyValue) {
455472
log.Warn("the key hash is equal, but the keys is not the same; so we don't use batch generate sql, but use the normal generated sql instead")
456473
event.Rewind() // reset event
457-
// use normal sql instead
458-
sql, args := w.generateNormalSQLs(events)
474+
// fallback to per-event batch sql
475+
sql, args := w.generateBatchSQLsPerEvent(events)
459476
return sql, args, false
460477
}
461478
}
@@ -518,7 +535,7 @@ func (w *Writer) generateBatchSQLInSafeMode(events []*commonEvent.DMLEvent) ([]s
518535
for i := 1; i < len(rowChanges); i++ {
519536
rowType := rowChanges[i].RowType
520537
if rowType == prevType {
521-
sql, values := w.generateNormalSQLs(events)
538+
sql, values := w.generateBatchSQLsPerEvent(events)
522539
log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Int("writerID", w.id))
523540
log.Panic("invalid row changes", zap.String("schemaName", tableInfo.GetSchemaName()),
524541
zap.String("tableName", tableInfo.GetTableName()), zap.Any("rowChanges", rowChanges),
@@ -884,7 +901,7 @@ func (w *Writer) groupRowsByType(
884901
eventTableInfo,
885902
nil, nil)
886903
updateRow = append(updateRow, newUpdateRow)
887-
if len(updateRow) >= w.cfg.MaxTxnRow {
904+
if len(updateRow) >= w.cfg.MaxMultiUpdateRowCount {
888905
updateRows = append(updateRows, updateRow)
889906
updateRow = make([]*sqlmodel.RowChange, 0, rowSize)
890907
}

pkg/sink/mysql/mysql_writer_dml_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -876,3 +876,49 @@ func TestGroupEventsByTable(t *testing.T) {
876876
require.Equal(t, 2, len(tableGroups[1]), "Second group should have 2 events")
877877
})
878878
}
879+
880+
func TestPrepareDMLsWithNotNullUniqueKey(t *testing.T) {
881+
writer, _, _ := newTestMysqlWriter(t)
882+
defer writer.db.Close()
883+
helper := commonEvent.NewEventTestHelper(t)
884+
defer helper.Close()
885+
886+
helper.Tk().MustExec("use test")
887+
createTableSQL := "create table t3 (a int not null unique key, b int);"
888+
job := helper.DDL2Job(createTableSQL)
889+
require.NotNil(t, job)
890+
891+
// enable batch DML and set limits
892+
writer.cfg.BatchDMLEnable = true
893+
writer.cfg.SafeMode = false
894+
writer.cfg.MaxTxnRow = 10
895+
896+
// prepare a few insert events for table t3
897+
dml1 := helper.DML2Event("test", "t3", "insert into t3 values (1, 10)")
898+
helper.ExecuteDeleteDml("test", "t3", "delete from t3 where a = 1")
899+
dml2 := helper.DML2Event("test", "t3", "insert into t3 values (2, 20)")
900+
helper.ExecuteDeleteDml("test", "t3", "delete from t3 where a = 2")
901+
dml3 := helper.DML2Event("test", "t3", "insert into t3 values (3, 30)")
902+
helper.ExecuteDeleteDml("test", "t3", "delete from t3 where a = 3")
903+
904+
events := []*commonEvent.DMLEvent{dml1, dml2, dml3}
905+
906+
dmls, err := writer.prepareDMLs(events)
907+
require.NoError(t, err)
908+
require.NotNil(t, dmls)
909+
910+
// Expect at least one batched INSERT for t3
911+
found := false
912+
for i, q := range dmls.sqls {
913+
if strings.HasPrefix(q, "INSERT INTO `test`.`t3`") || strings.HasPrefix(q, "REPLACE INTO `test`.`t3`") {
914+
found = true
915+
// check corresponding args length: 3 rows * 2 cols = 6
916+
require.Equal(t, 6, len(dmls.values[i]))
917+
// check placeholders count consistent with args
918+
placeholderCount := strings.Count(q, "?")
919+
require.Equal(t, 6, placeholderCount)
920+
break
921+
}
922+
}
923+
require.True(t, found, "expected batched INSERT/REPLACE for table t3")
924+
}

pkg/sink/mysql/mysql_writer_for_ddl_ts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (w *Writer) SendDDLTsPre(event commonEvent.BlockEvent) error {
9797
isSyncpoint = "0"
9898
}
9999
query := insertItemQuery(tableIds, ticdcClusterID, changefeedID, ddlTs, "0", isSyncpoint)
100-
log.Info("send ddl ts table query", zap.String("query", query))
100+
log.Debug("send ddl ts table query", zap.String("query", query))
101101

102102
_, err = tx.Exec(query)
103103
if err != nil {

pkg/sink/mysql/mysql_writer_helper.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func compareKeys(firstKey, secondKey []byte) bool {
3232
func genKeyAndHash(row *chunk.Row, tableInfo *common.TableInfo) (uint64, []byte) {
3333
key := genKeyList(row, tableInfo)
3434
if len(key) == 0 {
35-
log.Panic("the table has no primary key", zap.Any("tableInfo", tableInfo))
35+
log.Panic("the table has no primary key or not-null unique key", zap.Any("tableInfo", tableInfo))
3636
}
3737

3838
hasher := fnv.New32a()
@@ -45,7 +45,11 @@ func genKeyAndHash(row *chunk.Row, tableInfo *common.TableInfo) (uint64, []byte)
4545

4646
func genKeyList(row *chunk.Row, tableInfo *common.TableInfo) []byte {
4747
var key []byte
48-
for _, colID := range tableInfo.GetPKIndex() {
48+
keyColumns := tableInfo.GetOrderedHandleKeyColumnIDs()
49+
if len(keyColumns) == 0 {
50+
return nil
51+
}
52+
for _, colID := range keyColumns {
4953
info, ok := tableInfo.GetColumnInfo(colID)
5054
if !ok || info == nil {
5155
return nil

0 commit comments

Comments
 (0)