Skip to content
This repository was archived by the owner on Aug 21, 2023. It is now read-only.

Commit f8151ea

Browse files
authored
sql: fix pick up possible field (tidb#29399) (#399) (#400)
1 parent 095eb5c commit f8151ea

File tree

4 files changed

+211
-15
lines changed

4 files changed

+211
-15
lines changed

v4/export/dump.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,7 @@ func checkTiDBTableRegionPkFields(pkFields, pkColTypes []string) (err error) {
756756
err = errors.Errorf("unsupported primary key for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", "))
757757
return
758758
}
759-
if _, ok := dataTypeNum[pkColTypes[0]]; !ok {
759+
if _, ok := dataTypeInt[pkColTypes[0]]; !ok {
760760
err = errors.Errorf("unsupported primary key type for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", "))
761761
}
762762
return

v4/export/sql.go

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"database/sql"
99
"fmt"
1010
"io"
11+
"math"
1112
"net/url"
1213
"strconv"
1314
"strings"
@@ -441,6 +442,9 @@ func GetPrimaryKeyColumns(db *sql.Conn, database, table string) ([]string, error
441442
return cols, nil
442443
}
443444

445+
// getNumericIndex picks up indices according to the following priority:
446+
// primary key > unique key with the smallest count > key with the max cardinality
447+
// primary key with multi cols is before unique key with single col because we will sort result by primary keys
444448
func getNumericIndex(db *sql.Conn, meta TableMeta) (string, error) {
445449
database, table := meta.DatabaseName(), meta.TableName()
446450
colName2Type := string2Map(meta.ColumnNames(), meta.ColumnTypes())
@@ -449,22 +453,64 @@ func getNumericIndex(db *sql.Conn, meta TableMeta) (string, error) {
449453
if err != nil {
450454
return "", errors.Annotatef(err, "sql: %s", keyQuery)
451455
}
452-
results, err := GetSpecifiedColumnValuesAndClose(rows, "NON_UNIQUE", "KEY_NAME", "COLUMN_NAME")
456+
results, err := GetSpecifiedColumnValuesAndClose(rows, "NON_UNIQUE", "SEQ_IN_INDEX", "KEY_NAME", "COLUMN_NAME", "CARDINALITY")
453457
if err != nil {
454458
return "", errors.Annotatef(err, "sql: %s", keyQuery)
455459
}
456-
uniqueColumnName := ""
460+
type keyColumnPair struct {
461+
colName string
462+
count uint64
463+
}
464+
var (
465+
uniqueKeyMap = map[string]keyColumnPair{} // unique key name -> key column name, unique key columns count
466+
keyColumn string
467+
maxCardinality int64 = -1
468+
)
469+
457470
// check primary key first, then unique key
458471
for _, oneRow := range results {
459-
var ok bool
460-
if _, ok = dataTypeNum[colName2Type[oneRow[2]]]; ok && oneRow[1] == "PRIMARY" {
461-
return oneRow[2], nil
472+
nonUnique, seqInIndex, keyName, colName, cardinality := oneRow[0], oneRow[1], oneRow[2], oneRow[3], oneRow[4]
473+
// only try pick the first column, because the second column of pk/uk in where condition will trigger a full table scan
474+
if seqInIndex != "1" {
475+
if pair, ok := uniqueKeyMap[keyName]; ok {
476+
seqInIndexInt, err := strconv.ParseUint(seqInIndex, 10, 64)
477+
if err == nil && seqInIndexInt > pair.count {
478+
uniqueKeyMap[keyName] = keyColumnPair{pair.colName, seqInIndexInt}
479+
}
480+
}
481+
continue
462482
}
463-
if uniqueColumnName != "" && oneRow[0] == "0" && ok {
464-
uniqueColumnName = oneRow[2]
483+
_, numberColumn := dataTypeInt[colName2Type[colName]]
484+
if numberColumn {
485+
switch {
486+
case keyName == "PRIMARY":
487+
return colName, nil
488+
case nonUnique == "0":
489+
uniqueKeyMap[keyName] = keyColumnPair{colName, 1}
490+
// pick index column with max cardinality when there is no unique index
491+
case len(uniqueKeyMap) == 0:
492+
cardinalityInt, err := strconv.ParseInt(cardinality, 10, 64)
493+
if err == nil && cardinalityInt > maxCardinality {
494+
keyColumn = colName
495+
maxCardinality = cardinalityInt
496+
}
497+
}
498+
}
499+
}
500+
if len(uniqueKeyMap) > 0 {
501+
var (
502+
minCols uint64 = math.MaxUint64
503+
uniqueKeyColumn string
504+
)
505+
for _, pair := range uniqueKeyMap {
506+
if pair.count < minCols {
507+
uniqueKeyColumn = pair.colName
508+
minCols = pair.count
509+
}
465510
}
511+
return uniqueKeyColumn, nil
466512
}
467-
return uniqueColumnName, nil
513+
return keyColumn, nil
468514
}
469515

470516
// FlushTableWithReadLock flush tables with read lock
@@ -952,7 +998,7 @@ func pickupPossibleField(meta TableMeta, db *sql.Conn) (string, error) {
952998
if meta.HasImplicitRowID() {
953999
return "_tidb_rowid", nil
9541000
}
955-
// try to use pk
1001+
// try to use pk or uk
9561002
fieldName, err := getNumericIndex(db, meta)
9571003
if err != nil {
9581004
return "", err

v4/export/sql_test.go

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ import (
1616
"strings"
1717
"testing"
1818

19-
tcontext "github.com/pingcap/dumpling/v4/context"
2019
"github.com/stretchr/testify/require"
2120

21+
tcontext "github.com/pingcap/dumpling/v4/context"
22+
2223
"github.com/DATA-DOG/go-sqlmock"
2324
"github.com/coreos/go-semver/semver"
2425
"github.com/pingcap/errors"
@@ -1548,6 +1549,150 @@ func TestBuildVersion3RegionQueries(t *testing.T) {
15481549
}
15491550
}
15501551

1552+
func TestPickupPossibleField(t *testing.T) {
1553+
db, mock, err := sqlmock.New()
1554+
require.NoError(t, err)
1555+
defer func() {
1556+
require.NoError(t, db.Close())
1557+
}()
1558+
1559+
conn, err := db.Conn(context.Background())
1560+
require.NoError(t, err)
1561+
1562+
meta := &mockTableIR{
1563+
dbName: database,
1564+
tblName: table,
1565+
colNames: []string{"string1", "int1", "int2", "float1", "bin1", "int3", "bool1", "int4"},
1566+
colTypes: []string{"VARCHAR", "INT", "BIGINT", "FLOAT", "BINARY", "MEDIUMINT", "BOOL", "TINYINT"},
1567+
specCmt: []string{
1568+
"/*!40101 SET NAMES binary*/;",
1569+
},
1570+
}
1571+
1572+
testCases := []struct {
1573+
expectedErr error
1574+
expectedField string
1575+
hasImplicitRowID bool
1576+
showIndexResults [][]driver.Value
1577+
}{
1578+
{
1579+
errors.New("show index error"),
1580+
"",
1581+
false,
1582+
nil,
1583+
}, {
1584+
nil,
1585+
"_tidb_rowid",
1586+
true,
1587+
nil,
1588+
}, // both primary and unique key columns are integers, use primary key first
1589+
{
1590+
nil,
1591+
"int1",
1592+
false,
1593+
[][]driver.Value{
1594+
{table, 0, "PRIMARY", 1, "int1", "A", 2, nil, nil, "", "BTREE", "", ""},
1595+
{table, 0, "PRIMARY", 2, "float1", "A", 2, nil, nil, "", "BTREE", "", ""},
1596+
{table, 0, "int2", 1, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1597+
{table, 1, "string1", 1, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1598+
{table, 1, "int3", 1, "int3", "A", 20, nil, nil, "YES", "BTREE", "", ""},
1599+
},
1600+
}, // primary key doesn't have integer at seq 1, use unique key with integer
1601+
{
1602+
nil,
1603+
"int2",
1604+
false,
1605+
[][]driver.Value{
1606+
{table, 0, "PRIMARY", 1, "float1", "A", 2, nil, nil, "", "BTREE", "", ""},
1607+
{table, 0, "PRIMARY", 2, "int1", "A", 2, nil, nil, "", "BTREE", "", ""},
1608+
{table, 0, "int2", 1, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1609+
{table, 1, "string1", 1, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1610+
{table, 1, "int3", 1, "int3", "A", 20, nil, nil, "YES", "BTREE", "", ""},
1611+
},
1612+
}, // several unique keys, use unique key who has a integer in seq 1
1613+
{
1614+
nil,
1615+
"int1",
1616+
false,
1617+
[][]driver.Value{
1618+
{table, 0, "u1", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1619+
{table, 0, "u1", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1620+
{table, 0, "u1", 3, "bin1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1621+
{table, 0, "u2", 1, "float1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1622+
{table, 0, "u2", 2, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1623+
},
1624+
}, // several unique keys and ordinary keys, use unique key who has a integer in seq 1
1625+
{
1626+
nil,
1627+
"int1",
1628+
false,
1629+
[][]driver.Value{
1630+
{table, 0, "u1", 1, "float1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1631+
{table, 0, "u1", 2, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1632+
{table, 0, "u2", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1633+
{table, 0, "u2", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1634+
{table, 0, "u2", 3, "bin1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1635+
{table, 1, "int3", 1, "int3", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1636+
},
1637+
}, // several unique keys and ordinary keys, use unique key who has less columns
1638+
{
1639+
nil,
1640+
"int2",
1641+
false,
1642+
[][]driver.Value{
1643+
{table, 0, "u1", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1644+
{table, 0, "u1", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1645+
{table, 0, "u1", 3, "bin1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1646+
{table, 0, "u2", 1, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1647+
{table, 0, "u2", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1648+
{table, 1, "int3", 1, "int3", "A", 20, nil, nil, "YES", "BTREE", "", ""},
1649+
},
1650+
}, // several unique keys and ordinary keys, use key who has max cardinality
1651+
{
1652+
nil,
1653+
"int2",
1654+
false,
1655+
[][]driver.Value{
1656+
{table, 0, "PRIMARY", 1, "string1", "A", 2, nil, nil, "", "BTREE", "", ""},
1657+
{table, 0, "u1", 1, "float1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1658+
{table, 0, "u1", 2, "int3", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1659+
{table, 1, "i1", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1660+
{table, 1, "i2", 1, "int2", "A", 5, nil, nil, "YES", "BTREE", "", ""},
1661+
{table, 1, "i2", 2, "bool1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
1662+
{table, 1, "i3", 1, "bin1", "A", 10, nil, nil, "YES", "BTREE", "", ""},
1663+
{table, 1, "i3", 2, "int4", "A", 10, nil, nil, "YES", "BTREE", "", ""},
1664+
},
1665+
},
1666+
}
1667+
1668+
query := fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)
1669+
for i, testCase := range testCases {
1670+
t.Logf("case #%d", i)
1671+
1672+
meta.hasImplicitRowID = testCase.hasImplicitRowID
1673+
expectedErr := testCase.expectedErr
1674+
if expectedErr != nil {
1675+
mock.ExpectQuery(query).WillReturnError(expectedErr)
1676+
} else if !testCase.hasImplicitRowID {
1677+
rows := sqlmock.NewRows(showIndexHeaders)
1678+
for _, showIndexResult := range testCase.showIndexResults {
1679+
rows.AddRow(showIndexResult...)
1680+
}
1681+
mock.ExpectQuery(query).WillReturnRows(rows)
1682+
}
1683+
1684+
var field string
1685+
field, err = pickupPossibleField(meta, conn)
1686+
if expectedErr != nil {
1687+
require.ErrorIs(t, errors.Cause(err), expectedErr)
1688+
} else {
1689+
require.NoError(t, err)
1690+
require.Equal(t, testCase.expectedField, field)
1691+
}
1692+
require.NoError(t, mock.ExpectationsWereMet())
1693+
}
1694+
}
1695+
15511696
func makeVersion(major, minor, patch int64, preRelease string) *semver.Version {
15521697
return &semver.Version{
15531698
Major: major,

v4/export/sql_type.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@ func initColTypeRowReceiverMap() {
3131
"ENUM", "SET", "JSON", "NULL", "VAR_STRING",
3232
}
3333

34-
dataTypeNumArr := []string{
34+
dataTypeIntArr := []string{
3535
"INTEGER", "BIGINT", "TINYINT", "SMALLINT", "MEDIUMINT",
3636
"INT", "INT1", "INT2", "INT3", "INT8",
37+
}
38+
39+
dataTypeNumArr := append(dataTypeIntArr, []string{
3740
"FLOAT", "REAL", "DOUBLE", "DOUBLE PRECISION",
3841
"DECIMAL", "NUMERIC", "FIXED",
3942
"BOOL", "BOOLEAN",
40-
}
43+
}...)
4144

4245
dataTypeBinArr := []string{
4346
"BLOB", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB", "LONG",
@@ -49,8 +52,10 @@ func initColTypeRowReceiverMap() {
4952
dataTypeString[s] = struct{}{}
5053
colTypeRowReceiverMap[s] = SQLTypeStringMaker
5154
}
55+
for _, s := range dataTypeIntArr {
56+
dataTypeInt[s] = struct{}{}
57+
}
5258
for _, s := range dataTypeNumArr {
53-
dataTypeNum[s] = struct{}{}
5459
colTypeRowReceiverMap[s] = SQLTypeNumberMaker
5560
}
5661
for _, s := range dataTypeBinArr {
@@ -59,7 +64,7 @@ func initColTypeRowReceiverMap() {
5964
}
6065
}
6166

62-
var dataTypeString, dataTypeNum, dataTypeBin = make(map[string]struct{}), make(map[string]struct{}), make(map[string]struct{})
67+
var dataTypeString, dataTypeInt, dataTypeBin = make(map[string]struct{}), make(map[string]struct{}), make(map[string]struct{})
6368

6469
func escapeBackslashSQL(s []byte, bf *bytes.Buffer) {
6570
var (

0 commit comments

Comments
 (0)