Skip to content

Commit c49b3e5

Browse files
authored
Merge pull request #417 from Shopify/uuid-as-id
Pagination beyond uint64
2 parents 3fbf326 + c0e81fb commit c49b3e5

32 files changed

+1832
-291
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ All notable changes to this project will be documented in this file.
77
### Added
88

99
- Changelog.
10+
- Pagination keys beyond UINT64 @milanatshopify #417
11+
- Pagination keys other than UINT64 have to have binary collation @grodowski #422
1012

1113
## [1.1.0]
1214

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Variables to be built into the binary
2-
VERSION := 1.1.0
2+
VERSION := 1.2.0
33

44
# This variable can be overwritten by the caller
55
DATETIME ?= $(shell date -u +%Y%m%d%H%M%S)

batch_writer.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,13 @@ func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error {
5656
return nil
5757
}
5858

59-
startPaginationKeypos, err := values[0].GetUint64(batch.PaginationKeyIndex())
59+
paginationColumn := batch.TableSchema().GetPaginationColumn()
60+
61+
startPaginationKeypos, err := NewPaginationKeyFromRow(values[0], batch.PaginationKeyIndex(), paginationColumn)
6062
if err != nil {
6163
return err
6264
}
63-
64-
endPaginationKeypos, err := values[len(values)-1].GetUint64(batch.PaginationKeyIndex())
65+
endPaginationKeypos, err := NewPaginationKeyFromRow(values[len(values)-1], batch.PaginationKeyIndex(), paginationColumn)
6566
if err != nil {
6667
return err
6768
}
@@ -78,12 +79,12 @@ func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error {
7879

7980
query, args, err := batch.AsSQLQuery(db, table)
8081
if err != nil {
81-
return fmt.Errorf("during generating sql query at paginationKey %v -> %v: %v", startPaginationKeypos, endPaginationKeypos, err)
82+
return fmt.Errorf("during generating sql query at paginationKey %s -> %s: %v", startPaginationKeypos.String(), endPaginationKeypos.String(), err)
8283
}
8384

8485
stmt, err := w.stmtCache.StmtFor(w.DB, query)
8586
if err != nil {
86-
return fmt.Errorf("during prepare query near paginationKey %v -> %v (%s): %v", startPaginationKeypos, endPaginationKeypos, query, err)
87+
return fmt.Errorf("during prepare query near paginationKey %s -> %s (%s): %v", startPaginationKeypos.String(), endPaginationKeypos.String(), query, err)
8788
}
8889

8990
tx, err := w.DB.Begin()
@@ -94,14 +95,14 @@ func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error {
9495
_, err = tx.Stmt(stmt).Exec(args...)
9596
if err != nil {
9697
tx.Rollback()
97-
return fmt.Errorf("during exec query near paginationKey %v -> %v (%s): %v", startPaginationKeypos, endPaginationKeypos, query, err)
98+
return fmt.Errorf("during exec query near paginationKey %s -> %s (%s): %v", startPaginationKeypos.String(), endPaginationKeypos.String(), query, err)
9899
}
99100

100101
if w.InlineVerifier != nil {
101102
mismatches, err := w.InlineVerifier.CheckFingerprintInline(tx, db, table, batch, w.EnforceInlineVerification)
102103
if err != nil {
103104
tx.Rollback()
104-
return fmt.Errorf("during fingerprint checking for paginationKey %v -> %v (%s): %v", startPaginationKeypos, endPaginationKeypos, query, err)
105+
return fmt.Errorf("during fingerprint checking for paginationKey %s -> %s (%s): %v", startPaginationKeypos.String(), endPaginationKeypos.String(), query, err)
105106
}
106107

107108
if w.EnforceInlineVerification {
@@ -119,7 +120,7 @@ func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error {
119120
err = tx.Commit()
120121
if err != nil {
121122
tx.Rollback()
122-
return fmt.Errorf("during commit near paginationKey %v -> %v (%s): %v", startPaginationKeypos, endPaginationKeypos, query, err)
123+
return fmt.Errorf("during commit near paginationKey %s -> %s (%s): %v", startPaginationKeypos.String(), endPaginationKeypos.String(), query, err)
123124
}
124125

125126
// Note that the state tracker expects us to track based on the original

compression_verifier.go

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func (e UnsupportedCompressionError) Error() string {
4949
type CompressionVerifier struct {
5050
logger *logrus.Entry
5151

52+
TableSchemaCache TableSchemaCache
5253
supportedAlgorithms map[string]struct{}
5354
tableColumnCompressions TableColumnCompressionConfig
5455
}
@@ -59,32 +60,52 @@ type CompressionVerifier struct {
5960
// The GetCompressedHashes method checks if the existing table contains compressed data
6061
// and will apply the decompression algorithm to the applicable columns if necessary.
6162
// After the columns are decompressed, the hashes of the data are used to verify equality
62-
func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schema, table, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []uint64) (map[uint64][]byte, error) {
63+
func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schemaName, tableName, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []interface{}) (map[string][]byte, error) {
6364
c.logger.WithFields(logrus.Fields{
6465
"tag": "compression_verifier",
65-
"table": table,
66+
"table": tableName,
6667
}).Info("decompressing table data before verification")
6768

68-
tableCompression := c.tableColumnCompressions[table]
69+
tableCompression := c.tableColumnCompressions[tableName]
6970

7071
// Extract the raw rows using SQL to be decompressed
71-
rows, err := getRows(db, schema, table, paginationKeyColumn, columns, paginationKeys)
72+
rows, err := getRows(db, schemaName, tableName, paginationKeyColumn, columns, paginationKeys)
7273
if err != nil {
7374
return nil, err
7475
}
7576
defer rows.Close()
7677

77-
// Decompress applicable columns and hash the resulting column values for comparison
78-
resultSet := make(map[uint64][]byte)
78+
table := c.TableSchemaCache.Get(schemaName, tableName)
79+
if table == nil {
80+
return nil, fmt.Errorf("table %s.%s not found in schema cache", schemaName, tableName)
81+
}
82+
paginationColumn := table.GetPaginationColumn()
83+
resultSet := make(map[string][]byte)
84+
7985
for rows.Next() {
8086
rowData, err := ScanByteRow(rows, len(columns)+1)
8187
if err != nil {
8288
return nil, err
8389
}
8490

85-
paginationKey, err := strconv.ParseUint(string(rowData[0]), 10, 64)
86-
if err != nil {
87-
return nil, err
91+
var paginationKeyStr string
92+
switch paginationColumn.Type {
93+
case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT:
94+
paginationKeyUint, err := strconv.ParseUint(string(rowData[0]), 10, 64)
95+
if err != nil {
96+
return nil, err
97+
}
98+
paginationKeyStr = NewUint64Key(paginationKeyUint).String()
99+
100+
case schema.TYPE_BINARY, schema.TYPE_STRING:
101+
paginationKeyStr = NewBinaryKey(rowData[0]).String()
102+
103+
default:
104+
paginationKeyUint, err := strconv.ParseUint(string(rowData[0]), 10, 64)
105+
if err != nil {
106+
return nil, err
107+
}
108+
paginationKeyStr = NewUint64Key(paginationKeyUint).String()
88109
}
89110

90111
// Decompress the applicable columns and then hash them together
@@ -95,7 +116,7 @@ func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schema, table, pag
95116
for idx, column := range columns {
96117
if algorithm, ok := tableCompression[column.Name]; ok {
97118
// rowData contains the result of "SELECT paginationKeyColumn, * FROM ...", so idx+1 to get each column
98-
decompressedColData, err := c.Decompress(table, column.Name, algorithm, rowData[idx+1])
119+
decompressedColData, err := c.Decompress(tableName, column.Name, algorithm, rowData[idx+1])
99120
if err != nil {
100121
return nil, err
101122
}
@@ -111,20 +132,20 @@ func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schema, table, pag
111132
return nil, err
112133
}
113134

114-
resultSet[paginationKey] = decompressedRowHash
135+
resultSet[paginationKeyStr] = decompressedRowHash
115136
}
116137

117138
metrics.Gauge(
118139
"compression_verifier_decompress_rows",
119140
float64(len(resultSet)),
120-
[]MetricTag{{"table", table}},
141+
[]MetricTag{{"table", tableName}},
121142
1.0,
122143
)
123144

124145
logrus.WithFields(logrus.Fields{
125146
"tag": "compression_verifier",
126147
"rows": len(resultSet),
127-
"table": table,
148+
"table": tableName,
128149
}).Debug("decompressed rows will be compared")
129150

130151
return resultSet, nil
@@ -192,12 +213,13 @@ func (c *CompressionVerifier) verifyConfiguredCompression(tableColumnCompression
192213

193214
// NewCompressionVerifier first checks the map for supported compression algorithms before
194215
// initializing and returning the initialized instance.
195-
func NewCompressionVerifier(tableColumnCompressions TableColumnCompressionConfig) (*CompressionVerifier, error) {
216+
func NewCompressionVerifier(tableColumnCompressions TableColumnCompressionConfig, tableSchemaCache TableSchemaCache) (*CompressionVerifier, error) {
196217
supportedAlgorithms := make(map[string]struct{})
197218
supportedAlgorithms[CompressionSnappy] = struct{}{}
198219

199220
compressionVerifier := &CompressionVerifier{
200221
logger: logrus.WithField("tag", "compression_verifier"),
222+
TableSchemaCache: tableSchemaCache,
201223
supportedAlgorithms: supportedAlgorithms,
202224
tableColumnCompressions: tableColumnCompressions,
203225
}
@@ -209,7 +231,7 @@ func NewCompressionVerifier(tableColumnCompressions TableColumnCompressionConfig
209231
return compressionVerifier, nil
210232
}
211233

212-
func getRows(db *sql.DB, schema, table, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []uint64) (*sqlorig.Rows, error) {
234+
func getRows(db *sql.DB, schema, table, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []interface{}) (*sqlorig.Rows, error) {
213235
quotedPaginationKey := QuoteField(paginationKeyColumn)
214236
sql, args, err := rowSelector(columns, paginationKeyColumn).
215237
From(QuotedTableNameFromString(schema, table)).

config.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -376,12 +376,17 @@ func (c ForceIndexConfig) IndexFor(schemaName, tableName string) string {
376376
// CascadingPaginationColumnConfig to configure pagination columns to be
377377
// used. The term `Cascading` denotes that greater specificity takes
378378
// precedence.
379+
//
380+
// IMPORTANT: All configured pagination columns must contain unique values.
381+
// When specifying a FallbackColumn for tables with composite primary keys,
382+
// ensure the column has a unique constraint to prevent data loss during migration.
379383
type CascadingPaginationColumnConfig struct {
380384
// PerTable has greatest specificity and takes precedence over the other options
381385
PerTable map[string]map[string]string // SchemaName => TableName => ColumnName
382386

383387
// FallbackColumn is a global default to fallback to and is less specific than the
384-
// default, which is the Primary Key
388+
// default, which is the Primary Key.
389+
// This column MUST have unique values (ideally a unique constraint) for data integrity.
385390
FallbackColumn string
386391
}
387392

@@ -727,10 +732,15 @@ type Config struct {
727732
//
728733
ForceIndexForVerification ForceIndexConfig
729734

730-
// Ghostferry requires a single numeric column to paginate over tables. Inferring that column is done in the following exact order:
735+
// Ghostferry requires a single numeric or binary column to paginate over tables. Inferring that column is done in the following exact order:
731736
// 1. Use the PerTable pagination column, if configured for a table. Fail if we cannot find this column in the table.
732-
// 2. Use the table's primary key column as the pagination column. Fail if the primary key is not numeric or is a composite key without a FallbackColumn specified.
737+
// 2. Use the table's primary key column as the pagination column. Fail if the primary key is not numeric/binary or is a composite key without a FallbackColumn specified.
733738
// 3. Use the FallbackColumn pagination column, if configured. Fail if we cannot find this column in the table.
739+
//
740+
// IMPORTANT: The pagination column MUST contain unique values for data integrity.
741+
// When using a FallbackColumn (typically "id") for tables with composite primary keys, this column must have a unique constraint.
742+
// The pagination algorithm uses WHERE pagination_key > last_key ORDER BY pagination_key LIMIT batch_size.
743+
// If duplicate values exist, rows may be skipped during iteration, resulting in data loss during the migration.
734744
CascadingPaginationColumnConfig *CascadingPaginationColumnConfig
735745

736746
// SkipTargetVerification is used to enable or disable target verification during moves.

cursor.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type CursorConfig struct {
3838
Throttler Throttler
3939

4040
ColumnsToSelect []string
41-
BuildSelect func([]string, *TableSchema, uint64, uint64) (squirrel.SelectBuilder, error)
41+
BuildSelect func([]string, *TableSchema, PaginationKey, uint64) (squirrel.SelectBuilder, error)
4242
// BatchSize is a pointer to the BatchSize in Config.UpdatableConfig which can be independently updated from this code.
4343
// Having it as a pointer allows the updated value to be read without needing additional code to copy the batch size value into the cursor config for each cursor we create.
4444
BatchSize *uint64
@@ -47,7 +47,7 @@ type CursorConfig struct {
4747
}
4848

4949
// returns a new Cursor with an embedded copy of itself
50-
func (c *CursorConfig) NewCursor(table *TableSchema, startPaginationKey, maxPaginationKey uint64) *Cursor {
50+
func (c *CursorConfig) NewCursor(table *TableSchema, startPaginationKey, maxPaginationKey PaginationKey) *Cursor {
5151
return &Cursor{
5252
CursorConfig: *c,
5353
Table: table,
@@ -58,7 +58,7 @@ func (c *CursorConfig) NewCursor(table *TableSchema, startPaginationKey, maxPagi
5858
}
5959

6060
// returns a new Cursor with an embedded copy of itself
61-
func (c *CursorConfig) NewCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey uint64) *Cursor {
61+
func (c *CursorConfig) NewCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey PaginationKey) *Cursor {
6262
cursor := c.NewCursor(table, startPaginationKey, maxPaginationKey)
6363
cursor.RowLock = false
6464
return cursor
@@ -77,11 +77,11 @@ type Cursor struct {
7777
CursorConfig
7878

7979
Table *TableSchema
80-
MaxPaginationKey uint64
80+
MaxPaginationKey PaginationKey
8181
RowLock bool
8282

8383
paginationKeyColumn *schema.TableColumn
84-
lastSuccessfulPaginationKey uint64
84+
lastSuccessfulPaginationKey PaginationKey
8585
logger *logrus.Entry
8686
}
8787

@@ -96,10 +96,10 @@ func (c *Cursor) Each(f func(*RowBatch) error) error {
9696
c.ColumnsToSelect = []string{"*"}
9797
}
9898

99-
for c.lastSuccessfulPaginationKey < c.MaxPaginationKey {
99+
for c.lastSuccessfulPaginationKey.Compare(c.MaxPaginationKey) < 0 {
100100
var tx SqlPreparerAndRollbacker
101101
var batch *RowBatch
102-
var paginationKeypos uint64
102+
var paginationKeypos PaginationKey
103103

104104
err := WithRetries(c.ReadRetries, 1*time.Second, c.logger, "fetch rows", func() (err error) {
105105
if c.Throttler != nil {
@@ -137,9 +137,9 @@ func (c *Cursor) Each(f func(*RowBatch) error) error {
137137
break
138138
}
139139

140-
if paginationKeypos <= c.lastSuccessfulPaginationKey {
140+
if paginationKeypos.Compare(c.lastSuccessfulPaginationKey) <= 0 {
141141
tx.Rollback()
142-
err = fmt.Errorf("new paginationKeypos %d <= lastSuccessfulPaginationKey %d", paginationKeypos, c.lastSuccessfulPaginationKey)
142+
err = fmt.Errorf("new paginationKeypos %s <= lastSuccessfulPaginationKey %s", paginationKeypos.String(), c.lastSuccessfulPaginationKey.String())
143143
c.logger.WithError(err).Errorf("last successful paginationKey position did not advance")
144144
return err
145145
}
@@ -159,7 +159,7 @@ func (c *Cursor) Each(f func(*RowBatch) error) error {
159159
return nil
160160
}
161161

162-
func (c *Cursor) Fetch(db SqlPreparer) (batch *RowBatch, paginationKeypos uint64, err error) {
162+
func (c *Cursor) Fetch(db SqlPreparer) (batch *RowBatch, paginationKeypos PaginationKey, err error) {
163163
var selectBuilder squirrel.SelectBuilder
164164
batchSize := c.CursorConfig.GetBatchSize(c.Table.Schema, c.Table.Name)
165165

@@ -176,7 +176,7 @@ func (c *Cursor) Fetch(db SqlPreparer) (batch *RowBatch, paginationKeypos uint64
176176
if c.RowLock {
177177
mySqlVersion, err := c.DB.QueryMySQLVersion()
178178
if err != nil {
179-
return nil, 0, err
179+
return nil, NewUint64Key(0), err
180180
}
181181
if strings.HasPrefix(mySqlVersion, "8.") {
182182
selectBuilder = selectBuilder.Suffix("FOR SHARE NOWAIT")
@@ -261,9 +261,10 @@ func (c *Cursor) Fetch(db SqlPreparer) (batch *RowBatch, paginationKeypos uint64
261261
}
262262

263263
if len(batchData) > 0 {
264-
paginationKeypos, err = batchData[len(batchData)-1].GetUint64(paginationKeyIndex)
264+
lastRowData := batchData[len(batchData)-1]
265+
paginationKeypos, err = NewPaginationKeyFromRow(lastRowData, paginationKeyIndex, c.paginationKeyColumn)
265266
if err != nil {
266-
logger.WithError(err).Error("failed to get uint64 paginationKey value")
267+
logger.WithError(err).Error("failed to get paginationKey value")
267268
return
268269
}
269270
}
@@ -304,12 +305,12 @@ func ScanByteRow(rows *sqlorig.Rows, columnCount int) ([][]byte, error) {
304305
return values, err
305306
}
306307

307-
func DefaultBuildSelect(columns []string, table *TableSchema, lastPaginationKey, batchSize uint64) squirrel.SelectBuilder {
308+
func DefaultBuildSelect(columns []string, table *TableSchema, lastPaginationKey PaginationKey, batchSize uint64) squirrel.SelectBuilder {
308309
quotedPaginationKey := QuoteField(table.GetPaginationColumn().Name)
309310

310311
return squirrel.Select(columns...).
311312
From(QuotedTableName(table)).
312-
Where(squirrel.Gt{quotedPaginationKey: lastPaginationKey}).
313+
Where(squirrel.Gt{quotedPaginationKey: lastPaginationKey.SQLValue()}).
313314
Limit(batchSize).
314315
OrderBy(quotedPaginationKey)
315316
}

0 commit comments

Comments
 (0)