forked from transferia/transferia
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathstorage.go
More file actions
417 lines (364 loc) · 12 KB
/
storage.go
File metadata and controls
417 lines (364 loc) · 12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
package abstract
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/transferia/transferia/library/go/core/xerrors"
"github.com/transferia/transferia/pkg/errors/coded"
"github.com/transferia/transferia/pkg/errors/codes"
"github.com/transferia/transferia/pkg/util"
"github.com/transferia/transferia/pkg/util/set"
)
// LoadProgress is a callback reporting table-load progress.
// The exact semantics of current vs. progress depend on the caller — presumably
// rows in the current batch vs. rows loaded so far, out of total estimated rows;
// TODO confirm against callers.
type LoadProgress func(current, progress, total uint64)

// TableDescription describes a table (or a single part of one) to be uploaded.
type TableDescription struct {
	Schema string         // for example - for mysql here are database name
	Name   string
	Filter WhereStatement // optional predicate restricting which rows belong to this part
	EtaRow uint64         // estimated number of rows in the table
	Offset uint64         // offset (in rows) along the ordering key (not necessary primary key)
}

// IsAsyncPartsUploadedStateKey is a sharding-context (operation state) key used by
// control code for async table parts; see NextArrTableDescriptionGetterBuilder.
const IsAsyncPartsUploadedStateKey = "is-async-parts-uploaded"
// BuildIncludeMap parses every object as a PostgreSQL-style FQTN and returns a
// set of the resulting table IDs. All objects are attempted; if any of them
// fail to parse, the collected errors are returned together and no map is produced.
func BuildIncludeMap(objects []string) (map[TableID]bool, error) {
	var parseErrs util.Errors
	result := make(map[TableID]bool)
	for _, object := range objects {
		id, err := ParseTableID(object)
		if err != nil {
			parseErrs = append(parseErrs, xerrors.Errorf("object: %s: %w", object, err))
			continue
		}
		result[*id] = true
	}
	if len(parseErrs) > 0 {
		return nil, xerrors.Errorf("unable to parse objects: %w", parseErrs)
	}
	return result, nil
}
// SchemaFilterByObjects returns the subset of schema restricted to objects.
// A nil objects slice means "no filtering" and returns schema unchanged
// (an empty non-nil slice yields an empty result). An error is returned when
// an object cannot be parsed or is absent from schema.
func SchemaFilterByObjects(schema DBSchema, objects []string) (DBSchema, error) {
	if objects == nil {
		return schema, nil
	}
	filtered := make(DBSchema)
	for _, object := range objects {
		tableID, err := ParseTableID(object)
		if err != nil {
			return nil, xerrors.Errorf("unable to parse: %s: %w", object, err)
		}
		tableSchema, found := schema[*tableID]
		if !found {
			return nil, xerrors.Errorf("object %s not found in schema", object)
		}
		filtered[*tableID] = tableSchema
	}
	return filtered, nil
}
// ParseTableID parses object as a PostgreSQL-style fully qualified table name,
// leaving the namespace empty when the schema part is omitted.
func ParseTableID(object string) (*TableID, error) {
	return NewTableIDFromStringPg(object, false)
}
// ParseTableIDs parses each of the given objects as a PostgreSQL-style FQTN
// and returns the table IDs in the same order. The first parse failure aborts
// the whole call.
func ParseTableIDs(objects ...string) ([]TableID, error) {
	var parsed []TableID
	for _, object := range objects {
		id, err := ParseTableID(object)
		if err != nil {
			return nil, xerrors.Errorf("unable to parse table ID: %s: %w", object, err)
		}
		parsed = append(parsed, *id)
	}
	return parsed, nil
}
// NewTableIDFromStringPg parses the given FQTN in PostgreSQL syntax to construct a TableID.
// With replaceOmittedSchemaWithPublic set, a schema-less identifier is placed in the
// "public" namespace; otherwise the namespace is left empty. Identifiers with more
// than two dot-separated parts are rejected.
func NewTableIDFromStringPg(fqtn string, replaceOmittedSchemaWithPublic bool) (*TableID, error) {
	parts, err := identifierToParts(fqtn)
	if err != nil {
		return nil, coded.Errorf(codes.InvalidObjectIdentifier, "failed to identify parts: %s: %w", fqtn, err)
	}
	if len(parts) == 0 {
		return nil, coded.Errorf(codes.InvalidObjectIdentifier, "object identifier has no parts: %s", fqtn)
	}
	if len(parts) > 2 {
		return nil, coded.Errorf(codes.InvalidObjectIdentifier, "identifier '%s' contains %d parts instead of maximum two", fqtn, len(parts))
	}
	if len(parts) == 2 {
		return &TableID{Namespace: parts[0], Name: parts[1]}, nil
	}
	// Exactly one part: the schema was omitted.
	namespace := ""
	if replaceOmittedSchemaWithPublic {
		namespace = "public"
	}
	return &TableID{Namespace: namespace, Name: parts[0]}, nil
}
// identifierToParts separates the given identifier into parts separated by dot.
// Each part may be double-quoted and may contain double-escaped double quotes. These are properly parsed; external double quotes are removed.
//
// Implemented as a single-pass state machine over the runes of the identifier:
//   - identifierInProgress: at least one rune of the current part has been consumed;
//   - insideDoubleQuotedIdentifier: an opening '"' was seen and not yet closed;
//   - previousWasDoubleQuote: the previous rune was '"' inside a quoted part, so the
//     current rune decides whether it was an escape ("" -> ") or the closing quote.
func identifierToParts(identifier string) ([]string, error) {
	result := make([]string, 0, 2) // typical FQTN has at most schema + name
	idPartBuilder := strings.Builder{}
	identifierInProgress := false
	insideDoubleQuotedIdentifier := false
	previousWasDoubleQuote := false
overFQTNRunes:
	for i, r := range identifier {
		switch r {
		case '"':
			if !insideDoubleQuotedIdentifier {
				if !identifierInProgress {
					// Opening quote of a quoted part.
					insideDoubleQuotedIdentifier = true
					continue overFQTNRunes
				} else {
					// NOTE(review): the offending quote sits at byte offset i, yet the
					// error reports i-1 — confirm whether this off-by-one is intended.
					return nil, xerrors.Errorf("\" outside of a double-quoted name part in position [%d]", i-1)
				}
			}
			if !previousWasDoubleQuote {
				// First quote inside a quoted part: either an escape ("") or the
				// closing quote; the next rune decides which.
				previousWasDoubleQuote = true
				continue overFQTNRunes
			}
			// Second quote in a row: escaped quote ("" -> "), fall through to emit one '"'.
			previousWasDoubleQuote = false
		case '.':
			if insideDoubleQuotedIdentifier {
				if !previousWasDoubleQuote {
					// just a dot inside an escaped identifier
					break
				}
				// previous character (double quote) was an end of an escaped identifier
				previousWasDoubleQuote = false
				insideDoubleQuotedIdentifier = false
			}
			if !identifierInProgress {
				return nil, xerrors.Errorf("zero-length name part ending in position [%d]", i)
			}
			// Finalize the current part and start collecting the next one.
			result = append(result, idPartBuilder.String())
			idPartBuilder.Reset()
			identifierInProgress = false
			continue overFQTNRunes
		default:
			if previousWasDoubleQuote {
				// A lone '"' followed by an ordinary rune is not a valid escape.
				return nil, xerrors.Errorf("unescaped \" in position [%d]", i-1)
			}
		}
		identifierInProgress = true
		_, _ = idPartBuilder.WriteRune(r) // strings.Builder.WriteRune never returns an error
	}
	if !identifierInProgress {
		// Empty input, or input ending with a dot.
		return nil, xerrors.Errorf("zero-length name part ending in position [%d]", len(identifier)-1)
	}
	if insideDoubleQuotedIdentifier {
		if !previousWasDoubleQuote {
			return nil, xerrors.Errorf("missing ending \" in position [%d]", len(identifier)-1)
		}
		previousWasDoubleQuote = false
		insideDoubleQuotedIdentifier = false
	}
	// Finalize the trailing part.
	result = append(result, idPartBuilder.String())
	idPartBuilder.Reset()
	identifierInProgress = false
	return result, nil
}
// TableInfo aggregates per-table metadata collected from a storage.
type TableInfo struct {
	EtaRow uint64       // estimated number of rows
	IsView bool         // true when the object is a view rather than a table
	Schema *TableSchema // resolved table schema (shared pointer, not a deep copy)
}

