Skip to content

Commit 6ca3d19

Browse files
loganintech and ggreer authored
Create a "diff" sync entry for the items added between syncs (#366)
* Working on diff * Getting there * Nearly there? * I think it'll work * I think e2e but unsure * Feels like it should be working but it's not saving to disk and idk why * twerks * Renaming some things * Diffs should be considered partial syncs * Comments * Export target none * silly wabbit coding is for engineers * DB updated at the end of the function * Rebase and fix compiler error. --------- Co-authored-by: Geoff Greer <geoff@greer.fm>
1 parent 1e198b9 commit 6ca3d19

File tree

12 files changed

+825
-262
lines changed

12 files changed

+825
-262
lines changed

pb/c1/connectorapi/baton/v1/baton.pb.go

Lines changed: 349 additions & 253 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pb/c1/connectorapi/baton/v1/baton.pb.validate.go

Lines changed: 181 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/cli/commands.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,15 @@ func MakeMainCommand[T field.Configurable](
224224
connectorrunner.WithTargetedSyncResourceIDs(v.GetStringSlice("sync-resources")),
225225
connectorrunner.WithOnDemandSync(v.GetString("file")),
226226
)
227+
case v.GetBool("diff-syncs"):
228+
opts = append(opts,
229+
connectorrunner.WithDiffSyncs(
230+
v.GetString("file"),
231+
v.GetString("base-sync-id"),
232+
v.GetString("applied-sync-id"),
233+
),
234+
)
235+
227236
default:
228237
opts = append(opts, connectorrunner.WithOnDemandSync(v.GetString("file")))
229238
}

