Skip to content

Commit 5dcf247

Browse files
[release-20.0] Fix: Ensure Consistent Lookup Vindex Handles Duplicate Rows in Single Query (vitessio#17974) (vitessio#18077)
Signed-off-by: Harshit Gangal <[email protected]> Co-authored-by: Harshit Gangal <[email protected]>
1 parent 6cb8b18 commit 5dcf247

File tree

8 files changed

+216
-29
lines changed

8 files changed

+216
-29
lines changed

go/test/endtoend/vtgate/lookup_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func TestConsistentLookup(t *testing.T) {
131131
mysqlErr := err.(*sqlerror.SQLError)
132132
assert.Equal(t, sqlerror.ERDupEntry, mysqlErr.Num)
133133
assert.Equal(t, "23000", mysqlErr.State)
134-
assert.ErrorContains(t, mysqlErr, "reverted partial DML execution")
134+
assert.ErrorContains(t, mysqlErr, "lookup.Create: target: ks.80-.primary: vttablet: (errno 1062) (sqlstate 23000)")
135135

136136
// Simple delete.
137137
utils.Exec(t, conn, "begin")

go/test/endtoend/vtgate/queries/dml/dml_test.go

Lines changed: 91 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,92 @@ import (
2727
"github.com/stretchr/testify/require"
2828
)
2929

30+
// TestUniqueLookupDuplicateEntries verifies that a DML fails if it would create a duplicate entry in a unique lookup column.
31+
func TestUniqueLookupDuplicateEntries(t *testing.T) {
32+
mcmp, closer := start(t)
33+
defer closer()
34+
35+
// initial row
36+
utils.Exec(t, mcmp.VtConn, "insert into s_tbl(id, num) values (1,10)")
37+
utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)]]`)
38+
utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")]]`)
39+
40+
// insert duplicate row
41+
utils.AssertContainsError(t, mcmp.VtConn, "insert into s_tbl(id, num) values (2,10)", "lookup.Create: target: sks.-80.primary: vttablet: "+
42+
"Duplicate entry '10' for key 'num_vdx_tbl.PRIMARY'")
43+
utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)]]`)
44+
utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")]]`)
45+
46+
// insert duplicate row in multi-row insert multi shard
47+
utils.AssertContainsError(t, mcmp.VtConn, "insert into s_tbl(id, num) values (3,20), (4,20),(5,30)",
48+
"transaction rolled back to reverse changes of partial DML execution: target: sks.80-.primary: vttablet: "+
49+
"Duplicate entry '20' for key 'num_vdx_tbl.PRIMARY'")
50+
utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)]]`)
51+
utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")]]`)
52+
53+
// insert duplicate row in multi-row insert - lookup single shard
54+
utils.AssertContainsError(t, mcmp.VtConn, "insert into s_tbl(id, num) values (3,20), (4,20)",
55+
"transaction rolled back to reverse changes of partial DML execution: lookup.Create: target: sks.80-.primary: vttablet: "+
56+
"Duplicate entry '20' for key 'num_vdx_tbl.PRIMARY'")
57+
utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)]]`)
58+
utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")]]`)
59+
60+
// insert second row to test with limit update.
61+
utils.Exec(t, mcmp.VtConn, "insert into s_tbl(id, num) values (10,100)")
62+
utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)] [INT64(10) INT64(100)]]`)
63+
utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")] [INT64(100) VARCHAR("594764E1A2B2D98E")]]`)
64+
65+
// update with limit 1 succeeds.
66+
utils.Exec(t, mcmp.VtConn, "update s_tbl set num = 30 order by id limit 1")
67+
utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(30)] [INT64(10) INT64(100)]]`)
68+
utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(30) VARCHAR("166B40B44ABA4BD6")] [INT64(100) VARCHAR("594764E1A2B2D98E")]]`)
69+
70+
// updating multiple rows to the same value should fail.
71+
utils.AssertContainsError(t, mcmp.VtConn, "update s_tbl set num = 40 limit 2",
72+
"lookup.Create: transaction rolled back to reverse changes of partial DML execution: target: sks.80-.primary: vttablet: "+
73+
"rpc error: code = AlreadyExists desc = Duplicate entry '40' for key 'num_vdx_tbl.PRIMARY'")
74+
utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(30)] [INT64(10) INT64(100)]]`)
75+
utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(30) VARCHAR("166B40B44ABA4BD6")] [INT64(100) VARCHAR("594764E1A2B2D98E")]]`)
76+
}
77+
78+
// TestUniqueLookupDuplicateIgnore tests the insert ignore on lookup table.
79+
func TestUniqueLookupDuplicateIgnore(t *testing.T) {
80+
mcmp, closer := start(t)
81+
defer closer()
82+
83+
// initial row
84+
utils.Exec(t, mcmp.VtConn, "insert into s_tbl(id, num) values (1,10)")
85+
utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)]]`)
86+
utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")]]`)
87+
88+
// insert ignore duplicate row
89+
qr := utils.Exec(t, mcmp.VtConn, "insert ignore into s_tbl(id, num) values (2,10)")
90+
assert.EqualValues(t, 0, qr.RowsAffected)
91+
utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)]]`)
92+
utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")]]`)
93+
94+
// insert duplicate row in multi-row insert - lookup single shard
95+
// Current behavior does not work as expected—one of the rows should be inserted.
96+
// The lookup table is updated, but the main table is not. This is a bug in Vitess.
97+
// The issue occurs because the table has two vindex columns (`num` and `col`), both of which ignore nulls during vindex insertion.
98+
// In the `INSERT IGNORE` case, after the vindex create API call, a verify call checks if the row exists in the lookup table.
99+
// - If the row exists, it is inserted into the main table.
100+
// - If the row does not exist, the main table insertion is skipped.
101+
// Since the `col` column is null, the row is not inserted into the lookup table, causing the main table insertion to be ignored.
102+
qr = utils.Exec(t, mcmp.VtConn, "insert ignore into s_tbl(id, num) values (3,20), (4,20)")
103+
assert.EqualValues(t, 0, qr.RowsAffected)
104+
utils.AssertMatches(t, mcmp.VtConn, "select id, num from s_tbl order by id", `[[INT64(1) INT64(10)]]`)
105+
utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")] [INT64(20) VARCHAR("4EB190C9A2FA169C")]]`)
106+
107+
// insert duplicate row in multi-row insert - vindex values are not null
108+
qr = utils.Exec(t, mcmp.VtConn, "insert ignore into s_tbl(id, num, col) values (3,20, 30), (4,20, 40)")
109+
assert.EqualValues(t, 1, qr.RowsAffected)
110+
utils.AssertMatches(t, mcmp.VtConn, "select id, num, col from s_tbl order by id", `[[INT64(1) INT64(10) NULL] [INT64(3) INT64(20) INT64(30)]]`)
111+
utils.AssertMatches(t, mcmp.VtConn, "select num, hex(keyspace_id) from num_vdx_tbl order by num", `[[INT64(10) VARCHAR("166B40B44ABA4BD6")] [INT64(20) VARCHAR("4EB190C9A2FA169C")]]`)
112+
utils.AssertMatches(t, mcmp.VtConn, "select col, hex(keyspace_id) from col_vdx_tbl order by col", `[[INT64(30) VARCHAR("4EB190C9A2FA169C")]]`)
113+
114+
}
115+
30116
func TestMultiEqual(t *testing.T) {
31117
if clusterInstance.HasPartialKeyspaces {
32118
t.Skip("test uses multiple keyspaces, test framework only supports partial keyspace testing for a single keyspace")
@@ -80,7 +166,7 @@ func TestDeleteWithLimit(t *testing.T) {
80166
defer closer()
81167

82168
// initial rows
83-
mcmp.Exec("insert into s_tbl(id, col) values (1,10), (2,10), (3,10), (4,20), (5,5), (6,15), (7,17), (8,80)")
169+
mcmp.Exec("insert into s_tbl(id, col) values (1,10), (4,20), (5,5), (6,15), (7,17), (8,80)")
84170
mcmp.Exec("insert into order_tbl(region_id, oid, cust_no) values (1,1,4), (1,2,2), (2,3,5), (2,4,55)")
85171

86172
// delete with limit
@@ -92,25 +178,23 @@ func TestDeleteWithLimit(t *testing.T) {
92178

93179
// check rows
94180
mcmp.AssertMatches(`select id, col from s_tbl order by id`,
95-
`[[INT64(3) INT64(10)] [INT64(4) INT64(20)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`)
181+
`[[INT64(4) INT64(20)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`)
96182
// 2 rows match but the limit is 1, so either one of the rows can remain in the table.
97183
mcmp.AssertMatchesAnyNoCompare(`select region_id, oid, cust_no from order_tbl order by oid`,
98184
`[[INT64(1) INT64(2) INT64(2)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`,
99185
`[[INT64(1) INT64(1) INT64(4)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`)
100186

101187
// delete with limit
102-
qr = mcmp.Exec(`delete from s_tbl where col < 20 limit 2`)
188+
qr = mcmp.Exec(`delete from s_tbl where col < 25 limit 2`)
103189
require.EqualValues(t, 2, qr.RowsAffected)
104190

105191
qr = mcmp.Exec(`delete from order_tbl limit 5`)
106192
require.EqualValues(t, 3, qr.RowsAffected)
107193

108194
// check rows
109195
// 2 of the remaining rows match `col < 25` and the limit is 2, so both are deleted and only the row with col = 80 remains in the table.
110-
mcmp.AssertMatchesAnyNoCompare(`select id, col from s_tbl order by id`,
111-
`[[INT64(4) INT64(20)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`,
112-
`[[INT64(3) INT64(10)] [INT64(4) INT64(20)] [INT64(8) INT64(80)]]`,
113-
`[[INT64(4) INT64(20)] [INT64(6) INT64(15)] [INT64(8) INT64(80)]]`)
196+
mcmp.AssertMatches(`select id, col from s_tbl order by id`,
197+
`[[INT64(8) INT64(80)]]`)
114198
mcmp.AssertMatches(`select region_id, oid, cust_no from order_tbl order by oid`,
115199
`[]`)
116200

go/vt/vterrors/constants.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,29 @@ const (
2727
// RxOp regex for operation not allowed error
2828
var RxOp = regexp.MustCompile("operation not allowed in state (NOT_SERVING|SHUTTING_DOWN)")
2929

30-
// TxEngineClosed for transaction engine closed error
31-
const TxEngineClosed = "tx engine can't accept new connections in state %v"
32-
33-
// WrongTablet for invalid tablet type error
34-
const WrongTablet = "wrong tablet type"
35-
36-
// RxWrongTablet regex for invalid tablet type error
37-
var RxWrongTablet = regexp.MustCompile("(wrong|invalid) tablet type")
38-
3930
// Constants for error messages
4031
const (
32+
// TxEngineClosed for transaction engine closed error
33+
TxEngineClosed = "tx engine can't accept new connections in state %v"
34+
4135
// PrimaryVindexNotSet is the error message to be used when there is no primary vindex found on a table
4236
PrimaryVindexNotSet = "table '%s' does not have a primary vindex"
37+
38+
// WrongTablet for invalid tablet type error
39+
WrongTablet = "wrong tablet type"
40+
41+
// TxKillerRollback is the purpose used when acquiring a lock on a connection for rolling back a transaction.
42+
TxKillerRollback = "in use: for tx killer rollback"
43+
44+
// RevertedPartialExec is the error message to be used when a partial DML execution failure is reverted using savepoint.
45+
RevertedPartialExec = "reverted partial DML execution failure"
46+
47+
// TxRollbackOnPartialExec is the error message to be used when a transaction is rolled back to reverse changes of partial DML execution
48+
TxRollbackOnPartialExec = "transaction rolled back to reverse changes of partial DML execution"
4349
)
4450

45-
// TxKillerRollback purpose when acquire lock on connection for rolling back transaction.
46-
const TxKillerRollback = "in use: for tx killer rollback"
51+
// RxWrongTablet regex for invalid tablet type error
52+
var RxWrongTablet = regexp.MustCompile("(wrong|invalid) tablet type")
4753

4854
// TxClosed regex for connection closed
4955
var TxClosed = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found|in use: for tx killer rollback)")

go/vt/vtgate/executor_dml_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ import (
2929
"vitess.io/vitess/go/mysql/sqlerror"
3030
"vitess.io/vitess/go/sqltypes"
3131
"vitess.io/vitess/go/test/utils"
32+
"vitess.io/vitess/go/vt/discovery"
3233
querypb "vitess.io/vitess/go/vt/proto/query"
34+
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
3335
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
3436
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
3537
"vitess.io/vitess/go/vt/sqlparser"
@@ -3140,3 +3142,66 @@ func TestDeleteMultiTable(t *testing.T) {
31403142
// delete from `user` where (`user`.id) in ::dml_vals - 1 shard
31413143
testQueryLog(t, executor, logChan, "TestExecute", "DELETE", "delete `user` from `user` join music on `user`.col = music.col where music.user_id = 1", 18)
31423144
}
3145+
3146+
func TestConsistentLookupInsert(t *testing.T) {
3147+
ctx := utils.LeakCheckContext(t)
3148+
3149+
// Special setup
3150+
cell := "zone1"
3151+
ks := "TestExecutor"
3152+
hc := discovery.NewFakeHealthCheck(nil)
3153+
s := createSandbox(ks)
3154+
s.ShardSpec = "-80-"
3155+
s.VSchema = executorVSchema
3156+
serv := newSandboxForCells(ctx, []string{cell})
3157+
resolver := newTestResolver(ctx, hc, serv, cell)
3158+
sbc1 := hc.AddTestTablet(cell, "-80", 1, ks, "-80", topodatapb.TabletType_PRIMARY, true, 1, nil)
3159+
sbc2 := hc.AddTestTablet(cell, "80-", 1, ks, "80-", topodatapb.TabletType_PRIMARY, true, 1, nil)
3160+
3161+
executor := createExecutor(ctx, serv, cell, resolver)
3162+
defer executor.Close()
3163+
3164+
logChan := executor.queryLogger.Subscribe("Test")
3165+
defer executor.queryLogger.Unsubscribe(logChan)
3166+
3167+
session := NewAutocommitSession(&vtgatepb.Session{})
3168+
3169+
t.Run("transaction rollback due to partial execution error, no duplicate handling", func(t *testing.T) {
3170+
sbc1.EphemeralShardErr = sqlerror.NewSQLError(sqlerror.ERDupEntry, sqlerror.SSConstraintViolation, "Duplicate entry '10' for key 't1_lkp_idx.PRIMARY'")
3171+
sbc2.SetResults([]*sqltypes.Result{{RowsAffected: 1}})
3172+
_, err := executorExecSession(ctx, executor, "insert into t1(id, unq_col) values (1, 10), (4, 10), (50, 4)", nil, session.Session)
3173+
assert.ErrorContains(t, err,
3174+
"lookup.Create: transaction rolled back to reverse changes of partial DML execution: target: TestExecutor.-80.primary: "+
3175+
"Duplicate entry '10' for key 't1_lkp_idx.PRIMARY' (errno 1062) (sqlstate 23000)")
3176+
3177+
assert.EqualValues(t, 0, sbc1.ExecCount.Load())
3178+
assert.EqualValues(t, 1, sbc2.ExecCount.Load())
3179+
3180+
testQueryLog(t, executor, logChan, "VindexCreate", "INSERT", "insert into t1_lkp_idx(unq_col, keyspace_id) values (:unq_col_0, :keyspace_id_0), (:unq_col_1, :keyspace_id_1), (:unq_col_2, :keyspace_id_2)", 2)
3181+
testQueryLog(t, executor, logChan, "TestExecute", "INSERT", "insert into t1(id, unq_col) values (1, 10), (4, 10), (50, 4)", 0)
3182+
})
3183+
3184+
sbc1.ExecCount.Store(0)
3185+
sbc2.ExecCount.Store(0)
3186+
session = NewAutocommitSession(session.Session)
3187+
3188+
t.Run("duplicate handling failing on same unique column value", func(t *testing.T) {
3189+
sbc1.EphemeralShardErr = sqlerror.NewSQLError(sqlerror.ERDupEntry, sqlerror.SSConstraintViolation, "Duplicate entry '10' for key 't1_lkp_idx.PRIMARY'")
3190+
sbc1.SetResults([]*sqltypes.Result{
3191+
sqltypes.MakeTestResult(sqltypes.MakeTestFields("keyspace_id", "varbinary")),
3192+
{RowsAffected: 1},
3193+
})
3194+
_, err := executorExecSession(ctx, executor, "insert into t1(id, unq_col) values (1, 10), (4, 10)", nil, session.Session)
3195+
assert.ErrorContains(t, err,
3196+
"transaction rolled back to reverse changes of partial DML execution: lookup.Create: target: TestExecutor.-80.primary: "+
3197+
"Duplicate entry '10' for key 't1_lkp_idx.PRIMARY' (errno 1062) (sqlstate 23000)")
3198+
3199+
assert.EqualValues(t, 2, sbc1.ExecCount.Load())
3200+
assert.EqualValues(t, 0, sbc2.ExecCount.Load())
3201+
3202+
testQueryLog(t, executor, logChan, "VindexCreate", "INSERT", "insert into t1_lkp_idx(unq_col, keyspace_id) values (:unq_col_0, :keyspace_id_0), (:unq_col_1, :keyspace_id_1)", 1)
3203+
testQueryLog(t, executor, logChan, "VindexCreate", "SELECT", "select keyspace_id from t1_lkp_idx where unq_col = :unq_col for update", 1)
3204+
testQueryLog(t, executor, logChan, "VindexCreate", "INSERT", "insert into t1_lkp_idx(unq_col, keyspace_id) values (:unq_col, :keyspace_id)", 1)
3205+
testQueryLog(t, executor, logChan, "TestExecute", "INSERT", "insert into t1(id, unq_col) values (1, 10), (4, 10)", 0)
3206+
})
3207+
}

go/vt/vtgate/plan_execute.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,15 +367,16 @@ func (e *Executor) rollbackPartialExec(ctx context.Context, safeSession *SafeSes
367367
_, _, err = e.execute(ctx, nil, safeSession, rQuery, bindVars, logStats)
368368
// If no error, the revert is successful with the savepoint. Notify the reason as error to the client.
369369
if err == nil {
370-
errMsg.WriteString("reverted partial DML execution failure")
370+
errMsg.WriteString(vterrors.RevertedPartialExec)
371371
return vterrors.New(vtrpcpb.Code_ABORTED, errMsg.String())
372372
}
373373
// not able to rollback changes of the failed query, so have to abort the complete transaction.
374374
}
375375

376376
// abort the transaction.
377377
_ = e.txConn.Rollback(ctx, safeSession)
378-
errMsg.WriteString("transaction rolled back to reverse changes of partial DML execution")
378+
379+
errMsg.WriteString(vterrors.TxRollbackOnPartialExec)
379380
if err != nil {
380381
return vterrors.Wrap(err, errMsg.String())
381382
}

go/vt/vtgate/vcursor_impl.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,9 @@ func (vc *vcursorImpl) Execute(ctx context.Context, method string, query string,
544544
}
545545

546546
qr, err := vc.executor.Execute(ctx, nil, method, session, vc.marginComments.Leading+query+vc.marginComments.Trailing, bindVars)
547-
vc.setRollbackOnPartialExecIfRequired(err != nil, rollbackOnError)
547+
// If there is no error, it indicates at least one successful execution,
548+
// meaning a rollback should be triggered if a failure occurs later.
549+
vc.setRollbackOnPartialExecIfRequired(err == nil, rollbackOnError)
548550

549551
return qr, err
550552
}

go/vt/vtgate/vindexes/consistent_lookup.go

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,17 @@ import (
2323
"fmt"
2424
"strings"
2525

26+
"github.com/cespare/xxhash/v2"
27+
2628
"vitess.io/vitess/go/mysql/sqlerror"
2729
"vitess.io/vitess/go/sqltypes"
2830
"vitess.io/vitess/go/vt/key"
29-
"vitess.io/vitess/go/vt/sqlparser"
30-
"vitess.io/vitess/go/vt/vterrors"
31-
"vitess.io/vitess/go/vt/vtgate/evalengine"
32-
3331
querypb "vitess.io/vitess/go/vt/proto/query"
3432
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
3533
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
34+
"vitess.io/vitess/go/vt/sqlparser"
35+
"vitess.io/vitess/go/vt/vterrors"
36+
"vitess.io/vitess/go/vt/vtgate/evalengine"
3637
)
3738

3839
const (
@@ -342,25 +343,53 @@ func (lu *clCommon) Verify(ctx context.Context, vcursor VCursor, ids []sqltypes.
342343

343344
// Create reserves the id by inserting it into the vindex table.
344345
func (lu *clCommon) Create(ctx context.Context, vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error {
346+
// Attempt to insert values into the lookup vindex table.
345347
origErr := lu.lkp.createCustom(ctx, vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode, vtgatepb.CommitOrder_PRE)
346348
if origErr == nil {
347349
return nil
348350
}
351+
352+
// If the transaction has already been rolled back, we should not attempt to handle the duplicate entry error.
353+
if strings.Contains(origErr.Error(), vterrors.TxRollbackOnPartialExec) {
354+
return origErr
355+
}
356+
349357
// Try and convert the error to a MySQL error
350358
sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(origErr).(*sqlerror.SQLError)
351-
// If it is a MySQL error and its code is of duplicate entry, then we would like to continue
352-
// Otherwise, we return the error
359+
360+
// If the error is NOT a duplicate entry error, return it immediately.
353361
if !(isSQLErr && sqlErr != nil && sqlErr.Number() == sqlerror.ERDupEntry) {
354362
return origErr
355363
}
364+
365+
// Map to track unique row hashes and their original index in `rowsColValues`.
366+
rowHashIndex := make(map[uint64]int, len(rowsColValues))
356367
for i, row := range rowsColValues {
368+
rowKey := hashKeyXXH(row)
369+
// If a row with the same hash exists, perform an explicit value check to avoid hash collisions.
370+
if idx, exists := rowHashIndex[rowKey]; exists && sqltypes.RowEqual(row, rowsColValues[idx]) {
371+
return origErr // Exact duplicate found, return the original error.
372+
}
373+
rowHashIndex[rowKey] = i
374+
375+
// Attempt to handle the duplicate entry.
357376
if err := lu.handleDup(ctx, vcursor, row, ksids[i], origErr); err != nil {
358377
return err
359378
}
360379
}
361380
return nil
362381
}
363382

383+
// hashKeyXXH generates a fast 64-bit hash for a row using xxHash.
384+
// This is optimized for performance and helps detect duplicates efficiently.
385+
func hashKeyXXH(row []sqltypes.Value) uint64 {
386+
h := xxhash.New()
387+
for _, col := range row {
388+
_, _ = h.Write([]byte(col.String())) // Ignoring error as xxHash Write never fails
389+
}
390+
return h.Sum64()
391+
}
392+
364393
func (lu *clCommon) handleDup(ctx context.Context, vcursor VCursor, values []sqltypes.Value, ksid []byte, dupError error) error {
365394
bindVars := make(map[string]*querypb.BindVariable, len(values))
366395
for colnum, val := range values {

go/vt/vtgate/vindexes/vschema.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1394,7 +1394,7 @@ func ChooseVindexForType(typ querypb.Type) (string, error) {
13941394

13951395
// FindBestColVindex finds the best ColumnVindex for VReplication.
13961396
func FindBestColVindex(table *Table) (*ColumnVindex, error) {
1397-
if table.ColumnVindexes == nil || len(table.ColumnVindexes) == 0 {
1397+
if len(table.ColumnVindexes) == 0 {
13981398
return nil, vterrors.Errorf(
13991399
vtrpcpb.Code_INVALID_ARGUMENT,
14001400
"table %s has no vindex",

0 commit comments

Comments
 (0)