Skip to content

Commit cfb8ab8

Browse files
committed
Fix thread-safety issues in feature flag cache
Addressed PR review comments from #304: 1. Fixed race condition when reading flagCtx fields - Added proper locking with flagCtx.mu for enabled, lastFetched, fetching - Previously accessed without correct lock causing data races 2. Fixed concurrent fetch issue - Implemented fetching flag to prevent simultaneous HTTP requests - First goroutine sets fetching=true, others use cached value - Prevents rate limiting from concurrent fetches when cache expires 3. Added HTTP request timeout - Added featureFlagHTTPTimeout = 10s constant - Wraps context with timeout if none exists - Prevents indefinite hangs (Go's default has no timeout) All tests pass. Thread-safe concurrent access verified.
1 parent 126c10f commit cfb8ab8

File tree

1 file changed

+64
-15
lines changed

1 file changed

+64
-15
lines changed

telemetry/featureflag.go

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,20 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"io"
78
"net/http"
9+
"strings"
810
"sync"
911
"time"
1012
)
1113

14+
const (
15+
// featureFlagCacheDuration is how long to cache feature flag values
16+
featureFlagCacheDuration = 15 * time.Minute
17+
// featureFlagHTTPTimeout is the default timeout for feature flag HTTP requests
18+
featureFlagHTTPTimeout = 10 * time.Second
19+
)
20+
1221
// featureFlagCache manages feature flag state per host with reference counting.
1322
// This prevents rate limiting by caching feature flag responses.
1423
type featureFlagCache struct {
@@ -18,10 +27,12 @@ type featureFlagCache struct {
1827

1928
// featureFlagContext holds feature flag state and reference count for a host.
2029
type featureFlagContext struct {
30+
mu sync.RWMutex // protects enabled, lastFetched, fetching
2131
enabled *bool
2232
lastFetched time.Time
23-
refCount int
33+
refCount int // protected by featureFlagCache.mu
2434
cacheDuration time.Duration
35+
fetching bool // true if a fetch is in progress
2536
}
2637

2738
var (
@@ -48,7 +59,7 @@ func (c *featureFlagCache) getOrCreateContext(host string) *featureFlagContext {
4859
ctx, exists := c.contexts[host]
4960
if !exists {
5061
ctx = &featureFlagContext{
51-
cacheDuration: 15 * time.Minute,
62+
cacheDuration: featureFlagCacheDuration,
5263
}
5364
c.contexts[host] = ctx
5465
}
@@ -81,28 +92,57 @@ func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string,
8192
return false, nil
8293
}
8394

84-
// Check if cache is valid
95+
// Check if cache is valid (with proper locking)
96+
flagCtx.mu.RLock()
8597
if flagCtx.enabled != nil && time.Since(flagCtx.lastFetched) < flagCtx.cacheDuration {
86-
return *flagCtx.enabled, nil
98+
enabled := *flagCtx.enabled
99+
flagCtx.mu.RUnlock()
100+
return enabled, nil
101+
}
102+
103+
// Check if another goroutine is already fetching
104+
if flagCtx.fetching {
105+
// Return cached value if available, otherwise wait
106+
if flagCtx.enabled != nil {
107+
enabled := *flagCtx.enabled
108+
flagCtx.mu.RUnlock()
109+
return enabled, nil
110+
}
111+
flagCtx.mu.RUnlock()
112+
// No cached value and fetch in progress, return false
113+
return false, nil
87114
}
88115

116+
// Mark as fetching
117+
flagCtx.fetching = true
118+
flagCtx.mu.RUnlock()
119+
89120
// Fetch fresh value
90121
enabled, err := fetchFeatureFlag(ctx, host, httpClient)
122+
123+
// Update cache (with proper locking)
124+
flagCtx.mu.Lock()
125+
flagCtx.fetching = false
126+
if err == nil {
127+
flagCtx.enabled = &enabled
128+
flagCtx.lastFetched = time.Now()
129+
}
130+
// On error, keep the old cached value if it exists
131+
result := false
132+
var returnErr error
91133
if err != nil {
92-
// Return cached value on error, or false if no cache
93134
if flagCtx.enabled != nil {
94-
return *flagCtx.enabled, nil
135+
result = *flagCtx.enabled
136+
returnErr = nil // Return cached value without error
137+
} else {
138+
returnErr = err
95139
}
96-
return false, err
140+
} else {
141+
result = enabled
97142
}
143+
flagCtx.mu.Unlock()
98144

99-
// Update cache
100-
c.mu.Lock()
101-
flagCtx.enabled = &enabled
102-
flagCtx.lastFetched = time.Now()
103-
c.mu.Unlock()
104-
105-
return enabled, nil
145+
return result, returnErr
106146
}
107147

108148
// isExpired returns true if the cache has expired.
@@ -112,9 +152,16 @@ func (c *featureFlagContext) isExpired() bool {
112152

113153
// fetchFeatureFlag fetches the feature flag value from Databricks.
114154
func fetchFeatureFlag(ctx context.Context, host string, httpClient *http.Client) (bool, error) {
155+
// Add timeout to context if it doesn't have a deadline
156+
if _, hasDeadline := ctx.Deadline(); !hasDeadline {
157+
var cancel context.CancelFunc
158+
ctx, cancel = context.WithTimeout(ctx, featureFlagHTTPTimeout)
159+
defer cancel()
160+
}
161+
115162
// Construct endpoint URL, adding https:// if not already present
116163
var endpoint string
117-
if len(host) > 7 && (host[:7] == "http://" || host[:8] == "https://") {
164+
if strings.HasPrefix(host, "http://") || strings.HasPrefix(host, "https://") {
118165
endpoint = fmt.Sprintf("%s/api/2.0/feature-flags", host)
119166
} else {
120167
endpoint = fmt.Sprintf("https://%s/api/2.0/feature-flags", host)
@@ -137,6 +184,8 @@ func fetchFeatureFlag(ctx context.Context, host string, httpClient *http.Client)
137184
defer resp.Body.Close()
138185

139186
if resp.StatusCode != http.StatusOK {
187+
// Read and discard body to allow HTTP connection reuse
188+
io.Copy(io.Discard, resp.Body)
140189
return false, fmt.Errorf("feature flag check failed: %d", resp.StatusCode)
141190
}
142191

0 commit comments

Comments
 (0)