Skip to content

Commit 7512e29

Browse files
authored
feat(schema): Embed column creation options (#869)
1 parent b5c76bb commit 7512e29

File tree

10 files changed

+100
-134
lines changed

10 files changed

+100
-134
lines changed

internal/servers/destination/v0/destinations.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func (s *Server) Write2(msg pb.Destination_Write2Server) error {
164164
func setCQIDAsPrimaryKeysForTables(tables schema.Tables) {
165165
for _, table := range tables {
166166
for i, col := range table.Columns {
167-
table.Columns[i].CreationOptions.PrimaryKey = col.Name == schema.CqIDColumn.Name
167+
table.Columns[i].PrimaryKey = col.Name == schema.CqIDColumn.Name
168168
}
169169
setCQIDAsPrimaryKeysForTables(table.Relations)
170170
}
@@ -175,8 +175,8 @@ func SetDestinationManagedCqColumns(tables []*schema.Table) {
175175
for _, table := range tables {
176176
for i := range table.Columns {
177177
if table.Columns[i].Name == schema.CqIDColumn.Name {
178-
table.Columns[i].CreationOptions.Unique = true
179-
table.Columns[i].CreationOptions.NotNull = true
178+
table.Columns[i].Unique = true
179+
table.Columns[i].NotNull = true
180180
}
181181
}
182182
table.OverwriteOrAddColumn(&schema.CqSyncTimeColumn)

internal/servers/destination/v0/schemav2tov3.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,14 @@ func ColumnsV2ToV3(columns []schemav2.Column) []schema.Column {
4444

4545
func ColumnV2ToV3(column schemav2.Column) schema.Column {
4646
return schema.Column{
47-
Name: column.Name,
48-
Description: column.Description,
49-
Type: TypeV2ToV3(column.Type),
50-
CreationOptions: schema.ColumnCreationOptions{
51-
NotNull: column.CreationOptions.NotNull,
52-
Unique: column.CreationOptions.Unique,
53-
PrimaryKey: column.CreationOptions.PrimaryKey,
54-
},
55-
IgnoreInTests: column.IgnoreInTests,
47+
Name: column.Name,
48+
Description: column.Description,
49+
Type: TypeV2ToV3(column.Type),
50+
NotNull: column.CreationOptions.NotNull,
51+
Unique: column.CreationOptions.Unique,
52+
PrimaryKey: column.CreationOptions.PrimaryKey,
53+
IncrementalKey: column.CreationOptions.IncrementalKey,
54+
IgnoreInTests: column.IgnoreInTests,
5655
}
5756
}
5857

plugins/destination/plugin.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -293,18 +293,18 @@ func (p *Plugin) Close(ctx context.Context) error {
293293

294294
func checkDestinationColumns(tables schema.Tables) error {
295295
for _, table := range tables {
296-
if table.Columns.Index(schema.CqSourceNameField.Name) == -1 {
297-
return fmt.Errorf("table %s is missing column %s. please consider upgrading source plugin", table.Name, schema.CqSourceNameField.Name)
296+
if table.Columns.Index(schema.CqSourceNameColumn.Name) == -1 {
297+
return fmt.Errorf("table %s is missing column %s. please consider upgrading source plugin", table.Name, schema.CqSourceNameColumn.Name)
298298
}
299299
if table.Columns.Index(schema.CqSyncTimeColumn.Name) == -1 {
300-
return fmt.Errorf("table %s is missing column %s. please consider upgrading source plugin", table.Name, schema.CqSourceNameField.Name)
300+
return fmt.Errorf("table %s is missing column %s. please consider upgrading source plugin", table.Name, schema.CqSourceNameColumn.Name)
301301
}
302-
field := table.Columns.Get(schema.CqIDColumn.Name)
303-
if field != nil {
304-
if !field.CreationOptions.NotNull {
302+
column := table.Columns.Get(schema.CqIDColumn.Name)
303+
if column != nil {
304+
if !column.NotNull {
305305
return fmt.Errorf("column %s.%s cannot be nullable. please consider upgrading source plugin", table.Name, schema.CqIDColumn.Name)
306306
}
307-
if !field.CreationOptions.Unique {
307+
if !column.Unique {
308308
return fmt.Errorf("column %s.%s must be unique. please consider upgrading source plugin", table.Name, schema.CqIDColumn.Name)
309309
}
310310
}

plugins/destination/plugin_testing_migrate.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
147147
schema.CqSyncTimeColumn,
148148
schema.CqIDColumn,
149149
{Name: "id", Type: types.ExtensionTypes.UUID},
150-
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, CreationOptions: schema.ColumnCreationOptions{NotNull: true}},
150+
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, NotNull: true},
151151
}}
152152
p := newPlugin()
153153
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumnNotNull); err != nil {
@@ -170,8 +170,8 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
170170
schema.CqSourceNameColumn,
171171
schema.CqSyncTimeColumn,
172172
schema.CqIDColumn,
173-
{Name: "id", Type: types.ExtensionTypes.UUID, CreationOptions: schema.ColumnCreationOptions{NotNull: true}},
174-
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, CreationOptions: schema.ColumnCreationOptions{NotNull: true}},
173+
{Name: "id", Type: types.ExtensionTypes.UUID, NotNull: true},
174+
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, NotNull: true},
175175
}}
176176
target := &schema.Table{
177177
Columns: schema.ColumnList{
@@ -203,7 +203,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
203203
schema.CqSyncTimeColumn,
204204
schema.CqIDColumn,
205205
{Name: "id", Type: types.ExtensionTypes.UUID},
206-
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, CreationOptions: schema.ColumnCreationOptions{NotNull: true}},
206+
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, NotNull: true},
207207
},
208208
}
209209
target := &schema.Table{
@@ -212,7 +212,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
212212
schema.CqSourceNameColumn,
213213
schema.CqSyncTimeColumn,
214214
schema.CqIDColumn,
215-
{Name: "id", Type: types.ExtensionTypes.UUID, CreationOptions: schema.ColumnCreationOptions{NotNull: true}},
215+
{Name: "id", Type: types.ExtensionTypes.UUID, NotNull: true},
216216
}}
217217

218218
p := newPlugin()
@@ -237,7 +237,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
237237
schema.CqSyncTimeColumn,
238238
schema.CqIDColumn,
239239
{Name: "id", Type: types.ExtensionTypes.UUID},
240-
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, CreationOptions: schema.ColumnCreationOptions{NotNull: true}},
240+
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, NotNull: true},
241241
}}
242242
target := &schema.Table{
243243
Name: tableName,
@@ -246,7 +246,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
246246
schema.CqSyncTimeColumn,
247247
schema.CqIDColumn,
248248
{Name: "id", Type: types.ExtensionTypes.UUID},
249-
{Name: "bool", Type: arrow.BinaryTypes.String, CreationOptions: schema.ColumnCreationOptions{NotNull: true}},
249+
{Name: "bool", Type: arrow.BinaryTypes.String, NotNull: true},
250250
}}
251251

