Skip to content

Commit 502d306

Browse files
authored
Compaction speedup: Reuse c1file for compaction steps. (#611)
* Compaction speedup: Reuse c1file for compaction steps.
* Don't drop unique constraint on external_id + sync_id. We need that for updating instead of inserting another row when compacting.
* Improve benchmark to compact 10 partial syncs and one full sync.
* Fix incorrect error message.
* Benchmark: Clean up immediately, not at the end of all benchmarks.
* Remove aliased variable in test.
* Set sync ended_at to be the later of the two compacted syncs' ended_at values.
* Compaction benchmark: More realistic entitlement/grant counts.
* Compaction: Create a new sync instead of copying the first sync and compacting everything on it. This is slower, but it means the compacted sync does not have the same sync ID as any other sync.
* Compaction: Remove deleting/recreating indexes. Now that we're reusing the same c1z for all compaction, it's no longer faster.
* Cleanup: Tweak a few pragmas and comments in compaction code. Log an error if we fail to close the apply file.
* Remove commented-out code.
* Use temp dir for compacted c1z. Copy it into the destination dir at the end.
* Compaction: Try to use os.Rename() to move file. Fall back to old method if it fails for any reason.
1 parent 2206fc9 commit 502d306

File tree

9 files changed

+385
-520
lines changed

9 files changed

+385
-520
lines changed

pkg/dotc1z/c1file_attached.go

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,18 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
8+
reader_v2 "github.com/conductorone/baton-sdk/pb/c1/reader/v2"
9+
"github.com/conductorone/baton-sdk/pkg/connectorstore"
10+
"github.com/doug-martin/goqu/v9"
711
)
812

913
type C1FileAttached struct {
1014
safe bool
1115
file *C1File
1216
}
1317

14-
func (c *C1FileAttached) CompactTable(ctx context.Context, destSyncID string, baseSyncID string, appliedSyncID string, tableName string) error {
18+
func (c *C1FileAttached) CompactTable(ctx context.Context, baseSyncID string, appliedSyncID string, tableName string) error {
1519
if !c.safe {
1620
return errors.New("database has been detached")
1721
}
@@ -40,20 +44,7 @@ func (c *C1FileAttached) CompactTable(ctx context.Context, destSyncID string, ba
4044
}
4145
}
4246

43-
// Step 1: Insert ALL records from base sync
44-
insertBaseQuery := fmt.Sprintf(`
45-
INSERT INTO main.%s (%s)
46-
SELECT %s
47-
FROM base.%s
48-
WHERE sync_id = ?
49-
`, tableName, columnList, selectList, tableName)
50-
51-
_, err = c.file.db.ExecContext(ctx, insertBaseQuery, destSyncID, baseSyncID)
52-
if err != nil {
53-
return fmt.Errorf("failed to copy base records: %w", err)
54-
}
55-
56-
// Step 2: Insert/replace records from applied sync where applied.discovered_at > main.discovered_at
47+
// Insert/replace records from applied sync where applied.discovered_at > main.discovered_at
5748
insertOrReplaceAppliedQuery := fmt.Sprintf(`
5849
INSERT OR REPLACE INTO main.%s (%s)
5950
SELECT %s
@@ -73,7 +64,7 @@ func (c *C1FileAttached) CompactTable(ctx context.Context, destSyncID string, ba
7364
)
7465
`, tableName, columnList, selectList, tableName, tableName, tableName)
7566

76-
_, err = c.file.db.ExecContext(ctx, insertOrReplaceAppliedQuery, destSyncID, appliedSyncID, destSyncID, destSyncID)
67+
_, err = c.file.db.ExecContext(ctx, insertOrReplaceAppliedQuery, baseSyncID, appliedSyncID, baseSyncID, baseSyncID)
7768
return err
7869
}
7970

@@ -94,7 +85,7 @@ func (c *C1FileAttached) getTableColumns(ctx context.Context, tableName string)
9485
var cid int
9586
var name, dataType string
9687
var notNull, pk int
97-
var defaultValue interface{}
88+
var defaultValue any
9889

9990
err := rows.Scan(&cid, &name, &dataType, &notNull, &defaultValue, &pk)
10091
if err != nil {
@@ -113,30 +104,73 @@ func (c *C1FileAttached) getTableColumns(ctx context.Context, tableName string)
113104
return columns, nil
114105
}
115106

