Skip to content

Commit 5a5a941

Browse files
authored
Fix potential issue with cached sync run persisting after a sync is started or ended. (#612)
* Fix potential issue with cached sync run persisting after a sync is started or ended. * Remove unneeded bool. Just check if the cached data exists or not.
1 parent aaeb86e commit 5a5a941

File tree

3 files changed

+92
-15
lines changed

3 files changed

+92
-15
lines changed

pkg/dotc1z/c1file.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ type C1File struct {
4747
encoderConcurrency int
4848

4949
// Cached sync run for listConnectorObjects (avoids N+1 queries)
50-
cachedViewSyncRun *syncRun
51-
cachedViewSyncOnce sync.Once
52-
cachedViewSyncErr error
50+
cachedViewSyncRun *syncRun
51+
cachedViewSyncMu sync.Mutex
52+
cachedViewSyncErr error
5353

5454
// Slow query tracking
5555
slowQueryLogTimes map[string]time.Time

pkg/dotc1z/c1file_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,3 +544,65 @@ func TestC1ZReadOnlyMode(t *testing.T) {
544544
require.NoError(t, err)
545545
require.Equal(t, fileInfo1.ModTime(), fileInfo2.ModTime())
546546
}
547+
548+
func TestC1ZCachedViewSyncRunInvalidation(t *testing.T) {
549+
ctx := t.Context()
550+
testFilePath := filepath.Join(c1zTests.workingDir, "test-cached-view-sync-invalidation.c1z")
551+
552+
f, err := NewC1ZFile(ctx, testFilePath, WithPragma("journal_mode", "WAL"))
553+
require.NoError(t, err)
554+
555+
// Start first sync and add a resource
556+
syncID1, err := f.StartNewSync(ctx, connectorstore.SyncTypeFull, "")
557+
require.NoError(t, err)
558+
require.NotEmpty(t, syncID1)
559+
560+
err = f.PutResourceTypes(ctx, v2.ResourceType_builder{Id: testResourceType}.Build())
561+
require.NoError(t, err)
562+
563+
err = f.PutResources(ctx, v2.Resource_builder{
564+
Id: v2.ResourceId_builder{
565+
ResourceType: testResourceType,
566+
Resource: "resource-1",
567+
}.Build(),
568+
}.Build())
569+
require.NoError(t, err)
570+
571+
err = f.EndSync(ctx)
572+
require.NoError(t, err)
573+
574+
// Call ListResources to populate the cache with sync1
575+
resp1, err := f.ListResources(ctx, v2.ResourcesServiceListResourcesRequest_builder{}.Build())
576+
require.NoError(t, err)
577+
require.Len(t, resp1.GetList(), 1)
578+
require.Equal(t, "resource-1", resp1.GetList()[0].GetId().GetResource())
579+
580+
// Start a new sync and add a different resource
581+
syncID2, err := f.StartNewSync(ctx, connectorstore.SyncTypeFull, "")
582+
require.NoError(t, err)
583+
require.NotEmpty(t, syncID2)
584+
require.NotEqual(t, syncID1, syncID2)
585+
586+
err = f.PutResources(ctx, v2.Resource_builder{
587+
Id: v2.ResourceId_builder{
588+
ResourceType: testResourceType,
589+
Resource: "resource-2",
590+
}.Build(),
591+
}.Build())
592+
require.NoError(t, err)
593+
594+
// End the new sync
595+
err = f.EndSync(ctx)
596+
require.NoError(t, err)
597+
598+
// Call ListResources again - it should return resource-2 from the new finished sync (sync2),
599+
// but it will return resource-1 from the cached sync (sync1) instead because the cache wasn't invalidated
600+
resp2, err := f.ListResources(ctx, v2.ResourcesServiceListResourcesRequest_builder{}.Build())
601+
require.NoError(t, err)
602+
// This assertion will fail because the cache wasn't invalidated when sync2 finished
603+
require.Len(t, resp2.GetList(), 1, "should return resource from new sync")
604+
require.Equal(t, "resource-2", resp2.GetList()[0].GetId().GetResource(), "should return resource-2 from the new finished sync (sync2), not resource-1 from cached sync (sync1)")
605+
606+
err = f.Close()
607+
require.NoError(t, err)
608+
}

pkg/dotc1z/sync_runs.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -97,27 +97,40 @@ type syncRun struct {
9797

9898
// getCachedViewSyncRun returns the cached sync run for read operations.
9999
// This avoids N+1 queries when paginating through listConnectorObjects.
100-
// The result is computed once and cached for the lifetime of the C1File.
100+
// The cache is invalidated when a sync starts or ends.
101101
func (c *C1File) getCachedViewSyncRun(ctx context.Context) (*syncRun, error) {
102102
ctx, span := tracer.Start(ctx, "C1File.getCachedViewSyncRun")
103103
defer span.End()
104104

105-
c.cachedViewSyncOnce.Do(func() {
106-
// First try to get a finished full sync
107-
c.cachedViewSyncRun, c.cachedViewSyncErr = c.getFinishedSync(ctx, 0, connectorstore.SyncTypeFull)
108-
if c.cachedViewSyncErr != nil {
109-
return
110-
}
105+
c.cachedViewSyncMu.Lock()
106+
defer c.cachedViewSyncMu.Unlock()
111107

112-
// If no finished sync, try to get an unfinished one
113-
if c.cachedViewSyncRun == nil {
114-
c.cachedViewSyncRun, c.cachedViewSyncErr = c.getLatestUnfinishedSync(ctx, connectorstore.SyncTypeAny)
115-
}
116-
})
108+
if c.cachedViewSyncRun != nil || c.cachedViewSyncErr != nil {
109+
return c.cachedViewSyncRun, c.cachedViewSyncErr
110+
}
111+
112+
// First try to get a finished full sync
113+
c.cachedViewSyncRun, c.cachedViewSyncErr = c.getFinishedSync(ctx, 0, connectorstore.SyncTypeFull)
114+
if c.cachedViewSyncErr != nil {
115+
return c.cachedViewSyncRun, c.cachedViewSyncErr
116+
}
117+
118+
// If no finished sync, try to get an unfinished one
119+
if c.cachedViewSyncRun == nil {
120+
c.cachedViewSyncRun, c.cachedViewSyncErr = c.getLatestUnfinishedSync(ctx, connectorstore.SyncTypeAny)
121+
}
117122

118123
return c.cachedViewSyncRun, c.cachedViewSyncErr
119124
}
120125

126+
// invalidateCachedViewSyncRun clears the cached sync run so it will be recomputed on next access.
127+
func (c *C1File) invalidateCachedViewSyncRun() {
128+
c.cachedViewSyncMu.Lock()
129+
defer c.cachedViewSyncMu.Unlock()
130+
c.cachedViewSyncRun = nil
131+
c.cachedViewSyncErr = nil
132+
}
133+
121134
func (c *C1File) getLatestUnfinishedSync(ctx context.Context, syncType connectorstore.SyncType) (*syncRun, error) {
122135
ctx, span := tracer.Start(ctx, "C1File.getLatestUnfinishedSync")
123136
defer span.End()
@@ -539,6 +552,7 @@ func (c *C1File) StartNewSync(ctx context.Context, syncType connectorstore.SyncT
539552
}
540553

541554
c.currentSyncID = syncID
555+
c.invalidateCachedViewSyncRun()
542556

543557
return c.currentSyncID, nil
544558
}
@@ -597,6 +611,7 @@ func (c *C1File) EndSync(ctx context.Context) error {
597611
}
598612

599613
c.currentSyncID = ""
614+
c.invalidateCachedViewSyncRun()
600615

601616
return nil
602617
}

0 commit comments

Comments
 (0)