Skip to content

Commit a5ad5f6

Browse files
authored
Merge pull request #2964 from Acepresso/file-exists-EC-1406
Fix concurrent symlink creation race condition
2 parents 3eb0e92 + b979e4a commit a5ad5f6

File tree

2 files changed

+103
-3
lines changed

2 files changed

+103
-3
lines changed

internal/policy/source/source.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,13 @@ type PolicyUrl struct {
// downloadCache is a concurrent map used to cache downloaded files.
var downloadCache sync.Map

// symlinkMutexes provides per-destination synchronization for symlink creation.
// Keys are destination paths; values are the *sync.Mutex guarding that path.
var symlinkMutexes sync.Map

// ClearDownloadCache clears the download cache and the per-destination symlink
// mutexes. This is primarily used for testing.
//
// Entries are removed in place instead of overwriting the package-level maps
// with fresh zero values: overwriting a sync.Map while another goroutine is
// calling its methods is a data race, whereas Range and Delete are safe for
// concurrent use. Entries stored concurrently with the clear may survive it.
func ClearDownloadCache() {
	clearSyncMap(&downloadCache)
	clearSyncMap(&symlinkMutexes)
}

// clearSyncMap deletes every entry that Range observes in m.
func clearSyncMap(m *sync.Map) {
	m.Range(func(key, _ any) bool {
		m.Delete(key)
		return true
	})
}
8589

8690
type cacheContent struct {
@@ -111,13 +115,24 @@ func getPolicyThroughCache(ctx context.Context, s PolicySource, workDir string,
111115
}
112116

113117
fs := utils.FS(ctx)
114-
if _, err := fs.Stat(dest); err == nil {
115-
return dest, c.metadata, nil
116-
}
117118

118119
// If the destination directory is different from the source directory, we
119120
// need to symlink the source directory to the destination directory.
121+
// Use synchronization to prevent race conditions when multiple goroutines
122+
// try to create the same symlink simultaneously.
120123
if filepath.Dir(dest) != filepath.Dir(d) {
124+
// Get or create a mutex for this specific destination
125+
mutexValue, _ := symlinkMutexes.LoadOrStore(dest, &sync.Mutex{})
126+
mutex := mutexValue.(*sync.Mutex)
127+
128+
mutex.Lock()
129+
defer mutex.Unlock()
130+
131+
// Check again if the destination exists after acquiring the lock
132+
if _, err := fs.Stat(dest); err == nil {
133+
return dest, c.metadata, nil
134+
}
135+
121136
base := filepath.Dir(dest)
122137
if err := fs.MkdirAll(base, 0755); err != nil {
123138
return "", nil, err
@@ -136,6 +151,12 @@ func getPolicyThroughCache(ctx context.Context, s PolicySource, workDir string,
136151
logMetadata(m)
137152
return dest, m, err
138153
}
154+
} else {
155+
// If dest and d are in the same directory, no symlink needed,
156+
// but still check if dest exists
157+
if _, err := fs.Stat(dest); err == nil {
158+
return dest, c.metadata, nil
159+
}
139160
}
140161

141162
if c.metadata != nil {

internal/policy/source/source_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package source
2121
import (
2222
"context"
2323
"errors"
24+
"fmt"
2425
"os"
2526
"path"
2627
"path/filepath"
@@ -633,6 +634,84 @@ func TestDownloadCacheWorkdirMismatch(t *testing.T) {
633634
assert.Equal(t, destination1, destination2)
634635
}
635636

637+
// TestConcurrentPolicyCachingRaceCondition reproduces the "file exists" error
638+
// that occurs when multiple workers simultaneously try to create symlinks from
639+
// cached policy downloads to their individual work directories
640+
func TestConcurrentPolicyCachingRaceCondition(t *testing.T) {
641+
t.Cleanup(func() {
642+
downloadCache = sync.Map{}
643+
})
644+
645+
tmp := t.TempDir()
646+
647+
source := &mockPolicySource{&mock.Mock{}}
648+
source.On("PolicyUrl").Return("policy-url")
649+
source.On("Subdir").Return("subdir")
650+
651+
// Simulate a pre-cached policy download in a shared location
652+
// This represents the first worker that successfully downloaded the policy
653+
sharedCacheDir := filepath.Join(tmp, "shared-cache")
654+
cachedPolicyPath := uniqueDestination(sharedCacheDir, "subdir", source.PolicyUrl())
655+
require.NoError(t, os.MkdirAll(cachedPolicyPath, 0755))
656+
657+
// Create test policy files
658+
policyFile := filepath.Join(cachedPolicyPath, "policy.rego")
659+
require.NoError(t, os.WriteFile(policyFile, []byte("package test"), 0600))
660+
661+
// Pre-populate the cache with the shared policy location
662+
downloadCache.Store("policy-url", func() (string, cacheContent) {
663+
return cachedPolicyPath, cacheContent{}
664+
})
665+
666+
// ALL workers use the same work directory - this forces them to compete
667+
// for the exact same symlink destination path, triggering the race condition
668+
sharedWorkDir := filepath.Join(tmp, "shared-worker-dir")
669+
670+
// Setup concurrent workers that will try to create the SAME symlink
671+
numWorkers := 50
672+
results := make(chan error, numWorkers)
673+
674+
// Synchronization barrier to maximize race condition probability
675+
startSignal := make(chan struct{})
676+
677+
for i := 0; i < numWorkers; i++ {
678+
go func(workerID int) {
679+
// Wait for all workers to be ready
680+
<-startSignal
681+
682+
// All workers use the SAME work directory - this creates the race condition
683+
_, _, err := getPolicyThroughCache(
684+
context.Background(),
685+
source,
686+
sharedWorkDir, // Same workDir for all workers = same destination path
687+
func(sourceURL, destDir string) (metadata.Metadata, error) {
688+
// Should not be called since policy is already cached
689+
return nil, fmt.Errorf("unexpected download call for worker %d", workerID)
690+
},
691+
)
692+
693+
results <- err
694+
}(i)
695+
}
696+
697+
// Release all workers simultaneously to trigger race condition
698+
close(startSignal)
699+
700+
// Collect errors
701+
var errors []error
702+
for i := 0; i < numWorkers; i++ {
703+
if err := <-results; err != nil {
704+
errors = append(errors, err)
705+
}
706+
}
707+
708+
// The test should succeed (no errors) when the race condition is fixed
709+
// Any errors indicate a concurrency bug
710+
if len(errors) > 0 {
711+
t.Errorf("Concurrent policy caching failed with %d errors: %v", len(errors), errors)
712+
}
713+
}
714+
636715
func TestLogMetadata(t *testing.T) {
637716
tests := []struct {
638717
name string

0 commit comments

Comments
 (0)