Skip to content
This repository was archived by the owner on Aug 21, 2023. It is now read-only.

Commit 38ef9a5

Browse files
authored
sql: fix pick up possible field (tidb#29399) (#399) (#402)
1 parent 1fd9ecb commit 38ef9a5

File tree

4 files changed

+211
-15
lines changed

4 files changed

+211
-15
lines changed

v4/export/dump.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,7 @@ func checkTiDBTableRegionPkFields(pkFields, pkColTypes []string) (err error) {
756756
err = errors.Errorf("unsupported primary key for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", "))
757757
return
758758
}
759-
if _, ok := dataTypeNum[pkColTypes[0]]; !ok {
759+
if _, ok := dataTypeInt[pkColTypes[0]]; !ok {
760760
err = errors.Errorf("unsupported primary key type for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", "))
761761
}
762762
return

v4/export/sql.go

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"database/sql"
99
"fmt"
1010
"io"
11+
"math"
1112
"net/url"
1213
"strconv"
1314
"strings"
@@ -432,6 +433,9 @@ func GetPrimaryKeyColumns(db *sql.Conn, database, table string) ([]string, error
432433
return cols, nil
433434
}
434435

436+
// getNumericIndex picks up indices according to the following priority:
437+
// primary key > unique key with the smallest count > key with the max cardinality
438+
// primary key with multi cols is before unique key with single col because we will sort result by primary keys
435439
func getNumericIndex(db *sql.Conn, meta TableMeta) (string, error) {
436440
database, table := meta.DatabaseName(), meta.TableName()
437441
colName2Type := string2Map(meta.ColumnNames(), meta.ColumnTypes())
@@ -440,22 +444,64 @@ func getNumericIndex(db *sql.Conn, meta TableMeta) (string, error) {
440444
if err != nil {
441445
return "", errors.Annotatef(err, "sql: %s", keyQuery)
442446
}
443-
results, err := GetSpecifiedColumnValuesAndClose(rows, "NON_UNIQUE", "KEY_NAME", "COLUMN_NAME")
447+
results, err := GetSpecifiedColumnValuesAndClose(rows, "NON_UNIQUE", "SEQ_IN_INDEX", "KEY_NAME", "COLUMN_NAME", "CARDINALITY")
444448
if err != nil {
445449
return "", errors.Annotatef(err, "sql: %s", keyQuery)
446450
}
447-
uniqueColumnName := ""
451+
type keyColumnPair struct {
452+
colName string
453+
count uint64
454+
}
455+
var (
456+
uniqueKeyMap = map[string]keyColumnPair{} // unique key name -> key column name, unique key columns count
457+
keyColumn string
458+
maxCardinality int64 = -1
459+
)
460+
448461
// check primary key first, then unique key
449462
for _, oneRow := range results {
450-
var ok bool
451-
if _, ok = dataTypeNum[colName2Type[oneRow[2]]]; ok && oneRow[1] == "PRIMARY" {
452-
return oneRow[2], nil
463+
nonUnique, seqInIndex, keyName, colName, cardinality := oneRow[0], oneRow[1], oneRow[2], oneRow[3], oneRow[4]
464+
// only try pick the first column, because the second column of pk/uk in where condition will trigger a full table scan
465+
if seqInIndex != "1" {
466+
if pair, ok := uniqueKeyMap[keyName]; ok {
467+
seqInIndexInt, err := strconv.ParseUint(seqInIndex, 10, 64)
468+
if err == nil && seqInIndexInt > pair.count {
469+
uniqueKeyMap[keyName] = keyColumnPair{pair.colName, seqInIndexInt}
470+
}
471+
}
472+
continue
453473
}
454-
if uniqueColumnName != "" && oneRow[0] == "0" && ok {
455-
uniqueColumnName = oneRow[2]
474+
_, numberColumn := dataTypeInt[colName2Type[colName]]
475+
if numberColumn {
476+
switch {
477+
case keyName == "PRIMARY":
478+
return colName, nil
479+
case nonUnique == "0":
480+
uniqueKeyMap[keyName] = keyColumnPair{colName, 1}
481+
// pick index column with max cardinality when there is no unique index
482+
case len(uniqueKeyMap) == 0:
483+
cardinalityInt, err := strconv.ParseInt(cardinality, 10, 64)
484+
if err == nil && cardinalityInt > maxCardinality {
485+
keyColumn = colName
486+
maxCardinality = cardinalityInt
487+
}
488+
}
489+
}
490+
}
491+
if len(uniqueKeyMap) > 0 {
492+
var (
493+
minCols uint64 = math.MaxUint64
494+
uniqueKeyColumn string
495+
)
496+
for _, pair := range uniqueKeyMap {
497+
if pair.count < minCols {
498+
uniqueKeyColumn = pair.colName
499+
minCols = pair.count
500+
}
456501
}
502+
return uniqueKeyColumn, nil
457503
}
458-
return uniqueColumnName, nil
504+
return keyColumn, nil
459505
}
460506

461507
// FlushTableWithReadLock flush tables with read lock
@@ -944,7 +990,7 @@ func pickupPossibleField(meta TableMeta, db *sql.Conn) (string, error) {
944990
if meta.HasImplicitRowID() {
945991
return "_tidb_rowid", nil
946992
}
947-
// try to use pk
993+
// try to use pk or uk
948994
fieldName, err := getNumericIndex(db, meta)
949995
if err != nil {
950996
return "", err

v4/export/sql_test.go

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ import (
1616
"strings"
1717
"testing"
1818

19-
tcontext "github.com/pingcap/dumpling/v4/context"
2019
"github.com/stretchr/testify/require"
2120

21+
tcontext "github.com/pingcap/dumpling/v4/context"
22+
2223
"github.com/DATA-DOG/go-sqlmock"
2324
"github.com/coreos/go-semver/semver"
2425
"github.com/pingcap/errors"
@@ -1540,6 +1541,150 @@ func TestBuildVersion3RegionQueries(t *testing.T) {
15401541
}
15411542
}
15421543

1544+
func TestPickupPossibleField(t *testing.T) {
1545+
db, mock, err := sqlmock.New()
1546+
require.NoError(t, err)
1547+
defer func() {
1548+
require.NoError(t, db.Close())
1549+
}()
1550+
1551+
conn, err := db.Conn(context.Background())
1552+
require.NoError(t, err)
1553+
1554+
meta := &mockTableIR{
1555+
dbName: database,
1556+
tblName: table,
1557+
colNames: []string{"string1", "int1", "int2", "float1", "bin1", "int3", "bool1", "int4"},
1558+
colTypes: []string{"VARCHAR", "INT", "BIGINT", "FLOAT", "BINARY", "MEDIUMINT", "BOOL", "TINYINT"},
1559+
specCmt: []string{
1560+
"/*!40101 SET NAMES binary*/;",
1561+
},
1562+
}
1563+
1564+
testCases := []struct {
1565+
expectedErr error
1566+
expectedField string
1567+
hasImplicitRowID bool
1568+
showIndexResults [][]driver.Value
1569+
}{
1570+
{
1571+
errors.New("show index error"),
1572+
"",
1573+
false,
1574+
nil,
1575+
}, {
1576+
nil,
1577+
"_tidb_rowid",
1578+
true,
1579+
nil,
1580+
}, // both primary and unique key columns are integers, use primary key first
1581+
{
1582+
nil,
1583+
"int1",
1584+
false,
1585+
[][]driver.Value{
1586+
{table, 0, "PRIMARY", 1, "int1", "A", 2, nil, nil, "", "BTREE", "", ""},
1587+
{table, 0, "PRIMARY", 2, "float1", "A", 2, nil, nil, "", "BTREE", "", ""},
1588+
{table, 0, "int2", 1, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1589+
{table, 1, "string1", 1, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1590+
{table, 1, "int3", 1, "int3", "A", 20, nil, nil, "YES", "BTREE", "", ""},
1591+
},
1592+
}, // primary key doesn't have integer at seq 1, use unique key with integer
1593+
{
1594+
nil,
1595+
"int2",
1596+
false,
1597+
[][]driver.Value{
1598+
{table, 0, "PRIMARY", 1, "float1", "A", 2, nil, nil, "", "BTREE", "", ""},
1599+
{table, 0, "PRIMARY", 2, "int1", "A", 2, nil, nil, "", "BTREE", "", ""},
1600+
{table, 0, "int2", 1, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1601+
{table, 1, "string1", 1, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1602+
{table, 1, "int3", 1, "int3", "A", 20, nil, nil, "YES", "BTREE", "", ""},
1603+
},
1604+
}, // several unique keys, use unique key who has a integer in seq 1
1605+
{
1606+
nil,
1607+
"int1",
1608+
false,
1609+
[][]driver.Value{
1610+
{table, 0, "u1", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1611+
{table, 0, "u1", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1612+
{table, 0, "u1", 3, "bin1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1613+
{table, 0, "u2", 1, "float1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1614+
{table, 0, "u2", 2, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1615+
},
1616+
}, // several unique keys and ordinary keys, use unique key who has a integer in seq 1
1617+
{
1618+
nil,
1619+
"int1",
1620+
false,
1621+
[][]driver.Value{
1622+
{table, 0, "u1", 1, "float1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1623+
{table, 0, "u1", 2, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1624+
{table, 0, "u2", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1625+
{table, 0, "u2", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1626+
{table, 0, "u2", 3, "bin1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1627+
{table, 1, "int3", 1, "int3", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1628+
},
1629+
}, // several unique keys and ordinary keys, use unique key who has less columns
1630+
{
1631+
nil,
1632+
"int2",
1633+
false,
1634+
[][]driver.Value{
1635+
{table, 0, "u1", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1636+
{table, 0, "u1", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1637+
{table, 0, "u1", 3, "bin1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1638+
{table, 0, "u2", 1, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1639+
{table, 0, "u2", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1640+
{table, 1, "int3", 1, "int3", "A", 20, nil, nil, "YES", "BTREE", "", ""},
1641+
},
1642+
}, // several unique keys and ordinary keys, use key who has max cardinality
1643+
{
1644+
nil,
1645+
"int2",
1646+
false,
1647+
[][]driver.Value{
1648+
{table, 0, "PRIMARY", 1, "string1", "A", 2, nil, nil, "", "BTREE", "", ""},
1649+
{table, 0, "u1", 1, "float1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1650+
{table, 0, "u1", 2, "int3", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1651+
{table, 1, "i1", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1652+
{table, 1, "i2", 1, "int2", "A", 5, nil, nil, "YES", "BTREE", "", ""},
1653+
{table, 1, "i2", 2, "bool1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1654+
{table, 1, "i3", 1, "bin1", "A", 10, nil, nil, "YES", "BTREE", "", ""},
1655+
{table, 1, "i3", 2, "int4", "A", 10, nil, nil, "YES", "BTREE", "", ""},
1656+
},
1657+
},
1658+
}
1659+
1660+
query := fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)
1661+
for i, testCase := range testCases {
1662+
t.Logf("case #%d", i)
1663+
1664+
meta.hasImplicitRowID = testCase.hasImplicitRowID
1665+
expectedErr := testCase.expectedErr
1666+
if expectedErr != nil {
1667+
mock.ExpectQuery(query).WillReturnError(expectedErr)
1668+
} else if !testCase.hasImplicitRowID {
1669+
rows := sqlmock.NewRows(showIndexHeaders)
1670+
for _, showIndexResult := range testCase.showIndexResults {
1671+
rows.AddRow(showIndexResult...)
1672+
}
1673+
mock.ExpectQuery(query).WillReturnRows(rows)
1674+
}
1675+
1676+
var field string
1677+
field, err = pickupPossibleField(meta, conn)
1678+
if expectedErr != nil {
1679+
require.ErrorIs(t, errors.Cause(err), expectedErr)
1680+
} else {
1681+
require.NoError(t, err)
1682+
require.Equal(t, testCase.expectedField, field)
1683+
}
1684+
require.NoError(t, mock.ExpectationsWereMet())
1685+
}
1686+
}
1687+
15431688
func makeVersion(major, minor, patch int64, preRelease string) *semver.Version {
15441689
return &semver.Version{
15451690
Major: major,

v4/export/sql_type.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@ func initColTypeRowReceiverMap() {
3131
"ENUM", "SET", "JSON", "NULL", "VAR_STRING",
3232
}
3333

34-
dataTypeNumArr := []string{
34+
dataTypeIntArr := []string{
3535
"INTEGER", "BIGINT", "TINYINT", "SMALLINT", "MEDIUMINT",
3636
"INT", "INT1", "INT2", "INT3", "INT8",
37+
}
38+
39+
dataTypeNumArr := append(dataTypeIntArr, []string{
3740
"FLOAT", "REAL", "DOUBLE", "DOUBLE PRECISION",
3841
"DECIMAL", "NUMERIC", "FIXED",
3942
"BOOL", "BOOLEAN",
40-
}
43+
}...)
4144

4245
dataTypeBinArr := []string{
4346
"BLOB", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB", "LONG",
@@ -49,8 +52,10 @@ func initColTypeRowReceiverMap() {
4952
dataTypeString[s] = struct{}{}
5053
colTypeRowReceiverMap[s] = SQLTypeStringMaker
5154
}
55+
for _, s := range dataTypeIntArr {
56+
dataTypeInt[s] = struct{}{}
57+
}
5258
for _, s := range dataTypeNumArr {
53-
dataTypeNum[s] = struct{}{}
5459
colTypeRowReceiverMap[s] = SQLTypeNumberMaker
5560
}
5661
for _, s := range dataTypeBinArr {
@@ -59,7 +64,7 @@ func initColTypeRowReceiverMap() {
5964
}
6065
}
6166

62-
var dataTypeString, dataTypeNum, dataTypeBin = make(map[string]struct{}), make(map[string]struct{}), make(map[string]struct{})
67+
var dataTypeString, dataTypeInt, dataTypeBin = make(map[string]struct{}), make(map[string]struct{}), make(map[string]struct{})
6368

6469
func escapeBackslashSQL(s []byte, bf *bytes.Buffer) {
6570
var (

0 commit comments

Comments
 (0)