Skip to content

Commit d70eaff

Browse files
authored
feat: Expose special migration paths (#1470)
This pr allows destinations to handle some of the more safe but unique migrations. This will assist with the changes that we have plan with regards to _cq_id and uniqueness changes
1 parent acb1ac7 commit d70eaff

File tree

2 files changed

+207
-0
lines changed

2 files changed

+207
-0
lines changed

schema/table.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/apache/arrow/go/v15/arrow"
1010
"github.com/cloudquery/plugin-sdk/v4/glob"
11+
"github.com/thoas/go-funk"
1112
)
1213

1314
// TableResolver is the main entry point when a table is sync is called.
@@ -40,6 +41,11 @@ const (
4041
TableColumnChangeTypeAdd
4142
TableColumnChangeTypeUpdate
4243
TableColumnChangeTypeRemove
44+
// These are special cases to help users migrate
45+
// As we remove unique constraints on _cq_id columns this will give destination plugins the ability to auto migrate
46+
TableColumnChangeTypeRemoveUniqueConstraint
47+
// Moving from composite pks to singular PK on _cq_id this will give destination plugins the ability to auto migrate
48+
TableColumnChangeTypeMoveToCQOnly
4349
)
4450

4551
type TableColumnChange struct {
@@ -177,6 +183,10 @@ func (t TableColumnChangeType) String() string {
177183
return "update"
178184
case TableColumnChangeTypeRemove:
179185
return "remove"
186+
case TableColumnChangeTypeRemoveUniqueConstraint:
187+
return "remove_unique_constraint"
188+
case TableColumnChangeTypeMoveToCQOnly:
189+
return "move_to_cq_only"
180190
default:
181191
return "unknown"
182192
}
@@ -190,6 +200,10 @@ func (t TableColumnChange) String() string {
190200
return fmt.Sprintf("column: %s, type: %s, current: %s, previous: %s", t.ColumnName, t.Type, t.Current, t.Previous)
191201
case TableColumnChangeTypeRemove:
192202
return fmt.Sprintf("column: %s, type: %s, previous: %s", t.ColumnName, t.Type, t.Previous)
203+
case TableColumnChangeTypeRemoveUniqueConstraint:
204+
return fmt.Sprintf("column: %s, previous: %s", t.ColumnName, t.Previous)
205+
case TableColumnChangeTypeMoveToCQOnly:
206+
return fmt.Sprintf("multi-column: %s, type: %s, previous: %s", t.ColumnName, t.Type, t.Previous)
193207
default:
194208
return fmt.Sprintf("column: %s, type: %s, current: %s, previous: %s", t.ColumnName, t.Type, t.Current, t.Previous)
195209
}
@@ -443,6 +457,14 @@ func (t *Table) ToArrowSchema() *arrow.Schema {
443457
// GetChanges returns changes between two tables when t is the new one and old is the old one.
444458
func (t *Table) GetChanges(old *Table) []TableColumnChange {
445459
var changes []TableColumnChange
460+
461+
// Special case: Moving from individual pks to singular PK on _cq_id
462+
newPks := t.PrimaryKeys()
463+
if len(newPks) == 1 && newPks[0] == CqIDColumn.Name && !funk.Contains(old.PrimaryKeys(), CqIDColumn.Name) && len(old.PrimaryKeys()) > 0 {
464+
changes = append(changes, TableColumnChange{
465+
Type: TableColumnChangeTypeMoveToCQOnly,
466+
})
467+
}
446468
for _, c := range t.Columns {
447469
otherColumn := old.Columns.Get(c.Name)
448470
// A column was added to the table definition
@@ -454,6 +476,7 @@ func (t *Table) GetChanges(old *Table) []TableColumnChange {
454476
})
455477
continue
456478
}
479+
457480
// Column type or options (e.g. PK, Not Null) changed in the new table definition
458481
if !arrow.TypeEqual(c.Type, otherColumn.Type) || c.NotNull != otherColumn.NotNull || c.PrimaryKey != otherColumn.PrimaryKey {
459482
changes = append(changes, TableColumnChange{
@@ -463,6 +486,15 @@ func (t *Table) GetChanges(old *Table) []TableColumnChange {
463486
Previous: *otherColumn,
464487
})
465488
}
489+
490+
// Unique constraint was removed
491+
if !c.Unique && otherColumn.Unique {
492+
changes = append(changes, TableColumnChange{
493+
Type: TableColumnChangeTypeRemoveUniqueConstraint,
494+
ColumnName: c.Name,
495+
Previous: *otherColumn,
496+
})
497+
}
466498
}
467499
// A column was removed from the table definition
468500
for _, c := range old.Columns {

schema/table_test.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,181 @@ var testTableGetChangeTestCases = []testTableGetChangeTestCase{
345345
},
346346
},
347347
},
348+
349+
{
350+
name: "move to cq_id as primary key",
351+
target: &Table{
352+
Name: "test",
353+
Columns: []Column{
354+
{
355+
Name: "_cq_id",
356+
Type: types.ExtensionTypes.UUID,
357+
Description: "Internal CQ ID of the row",
358+
NotNull: true,
359+
Unique: true,
360+
PrimaryKey: true,
361+
},
362+
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, PrimaryKey: false},
363+
},
364+
},
365+
source: &Table{
366+
Name: "test",
367+
Columns: []Column{
368+
{
369+
Name: "_cq_id",
370+
Type: types.ExtensionTypes.UUID,
371+
Description: "Internal CQ ID of the row",
372+
NotNull: true,
373+
Unique: true,
374+
},
375+
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, PrimaryKey: true},
376+
},
377+
},
378+
expectedChanges: []TableColumnChange{
379+
{
380+
Type: TableColumnChangeTypeMoveToCQOnly,
381+
},
382+
{
383+
Type: TableColumnChangeTypeUpdate,
384+
ColumnName: "_cq_id",
385+
Current: Column{
386+
Name: "_cq_id",
387+
Type: types.ExtensionTypes.UUID,
388+
Description: "Internal CQ ID of the row",
389+
NotNull: true,
390+
Unique: true,
391+
PrimaryKey: true,
392+
},
393+
Previous: Column{
394+
Name: "_cq_id",
395+
Type: types.ExtensionTypes.UUID,
396+
Description: "Internal CQ ID of the row",
397+
NotNull: true,
398+
Unique: true,
399+
},
400+
},
401+
{
402+
Type: TableColumnChangeTypeUpdate,
403+
ColumnName: "bool",
404+
Current: Column{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, PrimaryKey: false},
405+
Previous: Column{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, PrimaryKey: true},
406+
},
407+
},
408+
},
409+
410+
{
411+
name: "move to cq_id as primary key and drop unique constraint",
412+
target: &Table{
413+
Name: "test",
414+
Columns: []Column{
415+
{
416+
Name: "_cq_id",
417+
Type: types.ExtensionTypes.UUID,
418+
Description: "Internal CQ ID of the row",
419+
NotNull: true,
420+
// Unique: true,
421+
PrimaryKey: true,
422+
},
423+
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, PrimaryKey: false},
424+
},
425+
},
426+
source: &Table{
427+
Name: "test",
428+
Columns: []Column{
429+
{
430+
Name: "_cq_id",
431+
Type: types.ExtensionTypes.UUID,
432+
Description: "Internal CQ ID of the row",
433+
NotNull: true,
434+
Unique: true,
435+
},
436+
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, PrimaryKey: true},
437+
},
438+
},
439+
expectedChanges: []TableColumnChange{
440+
{
441+
Type: TableColumnChangeTypeMoveToCQOnly,
442+
},
443+
{
444+
Type: TableColumnChangeTypeUpdate,
445+
ColumnName: "_cq_id",
446+
Current: Column{
447+
Name: "_cq_id",
448+
Type: types.ExtensionTypes.UUID,
449+
Description: "Internal CQ ID of the row",
450+
NotNull: true,
451+
Unique: false,
452+
PrimaryKey: true,
453+
},
454+
Previous: Column{
455+
Name: "_cq_id",
456+
Type: types.ExtensionTypes.UUID,
457+
Description: "Internal CQ ID of the row",
458+
NotNull: true,
459+
Unique: true,
460+
},
461+
},
462+
{
463+
Type: TableColumnChangeTypeRemoveUniqueConstraint,
464+
ColumnName: "_cq_id",
465+
Previous: Column{
466+
Name: "_cq_id",
467+
Type: types.ExtensionTypes.UUID,
468+
Description: "Internal CQ ID of the row",
469+
NotNull: true,
470+
Unique: true,
471+
},
472+
},
473+
{
474+
Type: TableColumnChangeTypeUpdate,
475+
ColumnName: "bool",
476+
Current: Column{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, PrimaryKey: false},
477+
Previous: Column{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, PrimaryKey: true},
478+
},
479+
},
480+
},
481+
482+
{
483+
name: "drop unique constraint",
484+
target: &Table{
485+
Name: "test",
486+
Columns: []Column{
487+
{
488+
Name: "_cq_id",
489+
Type: types.ExtensionTypes.UUID,
490+
Description: "Internal CQ ID of the row",
491+
NotNull: true,
492+
},
493+
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, PrimaryKey: true},
494+
},
495+
},
496+
source: &Table{
497+
Name: "test",
498+
Columns: []Column{
499+
{
500+
Name: "_cq_id",
501+
Type: types.ExtensionTypes.UUID,
502+
Description: "Internal CQ ID of the row",
503+
NotNull: true,
504+
Unique: true,
505+
},
506+
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, PrimaryKey: true},
507+
},
508+
},
509+
expectedChanges: []TableColumnChange{
510+
{
511+
Type: TableColumnChangeTypeRemoveUniqueConstraint,
512+
ColumnName: "_cq_id",
513+
Previous: Column{
514+
Name: "_cq_id",
515+
Type: types.ExtensionTypes.UUID,
516+
Description: "Internal CQ ID of the row",
517+
NotNull: true,
518+
Unique: true,
519+
},
520+
},
521+
},
522+
},
348523
}
349524

350525
func TestTableGetChanges(t *testing.T) {

0 commit comments

Comments
 (0)