// TableMap maps table IDs to their metadata.
type TableMap map[TableID]TableInfo
// Copy returns a shallow copy of the map: TableInfo values (including the
// *TableSchema pointers inside them) are shared with the original.
func (m *TableMap) Copy() TableMap {
	// Pre-size to the source length to avoid incremental map growth.
	cp := make(TableMap, len(*m))
	for id, info := range *m {
		cp[id] = info
	}
	return cp
}
// String renders the map as a JSON object keyed by FQTN. Schemas are included
// only when withSchema is true. The json.Marshal error is deliberately ignored:
// the value is a plain map of structs and is expected to serialize cleanly.
func (m *TableMap) String(withSchema bool) string {
	serializable := make(map[string]TableInfo)
	for id, info := range *m {
		entry := TableInfo{
			EtaRow: info.EtaRow,
			IsView: info.IsView,
			Schema: nil,
		}
		if withSchema {
			entry.Schema = info.Schema
		}
		serializable[id.Fqtn()] = entry
	}
	serialized, _ := json.Marshal(serializable)
	return string(serialized)
}
// NoKeysTables returns the IDs of non-view tables whose schema has neither a
// real primary key nor fake keys.
func (m *TableMap) NoKeysTables() []TableID {
	var result []TableID
	for id, info := range *m {
		columns := info.Schema.Columns()
		if info.IsView || columns.HasPrimaryKey() || columns.HasFakeKeys() {
			continue
		}
		result = append(result, id)
	}
	return result
}
// FakePkeyTables returns the IDs of tables whose schema uses fake (synthetic) keys.
func (m *TableMap) FakePkeyTables() []TableID {
	var result []TableID
	for id, info := range *m {
		if !info.Schema.Columns().HasFakeKeys() {
			continue
		}
		result = append(result, id)
	}
	return result
}
// ConvertToTableDescriptions turns the map into whole-table descriptions
// (no filter, zero offset). Order is unspecified (map iteration order).
func (m *TableMap) ConvertToTableDescriptions() []TableDescription {
	descriptions := make([]TableDescription, 0, len(*m))
	for id, info := range *m {
		descriptions = append(descriptions, TableDescription{
			Name:   id.Name,
			Schema: id.Namespace,
			Filter: "",
			EtaRow: info.EtaRow,
			Offset: 0,
		})
	}
	return descriptions
}
// ToDBSchema projects the map to a DBSchema, keeping only each table's schema pointer.
func (m *TableMap) ToDBSchema() DBSchema {
	// Pre-size: the result has exactly one entry per table.
	resultSchema := make(DBSchema, len(*m))
	for tableID, tableInfo := range *m {
		resultSchema[tableID] = tableInfo.Schema
	}
	return resultSchema
}
// ID returns the TableID (namespace + name) identifying the described table.
func (t *TableDescription) ID() TableID {
	return TableID{Namespace: t.Schema, Name: t.Name}
}
// PartID returns a stable identifier of this table part, or "" when the
// description covers the whole table (no offset and no filter).
//
// NOTE: the hash input is built with fmt.Sprintf rather than json.Marshal, so
// quotes inside Filter are not escaped. Do not "fix" this: changing the format
// would change existing part IDs.
func (t *TableDescription) PartID() string {
	if t.Offset == 0 && t.Filter == "" {
		// This needed for s3, see: https://github.com/transferia/transferia/review/3538625
		return ""
	}
	asJSONString := fmt.Sprintf(`{"schema": "%s","name": "%s","filter": "%s","offset": %d}`,
		t.Schema, t.Name, t.Filter, t.Offset)
	return util.Hash(asJSONString)
}
// Same reports whether the given string names this table in any of the accepted
// spellings: bare name, schema.name, schema."name", or the canonical FQTN.
func (t *TableDescription) Same(table string) bool {
	switch table {
	case t.Name,
		fmt.Sprintf("%v.%v", t.Schema, t.Name),
		fmt.Sprintf("%v.\"%v\"", t.Schema, t.Name),
		t.Fqtn():
		return true
	}
	return false
}
// Fqtn returns the fully qualified table name of the described table.
func (t *TableDescription) Fqtn() string {
	tableID := t.ID()
	return tableID.Fqtn()
}
// String renders a human-readable description: the FQTN plus filter and offset.
func (t *TableDescription) String() string {
	formatted := fmt.Sprintf("%s [filter %q offset %d]", t.Fqtn(), t.Filter, t.Offset)
	return formatted
}
// TableIDsIntersection returns an intersection of two lists of TableIDs.
// An empty list acts as "everything": if either side is empty, the other side
// is returned as-is. Matching respects wildcard inclusion via TableID.Includes,
// and the narrower (included) ID is kept. The result is sorted.
func TableIDsIntersection(a []TableID, b []TableID) []TableID {
	switch {
	case len(b) == 0:
		return a
	case len(a) == 0:
		return b
	}
	intersection := set.New[TableID]()
	for _, left := range a {
		for _, right := range b {
			if left.Includes(right) {
				intersection.Add(right)
			} else if right.Includes(left) {
				intersection.Add(left)
			}
		}
	}
	return intersection.SortedSliceFunc(func(x, y TableID) bool {
		return x.Less(y) < 0
	})
}
// Storage is for simple storage implementations.
// For extra functionalities implement below storages.
type Storage interface {
	Closeable

	// Ping checks that the storage is reachable.
	Ping() error
	// LoadTable reads rows of the given table (part) and feeds them to pusher.
	LoadTable(ctx context.Context, table TableDescription, pusher Pusher) error
	// TableSchema resolves the schema of a single table.
	TableSchema(ctx context.Context, table TableID) (*TableSchema, error)
	// TableList enumerates tables matching filter together with their metadata.
	TableList(filter IncludeTableList) (TableMap, error)
	// ExactTableRowsCount returns the precise number of rows in the table.
	ExactTableRowsCount(table TableID) (uint64, error)
	// EstimateTableRowsCount returns an approximate number of rows in the table.
	EstimateTableRowsCount(table TableID) (uint64, error)
	// TableExists reports whether the table exists in the storage.
	TableExists(table TableID) (bool, error)
}
// PositionalStorage some storages may provide specific position for snapshot consistency.
type PositionalStorage interface {
	// Position provide info about snapshot read position
	Position(ctx context.Context) (*LogPosition, error)
}
// LogPosition describes the storage log position at which a snapshot is read.
type LogPosition struct {
	ID   uint32 // presumably an internal position/slot identifier — TODO confirm against producers
	LSN  uint64 // log sequence number (per its name — confirm units with the source storage)
	TxID string // transaction identifier
}
// SchemaStorage allows resolving the whole DB schema from a storage.
type SchemaStorage interface {
	// LoadSchema returns the full schema of the storage's database.
	LoadSchema() (DBSchema, error)
}
// SampleableStorage is for dataplane tests.
type SampleableStorage interface {
	Storage

	// TableSizeInBytes returns the (possibly estimated) on-disk size of the table.
	TableSizeInBytes(table TableID) (uint64, error)
	// LoadTopBottomSample pushes a sample drawn from the extremes of the table — TODO confirm sampling strategy.
	LoadTopBottomSample(table TableDescription, pusher Pusher) error
	// LoadRandomSample pushes a random sample of the table's rows.
	LoadRandomSample(table TableDescription, pusher Pusher) error
	// LoadSampleBySet pushes the rows matching the given key set.
	LoadSampleBySet(table TableDescription, keySet []map[string]interface{}, pusher Pusher) error
	// TableAccessible reports whether the table can be read.
	TableAccessible(table TableDescription) bool
}
// ShardingStorage is for in-table sharding.
type ShardingStorage interface {
	// ShardTable splits a table into parts that can be uploaded independently.
	ShardTable(ctx context.Context, table TableDescription) ([]TableDescription, error)
}
// ShardingContextStorage is implemented by storages that have data which must be
// shared with all workers of a sharded upload.
type ShardingContextStorage interface {
	// ShardingContext Return shared data, used on *MAIN* worker;
	// Take care, method return OperationState_ShardedUploadState, but only fill field Context;
	// Because type of Context is private, this is protoc thing;
	ShardingContext() ([]byte, error)
	// SetShardingContext for storage, used on *SECONDARY* worker
	SetShardingContext(shardedState []byte) error
}
// IncrementalStorage supports incremental snapshots driven by cursor state.
type IncrementalStorage interface {
	// GetNextIncrementalState computes the next cursor state for the given incremental tables.
	GetNextIncrementalState(ctx context.Context, incremental []IncrementalTable) ([]IncrementalState, error)
	// BuildArrTableDescriptionWithIncrementalState merges incremental cursor state
	// into the given table descriptions — presumably by setting their filters; TODO confirm.
	BuildArrTableDescriptionWithIncrementalState(tables []TableDescription, incremental []IncrementalTable) []TableDescription
}
// SnapshotableStorage marks storages that need explicit begin/end hooks around a snapshot.
type SnapshotableStorage interface {
	// BeginSnapshot runs storage-specific preparation before the snapshot starts.
	BeginSnapshot(ctx context.Context) error
	// EndSnapshot runs storage-specific cleanup after the snapshot finishes.
	EndSnapshot(ctx context.Context) error
}
//---------------------------------------------------------------------------------------------------------------------
// async table part provider

