Skip to content

Commit 837c5f3

Browse files
authored
fix: DeleteStale feature (#269)
#### Summary <!-- 🎉 Thank you for making CloudQuery awesome by submitting a PR 🎉 --> testing matrix (two syncs, different versions for first and second sync ) ![image](https://user-images.githubusercontent.com/37939765/194821929-155f2b6c-6862-4e36-9036-eb2d2d6299fc.png) <!-- Explain what problem this PR addresses --> --- Use the following steps to ensure your PR is ready to be reviewed - [ ] Read the [contribution guidelines](../blob/main/CONTRIBUTING.md) 🧑‍🎓 - [ ] Run `go fmt` to format your code 🖊 - [ ] Lint your changes via `golangci-lint run` 🚨 (install golangci-lint [here](https://golangci-lint.run/usage/install/#local-installation)) - [ ] Update or add tests 🧪 - [ ] Ensure the status checks below are successful ✅
1 parent 7717d6f commit 837c5f3

File tree

7 files changed

+68
-43
lines changed

7 files changed

+68
-43
lines changed

plugins/.snapshots/TestGenerateSourcePluginDocs-relation_table.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ This table depends on [`test_table`](test_table.md).
1010
## Columns
1111
| Name | Type |
1212
| ------------- | ------------- |
13-
|_cq_id (PK)|UUID|
14-
|_cq_parent_id|UUID|
1513
|_cq_source_name|String|
1614
|_cq_sync_time|Timestamp|
15+
|_cq_id (PK)|UUID|
16+
|_cq_parent_id|UUID|
1717
|string_col|String|

plugins/.snapshots/TestGenerateSourcePluginDocs-test_table.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ The following tables depend on `test_table`:
1111
## Columns
1212
| Name | Type |
1313
| ------------- | ------------- |
14-
|_cq_id|UUID|
15-
|_cq_parent_id|UUID|
1614
|_cq_source_name|String|
1715
|_cq_sync_time|Timestamp|
16+
|_cq_id|UUID|
17+
|_cq_parent_id|UUID|
1818
|int_col|Int|
1919
|id_col (PK)|Int|
2020
|id_col2 (PK)|Int|

plugins/destination.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,25 +73,23 @@ func (p *DestinationPlugin) Init(ctx context.Context, logger zerolog.Logger, spe
7373

7474
// we implement all DestinationClient functions so we can hook into pre-post behavior
7575
func (p *DestinationPlugin) Migrate(ctx context.Context, tables schema.Tables) error {
76+
SetDestinationManagedCqColumns(tables)
77+
7678
if p.client == nil {
7779
return fmt.Errorf("destination client not initialized")
7880
}
7981
p.tables = tables
8082
return p.client.Migrate(ctx, tables)
8183
}
8284

83-
func (p *DestinationPlugin) Write(ctx context.Context, source string, syncTime time.Time, res <-chan *schema.Resource) *WriteSummary {
85+
func (p *DestinationPlugin) Write(ctx context.Context, sourceName string, syncTime time.Time, res <-chan *schema.Resource) *WriteSummary {
8486
if p.client == nil {
8587
return nil
8688
}
8789
summary := WriteSummary{}
8890
for r := range res {
89-
if _, ok := r.Data[schema.CqSourceName.Name]; ok {
90-
r.Data[schema.CqSourceName.Name] = source
91-
}
92-
if _, ok := r.Data[schema.CqSyncTime.Name]; ok {
93-
r.Data[schema.CqSyncTime.Name] = syncTime
94-
}
91+
r.Data[schema.CqSourceNameColumn.Name] = sourceName
92+
r.Data[schema.CqSyncTimeColumn.Name] = syncTime
9593
err := p.client.Write(ctx, r.TableName, r.Data)
9694
if err != nil {
9795
summary.FailedWrites++
@@ -101,7 +99,7 @@ func (p *DestinationPlugin) Write(ctx context.Context, source string, syncTime t
10199
}
102100
}
103101
if p.spec.WriteMode == specs.WriteModeOverwriteDeleteStale {
104-
failedDeletes := p.DeleteStale(ctx, p.tables.TableNames(), source, syncTime)
102+
failedDeletes := p.DeleteStale(ctx, p.tables.TableNames(), sourceName, syncTime)
105103
summary.FailedDeletes = failedDeletes
106104
}
107105
return &summary
@@ -127,3 +125,13 @@ func (p *DestinationPlugin) Close(ctx context.Context) error {
127125
}
128126
return p.client.Close(ctx)
129127
}
128+
129+
// SetDestinationManagedCqColumns overwrites or adds the CQ columns that are managed by the destination plugin (_cq_sync_time, _cq_source_name) on each table and, recursively, on its relations.
130+
func SetDestinationManagedCqColumns(tables []*schema.Table) {
131+
for _, table := range tables {
132+
table.OverwriteOrAddColumn(&schema.CqSyncTimeColumn)
133+
table.OverwriteOrAddColumn(&schema.CqSourceNameColumn)
134+
135+
SetDestinationManagedCqColumns(table.Relations)
136+
}
137+
}

plugins/source.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func addInternalColumns(tables []*schema.Table) {
3838
if len(table.PrimaryKeys()) == 0 {
3939
cqID.CreationOptions.PrimaryKey = true
4040
}
41-
table.Columns = append([]schema.Column{cqID, schema.CqParentIDColumn, schema.CqSourceName, schema.CqSyncTime}, table.Columns...)
41+
table.Columns = append([]schema.Column{cqID, schema.CqParentIDColumn}, table.Columns...)
4242
addInternalColumns(table.Relations)
4343
}
4444
}

plugins/source_docs.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ func (p *SourcePlugin) GenerateSourcePluginDocs(dir string) error {
1919
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
2020
return err
2121
}
22+
23+
SetDestinationManagedCqColumns(p.Tables())
24+
2225
for _, table := range p.Tables() {
2326
if err := renderAllTables(table, dir); err != nil {
2427
return err

schema/meta.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,32 @@ type Meta struct {
1717
FetchID string `json:"fetch_id,omitempty"`
1818
}
1919

20-
const FetchIDMetaKey = "cq_fetch_id"
20+
// These columns are managed and populated by the source plugins
21+
var CqIDColumn = Column{
22+
Name: "_cq_id",
23+
Type: TypeUUID,
24+
Description: "Internal CQ ID of the row",
25+
Resolver: cqUUIDResolver(),
26+
}
27+
var CqParentIDColumn = Column{
28+
Name: "_cq_parent_id",
29+
Type: TypeUUID,
30+
Description: "Internal CQ ID of the parent row",
31+
Resolver: parentCqUUIDResolver(),
32+
IgnoreInTests: true,
33+
}
2134

22-
var CqIDColumn = Column{Name: "_cq_id", Type: TypeUUID, Description: "Internal CQ ID of the row", Resolver: cqUUIDResolver()}
23-
var CqParentIDColumn = Column{Name: "_cq_parent_id", Type: TypeUUID, Description: "Internal CQ ID of the parent row", Resolver: parentCqUUIDResolver(), IgnoreInTests: true}
24-
var CqSyncTime = Column{Name: "_cq_sync_time", Type: TypeTimestamp, Description: "Internal CQ row of when sync was started (this will be the same for all rows in a single fetch)", IgnoreInTests: true}
25-
var CqSourceName = Column{Name: "_cq_source_name", Type: TypeString, Description: "Internal CQ row that references the source plugin name data was retrieved", IgnoreInTests: true}
35+
// These columns are managed and populated by the destination plugin.
36+
var CqSyncTimeColumn = Column{
37+
Name: "_cq_sync_time",
38+
Type: TypeTimestamp,
39+
Description: "Internal CQ row of when sync was started (this will be the same for all rows in a single fetch)",
40+
}
41+
var CqSourceNameColumn = Column{
42+
Name: "_cq_source_name",
43+
Type: TypeString,
44+
Description: "Internal CQ row that references the source plugin name data was retrieved",
45+
}
2646

2747
func cqUUIDResolver() ColumnResolver {
2848
return func(_ context.Context, _ ClientMeta, r *Resource, c Column) error {

schema/table.go

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ type Table struct {
6464

6565
// Parent is the parent table in case this table is called via parent table (i.e. relation)
6666
Parent *Table `json:"-"`
67-
68-
columnsMap map[string]int
6967
}
7068

7169
func (s *SyncSummary) Merge(other SyncSummary) {
@@ -102,7 +100,7 @@ func (tt Tables) ValidateDuplicateTables() error {
102100
return nil
103101
}
104102

105-
func (t Table) ValidateDuplicateColumns() error {
103+
func (t *Table) ValidateDuplicateColumns() error {
106104
columns := make(map[string]bool, len(t.Columns))
107105
for _, c := range t.Columns {
108106
if _, ok := columns[c.Name]; ok {
@@ -118,7 +116,7 @@ func (t Table) ValidateDuplicateColumns() error {
118116
return nil
119117
}
120118

121-
func (t Table) Column(name string) *Column {
119+
func (t *Table) Column(name string) *Column {
122120
for _, c := range t.Columns {
123121
if c.Name == name {
124122
return &c
@@ -127,7 +125,19 @@ func (t Table) Column(name string) *Column {
127125
return nil
128126
}
129127

130-
func (t Table) PrimaryKeys() []string {
128+
// If the column with the same name exists, overwrites it.
129+
// Otherwise, adds the column to the beginning of the table.
130+
func (t *Table) OverwriteOrAddColumn(column *Column) {
131+
for i, c := range t.Columns {
132+
if c.Name == column.Name {
133+
t.Columns[i] = *column
134+
return
135+
}
136+
}
137+
t.Columns = append([]Column{*column}, t.Columns...)
138+
}
139+
140+
func (t *Table) PrimaryKeys() []string {
131141
var primaryKeys []string
132142
for _, c := range t.Columns {
133143
if c.CreationOptions.PrimaryKey {
@@ -138,23 +148,7 @@ func (t Table) PrimaryKeys() []string {
138148
return primaryKeys
139149
}
140150

141-
func (t Table) ColumnIndex(name string) int {
142-
var once sync.Once
143-
once.Do(func() {
144-
if t.columnsMap == nil {
145-
t.columnsMap = make(map[string]int)
146-
for i, c := range t.Columns {
147-
t.columnsMap[c.Name] = i
148-
}
149-
}
150-
})
151-
if index, ok := t.columnsMap[name]; ok {
152-
return index
153-
}
154-
return -1
155-
}
156-
157-
func (t Table) TableNames() []string {
151+
func (t *Table) TableNames() []string {
158152
ret := []string{t.Name}
159153
for _, rel := range t.Relations {
160154
ret = append(ret, rel.TableNames()...)
@@ -163,7 +157,7 @@ func (t Table) TableNames() []string {
163157
}
164158

165159
// Call the table resolver with all of its relations for every resolved resource
166-
func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, resourcesSem *semaphore.Weighted, resolvedResources chan<- *Resource) (summary SyncSummary) {
160+
func (t *Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, resourcesSem *semaphore.Weighted, resolvedResources chan<- *Resource) (summary SyncSummary) {
167161
tableStartTime := time.Now()
168162
meta.Logger().Info().Str("table", t.Name).Msg("table resolver started")
169163

@@ -225,8 +219,8 @@ func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, r
225219
return summary
226220
}
227221

228-
func (t Table) resolveObject(ctx context.Context, meta ClientMeta, parent *Resource, item interface{}, resolvedResources chan<- *Resource) (summary SyncSummary) {
229-
resource := NewResourceData(&t, parent, item)
222+
func (t *Table) resolveObject(ctx context.Context, meta ClientMeta, parent *Resource, item interface{}, resolvedResources chan<- *Resource) (summary SyncSummary) {
223+
resource := NewResourceData(t, parent, item)
230224
objectStartTime := time.Now()
231225
csr := caser.New()
232226
meta.Logger().Info().Str("table", t.Name).Msg("object resolver started")

0 commit comments

Comments
 (0)