116-
func (c *C1FileAttached) CompactResourceTypes(ctx context.Context, destSyncID string, baseSyncID string, appliedSyncID string) error {
107+
func (c *C1FileAttached) CompactResourceTypes(ctx context.Context, baseSyncID string, appliedSyncID string) error {
117108
if !c.safe {
118109
return errors.New("database has been detached")
119110
}
120-
return c.CompactTable(ctx, destSyncID, baseSyncID, appliedSyncID, "v1_resource_types")
111+
return c.CompactTable(ctx, baseSyncID, appliedSyncID, "v1_resource_types")
121112
}
122113

123-
func (c *C1FileAttached) CompactResources(ctx context.Context, destSyncID string, baseSyncID string, appliedSyncID string) error {
114+
func (c *C1FileAttached) CompactResources(ctx context.Context, baseSyncID string, appliedSyncID string) error {
124115
if !c.safe {
125116
return errors.New("database has been detached")
126117
}
127-
return c.CompactTable(ctx, destSyncID, baseSyncID, appliedSyncID, "v1_resources")
118+
return c.CompactTable(ctx, baseSyncID, appliedSyncID, "v1_resources")
128119
}
129120

130-
func (c *C1FileAttached) CompactEntitlements(ctx context.Context, destSyncID string, baseSyncID string, appliedSyncID string) error {
121+
func (c *C1FileAttached) CompactEntitlements(ctx context.Context, baseSyncID string, appliedSyncID string) error {
131122
if !c.safe {
132123
return errors.New("database has been detached")
133124
}
134-
return c.CompactTable(ctx, destSyncID, baseSyncID, appliedSyncID, "v1_entitlements")
125+
return c.CompactTable(ctx, baseSyncID, appliedSyncID, "v1_entitlements")
126+
}
127+
128+
func (c *C1FileAttached) CompactGrants(ctx context.Context, baseSyncID string, appliedSyncID string) error {
129+
if !c.safe {
130+
return errors.New("database has been detached")
131+
}
132+
return c.CompactTable(ctx, baseSyncID, appliedSyncID, "v1_grants")
133+
}
134+
135+
func unionSyncTypes(a, b connectorstore.SyncType) connectorstore.SyncType {
136+
switch {
137+
case a == connectorstore.SyncTypeFull || b == connectorstore.SyncTypeFull:
138+
return connectorstore.SyncTypeFull
139+
case a == connectorstore.SyncTypeResourcesOnly || b == connectorstore.SyncTypeResourcesOnly:
140+
return connectorstore.SyncTypeResourcesOnly
141+
default:
142+
return connectorstore.SyncTypePartial
143+
}
135144
}
136145

137-
func (c *C1FileAttached) CompactGrants(ctx context.Context, destSyncID string, baseSyncID string, appliedSyncID string) error {
146+
func (c *C1FileAttached) UpdateSync(ctx context.Context, baseSync *reader_v2.SyncRun, appliedSync *reader_v2.SyncRun) error {
138147
if !c.safe {
139148
return errors.New("database has been detached")
140149
}
141-
return c.CompactTable(ctx, destSyncID, baseSyncID, appliedSyncID, "v1_grants")
150+
syncType := unionSyncTypes(connectorstore.SyncType(baseSync.GetSyncType()), connectorstore.SyncType(appliedSync.GetSyncType()))
151+
152+
latestEndedAt := baseSync.GetEndedAt().AsTime()
153+
if appliedSync.GetEndedAt().AsTime().After(latestEndedAt) {
154+
latestEndedAt = appliedSync.GetEndedAt().AsTime()
155+
}
156+
157+
baseSyncID := baseSync.GetId()
158+
q := c.file.db.Update(fmt.Sprintf("main.%s", syncRuns.Name()))
159+
q = q.Set(goqu.Record{
160+
"ended_at": latestEndedAt.Format("2006-01-02 15:04:05.999999999"),
161+
"sync_type": string(syncType),
162+
})
163+
q = q.Where(goqu.C("sync_id").Eq(baseSyncID))
164+
165+
query, args, err := q.ToSQL()
166+
if err != nil {
167+
return fmt.Errorf("failed to build update sync query: %w", err)
168+
}
169+
170+
_, err = c.file.db.ExecContext(ctx, query, args...)
171+
if err != nil {
172+
return fmt.Errorf("failed to update sync %s to type %s: %w", baseSyncID, syncType, err)
173+
}
174+
175+
return nil
142176
}

pkg/dotc1z/grants.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -63,29 +63,6 @@ func (r *grantsTable) Migrations(ctx context.Context, db *goqu.Database) error {
6363
return nil
6464
}
6565

66-
// DropGrantIndexes drops the indexes on the grants table.
67-
// This should only be called when compacting the grants table.
68-
// These indexes are re-created when we open the database again.
69-
func (c *C1File) DropGrantIndexes(ctx context.Context) error {
70-
ctx, span := tracer.Start(ctx, "C1File.DropGrantsIndexes")
71-
defer span.End()
72-
73-
indexes := []string{
74-
fmt.Sprintf("idx_grants_resource_type_id_resource_id_v%s", grants.Version()),
75-
fmt.Sprintf("idx_grants_principal_id_v%s", grants.Version()),
76-
fmt.Sprintf("idx_grants_entitlement_id_principal_id_v%s", grants.Version()),
77-
fmt.Sprintf("idx_grants_external_sync_v%s", grants.Version()),
78-
}
79-
80-
for _, index := range indexes {
81-
_, err := c.db.ExecContext(ctx, fmt.Sprintf("DROP INDEX IF EXISTS %s", index))
82-
if err != nil {
83-
return err
84-
}
85-
}
86-
return nil
87-
}
88-
8966
func (c *C1File) ListGrants(ctx context.Context, request *v2.GrantsServiceListGrantsRequest) (*v2.GrantsServiceListGrantsResponse, error) {
9067
ctx, span := tracer.Start(ctx, "C1File.ListGrants")
9168
defer span.End()

pkg/synccompactor/attached/attached.go

Lines changed: 28 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66

7+
reader_v2 "github.com/conductorone/baton-sdk/pb/c1/reader/v2"
78
"github.com/conductorone/baton-sdk/pkg/connectorstore"
89
"github.com/conductorone/baton-sdk/pkg/dotc1z"
910
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
@@ -13,49 +14,40 @@ import (
1314
type Compactor struct {
1415
base *dotc1z.C1File
1516
applied *dotc1z.C1File
16-
dest *dotc1z.C1File
1717
}
1818

19-
func NewAttachedCompactor(base *dotc1z.C1File, applied *dotc1z.C1File, dest *dotc1z.C1File) *Compactor {
19+
func NewAttachedCompactor(base *dotc1z.C1File, applied *dotc1z.C1File) *Compactor {
2020
return &Compactor{
2121
base: base,
2222
applied: applied,
23-
dest: dest,
2423
}
2524
}
2625

27-
func (c *Compactor) CompactWithSyncID(ctx context.Context, destSyncID string) error {
28-
baseSyncID, err := c.base.LatestFinishedSyncID(ctx, connectorstore.SyncTypeAny)
26+
func (c *Compactor) Compact(ctx context.Context) error {
27+
baseSync, err := c.base.GetLatestFinishedSync(ctx, reader_v2.SyncsReaderServiceGetLatestFinishedSyncRequest_builder{
28+
SyncType: string(connectorstore.SyncTypeAny),
29+
}.Build())
2930
if err != nil {
30-
return fmt.Errorf("failed to get base sync ID: %w", err)
31+
return fmt.Errorf("failed to get base sync: %w", err)
3132
}
32-
if baseSyncID == "" {
33+
if baseSync == nil || baseSync.GetSync() == nil {
3334
return fmt.Errorf("no finished sync found in base")
3435
}
3536

36-
appliedSyncID, err := c.applied.LatestFinishedSyncID(ctx, connectorstore.SyncTypeAny)
37+
appliedSync, err := c.applied.GetLatestFinishedSync(ctx, reader_v2.SyncsReaderServiceGetLatestFinishedSyncRequest_builder{
38+
SyncType: string(connectorstore.SyncTypeAny),
39+
}.Build())
3740
if err != nil {
38-
return fmt.Errorf("failed to get applied sync ID: %w", err)
41+
return fmt.Errorf("failed to get applied sync: %w", err)
3942
}
40-
if appliedSyncID == "" {
43+
if appliedSync == nil || appliedSync.GetSync() == nil {
4144
return fmt.Errorf("no finished sync found in applied")
4245
}
4346

44-
// Attach both the base and applied databases to the destination
45-
base, err := c.dest.AttachFile(c.base, "base")
46-
if err != nil {
47-
return fmt.Errorf("failed to attach databases to destination: %w", err)
48-
}
4947
l := ctxzap.Extract(ctx)
50-
defer func() {
51-
_, err := base.DetachFile("base")
52-
if err != nil {
53-
l.Error("failed to detach file", zap.Error(err))
54-
}
55-
}()
5648

5749
// Attach both the base and applied databases to the destination
58-
attached, err := c.dest.AttachFile(c.applied, "attached")
50+
attached, err := c.base.AttachFile(c.applied, "attached")
5951
if err != nil {
6052
return fmt.Errorf("failed to attach databases to destination: %w", err)
6153
}
@@ -66,40 +58,36 @@ func (c *Compactor) CompactWithSyncID(ctx context.Context, destSyncID string) er
6658
}
6759
}()
6860

69-
// Drop grants indexes to improve performance.
70-
err = c.dest.DropGrantIndexes(ctx)
71-
if err != nil {
72-
return fmt.Errorf("failed to drop grants indexes: %w", err)
73-
}
74-
75-
if err := c.processRecords(ctx, attached, destSyncID, baseSyncID, appliedSyncID); err != nil {
61+
if err := c.processRecords(ctx, attached, baseSync.GetSync(), appliedSync.GetSync()); err != nil {
7662
return fmt.Errorf("failed to process records: %w", err)
7763
}
7864

79-
// Re-create the destination database to re-create the grant indexes.
80-
err = c.dest.InitTables(ctx)
81-
if err != nil {
82-
return fmt.Errorf("failed to re-create destination database: %w", err)
83-
}
84-
8565
return nil
8666
}
8767

88-
func (c *Compactor) processRecords(ctx context.Context, attached *dotc1z.C1FileAttached, destSyncID string, baseSyncID string, appliedSyncID string) error {
68+
func (c *Compactor) processRecords(ctx context.Context, attached *dotc1z.C1FileAttached, baseSync *reader_v2.SyncRun, appliedSync *reader_v2.SyncRun) error {
69+
baseSyncID := baseSync.GetId()
70+
appliedSyncID := appliedSync.GetId()
71+
72+
// Update the base sync type to the union of the base and applied sync types.
73+
if err := attached.UpdateSync(ctx, baseSync, appliedSync); err != nil {
74+
return fmt.Errorf("failed to update sync %s: %w", baseSyncID, err)
75+
}
76+
8977
// Compact all tables: copy base records and merge newer applied records using raw SQL
90-
if err := attached.CompactResourceTypes(ctx, destSyncID, baseSyncID, appliedSyncID); err != nil {
78+
if err := attached.CompactResourceTypes(ctx, baseSyncID, appliedSyncID); err != nil {
9179
return fmt.Errorf("failed to compact resource types: %w", err)
9280
}
9381

94-
if err := attached.CompactResources(ctx, destSyncID, baseSyncID, appliedSyncID); err != nil {
82+
if err := attached.CompactResources(ctx, baseSyncID, appliedSyncID); err != nil {
9583
return fmt.Errorf("failed to compact resources: %w", err)
9684
}
9785

98-
if err := attached.CompactEntitlements(ctx, destSyncID, baseSyncID, appliedSyncID); err != nil {
86+
if err := attached.CompactEntitlements(ctx, baseSyncID, appliedSyncID); err != nil {
9987
return fmt.Errorf("failed to compact entitlements: %w", err)
10088
}
10189

102-
if err := attached.CompactGrants(ctx, destSyncID, baseSyncID, appliedSyncID); err != nil {
90+
if err := attached.CompactGrants(ctx, baseSyncID, appliedSyncID); err != nil {
10391
return fmt.Errorf("failed to compact grants: %w", err)
10492
}
10593

pkg/synccompactor/attached/attached_test.go

Lines changed: 6 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ func TestAttachedCompactor(t *testing.T) {
1717
tmpDir := t.TempDir()
1818
baseFile := filepath.Join(tmpDir, "base.c1z")
1919
appliedFile := filepath.Join(tmpDir, "applied.c1z")
20-
destFile := filepath.Join(tmpDir, "dest.c1z")
2120

2221
opts := []dotc1z.C1ZOption{
2322
dotc1z.WithPragma("journal_mode", "WAL"),
@@ -48,20 +47,8 @@ func TestAttachedCompactor(t *testing.T) {
4847
err = appliedDB.EndSync(ctx)
4948
require.NoError(t, err)
5049

51-
// Create destination database
52-
destDB, err := dotc1z.NewC1ZFile(ctx, destFile, opts...)
53-
require.NoError(t, err)
54-
defer destDB.Close()
55-
56-
// Start a sync in destination and run compaction
57-
destSyncID, err := destDB.StartNewSync(ctx, connectorstore.SyncTypeFull, "")
58-
require.NoError(t, err)
59-
60-
compactor := NewAttachedCompactor(baseDB, appliedDB, destDB)
61-
err = compactor.CompactWithSyncID(ctx, destSyncID)
62-
require.NoError(t, err)
63-
64-
err = destDB.EndSync(ctx)
50+
compactor := NewAttachedCompactor(baseDB, appliedDB)
51+
err = compactor.Compact(ctx)
6552
require.NoError(t, err)
6653

6754
// Verify that compaction completed without errors
@@ -76,7 +63,6 @@ func TestAttachedCompactorMixedSyncTypes(t *testing.T) {
7663
tmpDir := t.TempDir()
7764
baseFile := filepath.Join(tmpDir, "base.c1z")
7865
appliedFile := filepath.Join(tmpDir, "applied.c1z")
79-
destFile := filepath.Join(tmpDir, "dest.c1z")
8066

8167
opts := []dotc1z.C1ZOption{
8268
dotc1z.WithPragma("journal_mode", "WAL"),
@@ -109,20 +95,8 @@ func TestAttachedCompactorMixedSyncTypes(t *testing.T) {
10995
err = appliedDB.EndSync(ctx)
11096
require.NoError(t, err)
11197

112-
// Create destination database
113-
destDB, err := dotc1z.NewC1ZFile(ctx, destFile, opts...)
114-
require.NoError(t, err)
115-
defer destDB.Close()
116-
117-
// Start a sync in destination and run compaction
118-
destSyncID, err := destDB.StartNewSync(ctx, connectorstore.SyncTypeFull, "")
119-
require.NoError(t, err)
120-
121-
compactor := NewAttachedCompactor(baseDB, appliedDB, destDB)
122-
err = compactor.CompactWithSyncID(ctx, destSyncID)
123-
require.NoError(t, err)
124-
125-
err = destDB.EndSync(ctx)
98+
compactor := NewAttachedCompactor(baseDB, appliedDB)
99+
err = compactor.Compact(ctx)
126100
require.NoError(t, err)
127101

128102
// Verify that compaction completed without errors
@@ -139,7 +113,6 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) {
139113
tmpDir := t.TempDir()
140114
baseFile := filepath.Join(tmpDir, "base.c1z")
141115
appliedFile := filepath.Join(tmpDir, "applied.c1z")
142-
destFile := filepath.Join(tmpDir, "dest.c1z")
143116

144117
opts := []dotc1z.C1ZOption{
145118
dotc1z.WithPragma("journal_mode", "WAL"),
@@ -179,20 +152,8 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) {
179152
err = appliedDB.EndSync(ctx)
180153
require.NoError(t, err)
181154

182-
// Create destination database
183-
destDB, err := dotc1z.NewC1ZFile(ctx, destFile, opts...)
184-
require.NoError(t, err)
185-
defer destDB.Close()
186-
187-
// Start a sync in destination and run compaction
188-
destSyncID, err := destDB.StartNewSync(ctx, connectorstore.SyncTypeFull, "")
189-
require.NoError(t, err)
190-
191-
compactor := NewAttachedCompactor(baseDB, appliedDB, destDB)
192-
err = compactor.CompactWithSyncID(ctx, destSyncID)
193-
require.NoError(t, err)
194-
195-
err = destDB.EndSync(ctx)
155+
compactor := NewAttachedCompactor(baseDB, appliedDB)
156+
err = compactor.Compact(ctx)
196157
require.NoError(t, err)
197158

198159
// Verify that compaction completed without errors

0 commit comments

Comments
 (0)