// SharedMemory is the shared task store used by the async table part provider.
type SharedMemory interface {
	// main methods - add, get, update:

	// Store persists the given operation table parts.
	Store(in []*OperationTablePart) error
	// NextOperationTablePart returns the next part to process; nil semantics on
	// exhaustion are implementation-defined — TODO confirm with implementations.
	NextOperationTablePart(ctx context.Context) (*OperationTablePart, error)
	// UpdateOperationTablesParts updates the state of the given parts for operationID.
	UpdateOperationTablesParts(operationID string, tables []*OperationTablePart) error

	// additional work with OperationState:

	// GetShardStateNoWait reads the shard state for operationID without waiting (per its name).
	GetShardStateNoWait(ctx context.Context, operationID string) (string, error)
	// SetOperationState stores newState for operationID.
	SetOperationState(operationID string, newState string) error
}
// NextArrTableDescriptionGetter is used in async_table_parts (tpp_*_async.go) to get tasks.
type NextArrTableDescriptionGetter interface {
	// NextArrTableDescription returns up to limit table descriptions to process next.
	NextArrTableDescription(ctx context.Context, limit uint64) ([]TableDescription, error)
	// Close releases resources held by the getter.
	Close()
}
// NextArrTableDescriptionGetterBuilder means there are used async_table_parts mechanism.
//
// NOTE: For such storage in sharding context (operation state) could appear value with
// key IsAsyncPartsUploadedStateKey, which is used by control code and should be not changed by storage.
type NextArrTableDescriptionGetterBuilder interface {
	ShardingContextStorage

	// BuildNextArrTableDescriptionGetter constructs a task getter for the given operation and tables.
	BuildNextArrTableDescriptionGetter(operationID string, tables []TableDescription) (NextArrTableDescriptionGetter, error)
}
// SkippableStorage is a special workaround for partitioned_tables in postgres;
// see FulfilledIncludes.
//
// load_snapshot expects TableList should return all tables,
// but for partitioned_tables with CollapseInheritTables we skip partitioned_table in TableList,
// then in SnapshotLoader.CheckIncludeDirectives this case is handled via the 'SkippableStorage' interface.
type SkippableStorage interface {
	// Skipped reports whether the given table was intentionally omitted from TableList.
	Skipped(tID TableID) (bool, error)
}