-
Notifications
You must be signed in to change notification settings - Fork 75
Expand file tree
/
Copy pathpagination_key.go
More file actions
287 lines (248 loc) · 7.46 KB
/
pagination_key.go
File metadata and controls
287 lines (248 loc) · 7.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
package ghostferry
import (
"bytes"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"math"
"github.com/go-mysql-org/go-mysql/schema"
)
// PaginationKey represents a cursor position for paginating through table rows.
// It abstracts over different primary key types (integers, UUIDs, binary data)
// to enable consistent batched iteration through tables.
//
// Uint64Key and BinaryKey are the implementations provided in this file; both
// serialize to a JSON envelope that UnmarshalPaginationKey can decode again.
type PaginationKey interface {
	// SQLValue returns the value to use in SQL WHERE clauses (e.g., WHERE id > ?).
	SQLValue() interface{}
	// ColumnName returns the column name this key belongs to, if known.
	// Keys created without a column report the empty string.
	ColumnName() string
	// Compare returns -1, 0, or 1 if this key is less than, equal to, or greater than other.
	// The implementations in this file panic when other is a different concrete key type.
	Compare(other PaginationKey) int
	// NumericPosition returns a float64 approximation for progress tracking and estimation.
	// It is an estimate only; use Compare for ordering decisions.
	NumericPosition() float64
	// String returns a human-readable representation for logging and debugging.
	String() string
	// MarshalJSON serializes the key for state persistence and checkpointing.
	MarshalJSON() ([]byte, error)
	// IsMax returns true if this key represents the maximum possible value for its type.
	IsMax() bool
}
// Uint64Key is a PaginationKey backed by an unsigned 64-bit integer.
type Uint64Key struct {
	// Column is the name of the column the key belongs to; empty when unknown.
	Column string
	// Value is the integer cursor position.
	Value uint64
}
// encodedKey is the JSON envelope shared by all pagination key types. It is
// what the MarshalJSON methods in this file emit and what
// UnmarshalPaginationKey reads back.
type encodedKey struct {
	// Type names the concrete key kind; this file writes "uint64" or "binary".
	Type string `json:"type"`
	// Value is the type-specific payload, kept raw so decoding can be deferred
	// until Type is known: a JSON number for "uint64", a hex string for "binary".
	Value json.RawMessage `json:"value"`
	// Column is the key's column name; it is omitted from the JSON when empty.
	Column string `json:"column,omitempty"`
}
// NewUint64Key builds a Uint64Key holding value. The key carries no column
// name, so ColumnName reports the empty string.
func NewUint64Key(value uint64) Uint64Key {
	var key Uint64Key
	key.Value = value
	return key
}
// NewUint64KeyWithColumn builds a Uint64Key holding value and tagged with the
// name of the column it belongs to.
func NewUint64KeyWithColumn(column string, value uint64) Uint64Key {
	key := Uint64Key{Value: value}
	key.Column = column
	return key
}
// SQLValue returns the key's uint64 value, boxed as an interface, for use as a
// query argument.
func (k Uint64Key) SQLValue() interface{} {
	var arg interface{} = k.Value
	return arg
}
// ColumnName returns the column this key was created for, or the empty string
// when the key was built without one.
func (k Uint64Key) ColumnName() string {
	name := k.Column
	return name
}
// Compare orders k against other by numeric value and returns -1, 0, or 1.
// Column names do not take part in the comparison.
//
// It panics if other is not a Uint64Key, since keys of different kinds have no
// meaningful ordering.
func (k Uint64Key) Compare(other PaginationKey) int {
	rhs, sameKind := other.(Uint64Key)
	if !sameKind {
		panic(fmt.Sprintf("cannot compare Uint64Key with %T", other))
	}

	switch {
	case k.Value < rhs.Value:
		return -1
	case k.Value > rhs.Value:
		return 1
	default:
		return 0
	}
}
// NumericPosition converts the key's value to float64 for progress estimation.
// Values too large to be represented exactly are rounded by the conversion.
func (k Uint64Key) NumericPosition() float64 {
	position := float64(k.Value)
	return position
}
// String renders the key's value in base 10 for logs and debugging output.
func (k Uint64Key) String() string {
	// fmt.Sprint formats a uint64 as decimal digits.
	return fmt.Sprint(k.Value)
}
// IsMax reports whether the key holds the largest value a uint64 can take.
func (k Uint64Key) IsMax() bool {
	const ceiling uint64 = math.MaxUint64
	return k.Value == ceiling
}
// MarshalJSON encodes the key as an envelope of the form
// {"type":"uint64","value":<number>,"column":"<name>"}; the column field is
// left out when the key has no column name.
func (k Uint64Key) MarshalJSON() ([]byte, error) {
	envelope := encodedKey{Type: "uint64", Column: k.Column}

	var err error
	if envelope.Value, err = json.Marshal(k.Value); err != nil {
		return nil, err
	}
	return json.Marshal(envelope)
}
// BinaryKey is a PaginationKey backed by raw bytes. Keys are ordered bytewise
// (see Compare).
type BinaryKey struct {
	// Column is the name of the column the key belongs to; empty when unknown.
	Column string
	// Value holds the key bytes. The constructors in this file store a private
	// copy of the caller's slice.
	Value []byte
}
// NewBinaryKey builds a BinaryKey from value without a column name.
//
// The bytes are copied, so later changes to the caller's slice do not affect
// the key. The stored slice is never nil, even when value is nil or empty.
func NewBinaryKey(value []byte) BinaryKey {
	// Appending into an exactly-sized empty slice yields a detached copy.
	owned := append(make([]byte, 0, len(value)), value...)
	return BinaryKey{Value: owned}
}
// NewBinaryKeyWithColumn builds a BinaryKey from value, tagged with the name of
// the column it belongs to.
//
// The bytes are copied, so later changes to the caller's slice do not affect
// the key. The stored slice is never nil, even when value is nil or empty.
func NewBinaryKeyWithColumn(column string, value []byte) BinaryKey {
	key := BinaryKey{
		Column: column,
		Value:  make([]byte, len(value)),
	}
	copy(key.Value, value)
	return key
}
// SQLValue returns the key's bytes, boxed as an interface, for use as a query
// argument. The returned slice shares storage with the key and is not a copy.
func (k BinaryKey) SQLValue() interface{} {
	var arg interface{} = k.Value
	return arg
}
// ColumnName returns the column this key was created for, or the empty string
// when the key was built without one.
func (k BinaryKey) ColumnName() string {
	name := k.Column
	return name
}
// Compare orders k against other bytewise (lexicographically) and returns
// -1, 0, or 1. A key that is a strict prefix of another sorts before it.
// Column names do not take part in the comparison.
//
// It panics if other is not a BinaryKey, since keys of different kinds have no
// meaningful ordering.
func (k BinaryKey) Compare(other PaginationKey) int {
	if rhs, sameKind := other.(BinaryKey); sameKind {
		return bytes.Compare(k.Value, rhs.Value)
	}
	panic(fmt.Sprintf("type mismatch: cannot compare BinaryKey with %T", other))
}
// NumericPosition derives a rough float position from the key for progress
// tracking.
//
// Only the first 8 bytes contribute: they are read as a big-endian uint64,
// with shorter keys padded on the right with zero bytes. An empty key yields 0.
//
// This suits keys whose leading bytes grow over time, such as UUID v7 (where
// the first 48 bits are a timestamp). Progress may appear frozen while rows
// differ only in bytes 9 and beyond, and it is unpredictable for random keys
// such as UUID v4.
//
// Pagination itself relies on Compare and is unaffected by these limitations.
func (k BinaryKey) NumericPosition() float64 {
	var prefix [8]byte
	// copy stops at the shorter operand and reports how many bytes it moved;
	// zero bytes moved means the key is empty.
	if copy(prefix[:], k.Value) == 0 {
		return 0.0
	}
	return float64(binary.BigEndian.Uint64(prefix[:]))
}
// String renders the key's bytes as lowercase hexadecimal for logs and
// debugging output. An empty key renders as the empty string.
func (k BinaryKey) String() string {
	encoded := make([]byte, hex.EncodedLen(len(k.Value)))
	hex.Encode(encoded, k.Value)
	return string(encoded)
}
// IsMax reports whether every byte of the key is 0xFF.
//
// The true maximum of a variable-length binary column cannot be known without
// its length, so this is a per-key check: it is exact for fixed-width keys
// such as a 16-byte UUID. An empty key is never considered the maximum.
func (k BinaryKey) IsMax() bool {
	n := len(k.Value)
	// Walk past the leading run of 0xFF bytes; the key is max only when that
	// run covers a non-empty key completely.
	i := 0
	for i < n && k.Value[i] == 0xFF {
		i++
	}
	return n > 0 && i == n
}
// MarshalJSON encodes the key as an envelope of the form
// {"type":"binary","value":"<hex>","column":"<name>"}. The bytes are written
// as a lowercase hex string, and the column field is left out when the key has
// no column name.
func (k BinaryKey) MarshalJSON() ([]byte, error) {
	envelope := encodedKey{Type: "binary", Column: k.Column}

	hexValue := hex.EncodeToString(k.Value)
	var err error
	if envelope.Value, err = json.Marshal(hexValue); err != nil {
		return nil, err
	}
	return json.Marshal(envelope)
}
// UnmarshalPaginationKey decodes the JSON envelope written by
// Uint64Key.MarshalJSON or BinaryKey.MarshalJSON back into a PaginationKey.
//
// The envelope's "type" field selects the concrete key: "uint64" expects a
// JSON number in "value", and "binary" expects a hex-encoded string. The
// optional "column" field is restored onto the returned key.
//
// An error is returned when data is not valid JSON, when the type is unknown,
// when "value" is absent or JSON null, or when the value cannot be decoded for
// the declared type. On error the returned key is nil.
func UnmarshalPaginationKey(data []byte) (PaginationKey, error) {
	var wrapper encodedKey
	if err := json.Unmarshal(data, &wrapper); err != nil {
		return nil, fmt.Errorf("decoding pagination key envelope: %w", err)
	}

	// encoding/json treats a JSON null as a no-op when decoding into a uint64
	// or a string, which would silently produce the zero key (0 or empty
	// bytes). Detect it up front so a damaged checkpoint is reported instead.
	hasValue := len(wrapper.Value) > 0 && string(wrapper.Value) != "null"

	switch wrapper.Type {
	case "uint64":
		if !hasValue {
			return nil, fmt.Errorf("uint64 pagination key has no value")
		}
		var value uint64
		if err := json.Unmarshal(wrapper.Value, &value); err != nil {
			return nil, fmt.Errorf("decoding uint64 pagination key value: %w", err)
		}
		return NewUint64KeyWithColumn(wrapper.Column, value), nil

	case "binary":
		if !hasValue {
			return nil, fmt.Errorf("binary pagination key has no value")
		}
		var encoded string
		if err := json.Unmarshal(wrapper.Value, &encoded); err != nil {
			return nil, fmt.Errorf("decoding binary pagination key value: %w", err)
		}
		raw, err := hex.DecodeString(encoded)
		if err != nil {
			return nil, fmt.Errorf("decoding binary pagination key hex: %w", err)
		}
		return NewBinaryKeyWithColumn(wrapper.Column, raw), nil

	default:
		return nil, fmt.Errorf("unknown key type: %s", wrapper.Type)
	}
}
// MinPaginationKey returns the smallest pagination key for column, named after
// the column.
//
// Binary and string columns start from an empty BinaryKey: the empty byte
// sequence sorts before every other value, so even a fixed BINARY(N) value
// made of 0x00 bytes lies after it. Every other column type, including the
// numeric ones, starts from a Uint64Key of 0.
func MinPaginationKey(column *schema.TableColumn) PaginationKey {
	if column.Type == schema.TYPE_BINARY || column.Type == schema.TYPE_STRING {
		return NewBinaryKeyWithColumn(column.Name, []byte{})
	}
	// Numeric columns and any type not handled above page by integer value.
	return NewUint64KeyWithColumn(column.Name, 0)
}
// MaxPaginationKey returns the largest pagination key for column, named after
// the column.
//
// Binary and string columns get a BinaryKey filled with 0xFF bytes, one per
// byte of column.MaxSize up to a cap of 4096. Every other column type,
// including the numeric ones, gets a Uint64Key of math.MaxUint64.
//
// NOTE(review): when column.MaxSize is 0 the resulting BinaryKey is empty, so
// it equals the minimum key and its IsMax reports false. Confirm whether the
// schema can report a zero size for a column used as a pagination key.
func MaxPaginationKey(column *schema.TableColumn) PaginationKey {
	isBytes := column.Type == schema.TYPE_BINARY || column.Type == schema.TYPE_STRING
	if !isBytes {
		return NewUint64KeyWithColumn(column.Name, math.MaxUint64)
	}

	// SAFETY: Cap the size to prevent OOM on LONGBLOB (4GB).
	// InnoDB index limit is 3072 bytes. 4KB is a safe upper bound for a PK.
	const maxKeyBytes = 4096
	size := column.MaxSize
	if size > maxKeyBytes {
		size = maxKeyBytes
	}

	ceiling := make([]byte, size)
	for i := 0; i < len(ceiling); i++ {
		ceiling[i] = 0xFF
	}
	return NewBinaryKeyWithColumn(column.Name, ceiling)
}
// NewPaginationKeyFromRow extracts a pagination key from the row value at
// index, choosing the key type from the column schema.
//
// Binary and string columns produce a BinaryKey; the row value must be a
// []byte or a string, otherwise an error is returned. All other column types
// are read through rowData.GetUint64 and produce a Uint64Key, with any read
// error wrapped. The returned key is named after column.Name.
func NewPaginationKeyFromRow(rowData RowData, index int, column *schema.TableColumn) (PaginationKey, error) {
	switch column.Type {
	case schema.TYPE_BINARY, schema.TYPE_STRING:
		cell := rowData[index]
		if raw, isBytes := cell.([]byte); isBytes {
			return NewBinaryKeyWithColumn(column.Name, raw), nil
		}
		if text, isString := cell.(string); isString {
			return NewBinaryKeyWithColumn(column.Name, []byte(text)), nil
		}
		return nil, fmt.Errorf("expected binary pagination key to be []byte or string, got %T", cell)

	case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT:
		number, err := rowData.GetUint64(index)
		if err != nil {
			return nil, fmt.Errorf("failed to get uint64 pagination key: %w", err)
		}
		return NewUint64KeyWithColumn(column.Name, number), nil
	}

	// Fallback for other integer types
	number, err := rowData.GetUint64(index)
	if err != nil {
		return nil, fmt.Errorf("failed to get pagination key: %w", err)
	}
	return NewUint64KeyWithColumn(column.Name, number), nil
}