252252
p := newPlugin()

schema/column.go

Lines changed: 44 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,6 @@ type ColumnList []Column
1414
// resource holds the current row we are resolving the column for.
1515
type ColumnResolver func(ctx context.Context, meta ClientMeta, resource *Resource, c Column) error
1616

17-
// ColumnCreationOptions allow modification of how column is defined when table is created
18-
type ColumnCreationOptions struct {
19-
PrimaryKey bool
20-
NotNull bool
21-
// IncrementalKey is a flag that indicates if the column is used as part of an incremental key.
22-
// It is mainly used for documentation purposes, but may also be used as part of ensuring that
23-
// migrations are done correctly.
24-
IncrementalKey bool
25-
Unique bool
26-
}
27-
2817
// Column definition for Table
2918
type Column struct {
3019
// Name of column
@@ -35,62 +24,68 @@ type Column struct {
3524
Description string
3625
// Column Resolver allows to set your own data for a column; this can be an API call, setting multiple embedded values, etc
3726
Resolver ColumnResolver
38-
// Creation options allow modifying how column is defined when table is created
39-
CreationOptions ColumnCreationOptions
27+
4028
// IgnoreInTests is used to skip verifying the column is non-nil in integration tests.
4129
// By default, integration tests perform a fetch for all resources in cloudquery's test account, and
4230
// verify all columns are non-nil.
4331
// If IgnoreInTests is true, verification is skipped for this column.
4432
// Used when it is hard to create a reproducible environment with this column being non-nil (e.g. various error columns).
4533
IgnoreInTests bool
34+
35+
// PrimaryKey requires the destinations supporting this to include this column into the primary key
36+
PrimaryKey bool
37+
// NotNull requires the destinations supporting this to mark this column as non-nullable
38+
NotNull bool
39+
// IncrementalKey is a flag that indicates if the column is used as part of an incremental key.
40+
// It is mainly used for documentation purposes, but may also be used as part of ensuring that
41+
// migrations are done correctly.
42+
IncrementalKey bool
43+
// Unique requires the destinations supporting this to mark this column as unique
44+
Unique bool
4645
}
4746

4847
// NewColumnFromArrowField creates a new Column from an arrow.Field
4948
// arrow.Field is a low-level representation of a CloudQuery column
5049
// that can be sent over the wire in a cross-language way.
5150
func NewColumnFromArrowField(f arrow.Field) Column {
52-
creationOptions := ColumnCreationOptions{
51+
column := Column{
52+
Name: f.Name,
53+
Type: f.Type,
5354
NotNull: !f.Nullable,
5455
}
55-
if v, ok := f.Metadata.GetValue(MetadataPrimaryKey); ok {
56-
if v == MetadataTrue {
57-
creationOptions.PrimaryKey = true
58-
} else {
59-
creationOptions.PrimaryKey = false
60-
}
61-
}
6256

63-
if v, ok := f.Metadata.GetValue(MetadataUnique); ok {
64-
if v == MetadataTrue {
65-
creationOptions.Unique = true
66-
} else {
67-
creationOptions.Unique = false
68-
}
69-
}
70-
return Column{
71-
Name: f.Name,
72-
Type: f.Type,
73-
CreationOptions: creationOptions,
74-
}
57+
v, ok := f.Metadata.GetValue(MetadataPrimaryKey)
58+
column.PrimaryKey = ok && v == MetadataTrue
59+
60+
v, ok = f.Metadata.GetValue(MetadataUnique)
61+
column.Unique = ok && v == MetadataTrue
62+
63+
v, ok = f.Metadata.GetValue(MetadataIncremental)
64+
column.IncrementalKey = ok && v == MetadataTrue
65+
66+
return column
7567
}
7668

7769
func (c Column) ToArrowField() arrow.Field {
78-
mdKV := map[string]string{}
79-
if c.CreationOptions.PrimaryKey {
70+
mdKV := map[string]string{
71+
MetadataPrimaryKey: MetadataFalse,
72+
MetadataUnique: MetadataFalse,
73+
MetadataIncremental: MetadataFalse,
74+
}
75+
if c.PrimaryKey {
8076
mdKV[MetadataPrimaryKey] = MetadataTrue
81-
} else {
82-
mdKV[MetadataPrimaryKey] = MetadataFalse
8377
}
84-
if c.CreationOptions.Unique {
78+
if c.Unique {
8579
mdKV[MetadataUnique] = MetadataTrue
86-
} else {
87-
mdKV[MetadataUnique] = MetadataFalse
80+
}
81+
if c.IncrementalKey {
82+
mdKV[MetadataIncremental] = MetadataTrue
8883
}
8984

9085
return arrow.Field{
9186
Name: c.Name,
9287
Type: c.Type,
93-
Nullable: !c.CreationOptions.NotNull,
88+
Nullable: !c.NotNull,
9489
Metadata: arrow.MetadataFrom(mdKV),
9590
}
9691
}
@@ -100,12 +95,18 @@ func (c Column) String() string {
10095
sb.WriteString(c.Name)
10196
sb.WriteString(":")
10297
sb.WriteString(c.Type.String())
103-
if c.CreationOptions.PrimaryKey {
98+
if c.PrimaryKey {
10499
sb.WriteString(":PK")
105100
}
106-
if c.CreationOptions.NotNull {
101+
if c.NotNull {
107102
sb.WriteString(":NotNull")
108103
}
104+
if c.Unique {
105+
sb.WriteString(":Unique")
106+
}
107+
if c.IncrementalKey {
108+
sb.WriteString(":IncrementalKey")
109+
}
109110
return sb.String()
110111
}
111112

schema/meta.go

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@ var CqIDColumn = Column{
1717
Name: "_cq_id",
1818
Type: types.ExtensionTypes.UUID,
1919
Description: "Internal CQ ID of the row",
20-
CreationOptions: ColumnCreationOptions{
21-
NotNull: true,
22-
Unique: true,
23-
},
20+
NotNull: true,
21+
Unique: true,
2422
}
2523
var CqParentIDColumn = Column{
2624
Name: "_cq_parent_id",
@@ -42,22 +40,6 @@ var CqSourceNameColumn = Column{
4240
Description: "Internal CQ row that references the source plugin name data was retrieved",
4341
}
4442

45-
var CqIDField = arrow.Field{
46-
Name: "_cq_id",
47-
Type: types.ExtensionTypes.UUID,
48-
Metadata: arrow.MetadataFrom(map[string]string{
49-
MetadataUnique: MetadataTrue,
50-
}),
51-
}
52-
var CqSyncTimeField = arrow.Field{
53-
Name: "_cq_sync_time",
54-
Type: arrow.FixedWidthTypes.Timestamp_us,
55-
}
56-
var CqSourceNameField = arrow.Field{
57-
Name: "_cq_source_name",
58-
Type: arrow.BinaryTypes.String,
59-
}
60-
6143
func parentCqUUIDResolver() ColumnResolver {
6244
return func(_ context.Context, _ ClientMeta, r *Resource, c Column) error {
6345
if r.Parent == nil {

schema/table.go

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ func (t *Table) ValidateName() error {
333333
func (t *Table) PrimaryKeysIndexes() []int {
334334
var primaryKeys []int
335335
for i, c := range t.Columns {
336-
if c.CreationOptions.PrimaryKey {
336+
if c.PrimaryKey {
337337
primaryKeys = append(primaryKeys, i)
338338
}
339339
}
@@ -347,23 +347,7 @@ func (t *Table) ToArrowSchema() *arrow.Schema {
347347
MetadataTableName: t.Name,
348348
})
349349
for i, c := range t.Columns {
350-
fieldMdKv := map[string]string{}
351-
if c.CreationOptions.PrimaryKey {
352-
fieldMdKv[MetadataPrimaryKey] = MetadataTrue
353-
} else {
354-
fieldMdKv[MetadataPrimaryKey] = MetadataFalse
355-
}
356-
if c.CreationOptions.Unique {
357-
fieldMdKv[MetadataUnique] = MetadataTrue
358-
} else {
359-
fieldMdKv[MetadataUnique] = MetadataFalse
360-
}
361-
fields[i] = arrow.Field{
362-
Name: c.Name,
363-
Type: c.Type,
364-
Nullable: !c.CreationOptions.NotNull,
365-
Metadata: arrow.MetadataFrom(fieldMdKv),
366-
}
350+
fields[i] = c.ToArrowField()
367351
}
368352
return arrow.NewSchema(fields, &schemaMd)
369353
}
@@ -383,7 +367,7 @@ func (t *Table) GetChanges(old *Table) []TableColumnChange {
383367
continue
384368
}
385369
// Column type or options (e.g. PK, Not Null) changed in the new table definition
386-
if c.Type != otherColumn.Type || c.CreationOptions.NotNull != otherColumn.CreationOptions.NotNull || c.CreationOptions.PrimaryKey != otherColumn.CreationOptions.PrimaryKey {
370+
if c.Type != otherColumn.Type || c.NotNull != otherColumn.NotNull || c.PrimaryKey != otherColumn.PrimaryKey {
387371
changes = append(changes, TableColumnChange{
388372
Type: TableColumnChangeTypeUpdate,
389373
ColumnName: c.Name,
@@ -454,7 +438,7 @@ func (t *Table) OverwriteOrAddColumn(column *Column) {
454438
func (t *Table) PrimaryKeys() []string {
455439
var primaryKeys []string
456440
for _, c := range t.Columns {
457-
if c.CreationOptions.PrimaryKey {
441+
if c.PrimaryKey {
458442
primaryKeys = append(primaryKeys, c.Name)
459443
}
460444
}
@@ -465,7 +449,7 @@ func (t *Table) PrimaryKeys() []string {
465449
func (t *Table) IncrementalKeys() []string {
466450
var incrementalKeys []string
467451
for _, c := range t.Columns {
468-
if c.CreationOptions.IncrementalKey {
452+
if c.IncrementalKey {
469453
incrementalKeys = append(incrementalKeys, c.Name)
470454
}
471455
}

schema/testdata.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ func TestSourceTable(name string) *Table {
2121
CqIDColumn,
2222
CqParentIDColumn,
2323
{
24-
Name: "uuid_pk",
25-
Type: types.ExtensionTypes.UUID,
26-
CreationOptions: ColumnCreationOptions{PrimaryKey: true},
24+
Name: "uuid_pk",
25+
Type: types.ExtensionTypes.UUID,
26+
PrimaryKey: true,
2727
},
2828
{
29-
Name: "string_pk",
30-
Type: arrow.BinaryTypes.String,
31-
CreationOptions: ColumnCreationOptions{PrimaryKey: true},
29+
Name: "string_pk",
30+
Type: arrow.BinaryTypes.String,
31+
PrimaryKey: true,
3232
},
3333
{
3434
Name: "bool",
@@ -152,7 +152,7 @@ func GenTestData(table *Table, opts GenTestDataOptions) []arrow.Record {
152152
nullRow := j%2 == 1
153153
bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
154154
for i, c := range table.Columns {
155-
if nullRow && !c.CreationOptions.NotNull && !c.CreationOptions.PrimaryKey &&
155+
if nullRow && !c.NotNull && !c.PrimaryKey &&
156156
c.Name != CqSourceNameColumn.Name &&
157157
c.Name != CqSyncTimeColumn.Name &&
158158
c.Name != CqIDColumn.Name &&

transformers/struct.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func (t *structTransformer) addColumnFromField(field reflect.StructField, parent
278278
// use path to allow the following
279279
// 1. Don't duplicate the PK fields if the unwrapped struct contains a fields with the same name
280280
// 2. Allow specifying the nested unwrapped field as part of the PK.
281-
column.CreationOptions.PrimaryKey = true
281+
column.PrimaryKey = true
282282
t.pkFieldsFound = append(t.pkFieldsFound, pk)
283283
}
284284
}

0 commit comments

Comments
 (0)