pkg/connectorrunner/runner.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,11 @@ type rotateCredentialsConfig struct {
282282
type eventStreamConfig struct {
283283
}
284284

285+
// syncDifferConfig carries the pair of sync IDs that WithDiffSyncs records on
// the runner config and that NewConnectorRunner forwards to local.NewDiffer.
type syncDifferConfig struct {
	// baseSyncID identifies the sync used as the comparison baseline.
	baseSyncID string
	// appliedSyncID identifies the sync whose contents are compared against the baseline.
	appliedSyncID string
}
289+
285290
type runnerConfig struct {
286291
rlCfg *ratelimitV1.RateLimiterConfig
287292
rlDescriptors []*ratelimitV1.RateLimitDescriptors_Entry
@@ -303,6 +308,7 @@ type runnerConfig struct {
303308
bulkCreateTicketConfig *bulkCreateTicketConfig
304309
listTicketSchemasConfig *listTicketSchemasConfig
305310
getTicketConfig *getTicketConfig
311+
syncDifferConfig *syncDifferConfig
306312
skipFullSync bool
307313
targetedSyncResourceIDs []string
308314
externalResourceC1Z string
@@ -555,6 +561,18 @@ func WithExternalResourceEntitlementFilter(entitlementId string) Option {
555561
}
556562
}
557563

564+
func WithDiffSyncs(c1zPath string, baseSyncID string, newSyncID string) Option {
565+
return func(ctx context.Context, cfg *runnerConfig) error {
566+
cfg.onDemand = true
567+
cfg.c1zPath = c1zPath
568+
cfg.syncDifferConfig = &syncDifferConfig{
569+
baseSyncID: baseSyncID,
570+
appliedSyncID: newSyncID,
571+
}
572+
return nil
573+
}
574+
}
575+
558576
// NewConnectorRunner creates a new connector runner.
559577
func NewConnectorRunner(ctx context.Context, c types.ConnectorServer, opts ...Option) (*connectorRunner, error) {
560578
runner := &connectorRunner{}
@@ -635,6 +653,8 @@ func NewConnectorRunner(ctx context.Context, c types.ConnectorServer, opts ...Op
635653
tm = local.NewGetTicket(ctx, cfg.getTicketConfig.ticketID)
636654
case cfg.bulkCreateTicketConfig != nil:
637655
tm = local.NewBulkTicket(ctx, cfg.bulkCreateTicketConfig.templatePath)
656+
case cfg.syncDifferConfig != nil:
657+
tm = local.NewDiffer(ctx, cfg.c1zPath, cfg.syncDifferConfig.baseSyncID, cfg.syncDifferConfig.appliedSyncID)
638658
default:
639659
tm, err = local.NewSyncer(ctx, cfg.c1zPath,
640660
local.WithTmpDir(cfg.tempDir),

pkg/dotc1z/diff.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package dotc1z
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/doug-martin/goqu/v9"
9+
"github.com/segmentio/ksuid"
10+
)
11+
12+
func (c *C1File) GenerateSyncDiff(ctx context.Context, baseSyncID string, appliedSyncID string) (string, error) {
13+
// Validate that both sync runs exist
14+
baseSync, err := c.getSync(ctx, baseSyncID)
15+
if err != nil {
16+
return "", err
17+
}
18+
if baseSync == nil {
19+
return "", fmt.Errorf("generate-diff: base sync not found")
20+
}
21+
22+
newSync, err := c.getSync(ctx, appliedSyncID)
23+
if err != nil {
24+
return "", err
25+
}
26+
if newSync == nil {
27+
return "", fmt.Errorf("generate-diff: new sync not found")
28+
}
29+
30+
// Generate a new unique ID for the diff sync
31+
diffSyncID := ksuid.New().String()
32+
33+
if err := c.insertSyncRun(ctx, diffSyncID, SyncTypePartial, baseSyncID); err != nil {
34+
return "", err
35+
}
36+
37+
for _, t := range allTableDescriptors {
38+
if strings.Contains(t.Name(), syncRunsTableName) {
39+
continue
40+
}
41+
42+
q, args, err := c.diffTableQuery(t, baseSyncID, appliedSyncID, diffSyncID)
43+
if err != nil {
44+
return "", err
45+
}
46+
_, err = c.db.ExecContext(ctx, q, args...)
47+
if err != nil {
48+
return "", err
49+
}
50+
c.dbUpdated = true
51+
}
52+
53+
if err := c.endSyncRun(ctx, diffSyncID); err != nil {
54+
return "", err
55+
}
56+
57+
return diffSyncID, nil
58+
}
59+
60+
func (c *C1File) diffTableQuery(table tableDescriptor, baseSyncID, appliedSyncID, newSyncID string) (string, []any, error) {
61+
// Define the columns to select based on the table name
62+
columns := []interface{}{
63+
"external_id",
64+
"data",
65+
"sync_id",
66+
"discovered_at",
67+
}
68+
69+
tableName := table.Name()
70+
// Add table-specific columns
71+
switch {
72+
case strings.Contains(tableName, resourcesTableName):
73+
columns = append(columns, "resource_type_id", "parent_resource_type_id", "parent_resource_id")
74+
case strings.Contains(tableName, resourceTypesTableName):
75+
// Nothing new to add here
76+
case strings.Contains(tableName, grantsTableName):
77+
columns = append(columns, "resource_type_id", "resource_id", "entitlement_id", "principal_resource_type_id", "principal_resource_id")
78+
case strings.Contains(tableName, entitlementsTableName):
79+
columns = append(columns, "resource_type_id", "resource_id")
80+
case strings.Contains(tableName, assetsTableName):
81+
columns = append(columns, "content_type")
82+
}
83+
84+
// Build the subquery to find external_ids in the base sync
85+
subquery := c.db.Select("external_id").
86+
From(tableName).
87+
Where(goqu.C("sync_id").Eq(baseSyncID))
88+
89+
queryColumns := []interface{}{}
90+
for _, col := range columns {
91+
if col == "sync_id" {
92+
queryColumns = append(queryColumns, goqu.L(fmt.Sprintf("'%s' as sync_id", newSyncID)))
93+
continue
94+
}
95+
queryColumns = append(queryColumns, col)
96+
}
97+
98+
// Build the main query to select records from newSyncID that don't exist in baseSyncID
99+
query := c.db.Insert(tableName).
100+
Cols(columns...).
101+
Prepared(true).
102+
FromQuery(
103+
c.db.Select(queryColumns...).
104+
From(tableName).
105+
Where(
106+
goqu.C("sync_id").Eq(appliedSyncID),
107+
goqu.C("external_id").NotIn(subquery),
108+
),
109+
)
110+
111+
// Generate the SQL and args
112+
return query.ToSQL()
113+
}

0 commit comments

Comments
 (0)