Skip to content

Commit c6d2b20

Browse files
fix: add thread safety for feature_flags data structures
Protects feature_flags and feature_flags_by_key with _flags_lock: - Added threading.Lock() for all flag data structure access - Protected reads in get_feature_flag and _compute_payload_locally - Protected writes in _update_flag_state and _process_flag_update - Protected reads in _load_feature_flags and _fetch_feature_flags_from_api - Copies flag dict before holding lock during computation Prevents race conditions between: - Background SSE thread modifying flags - Main thread reading flags for evaluation - Poller thread updating flags from API 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 6030a57 commit c6d2b20

File tree

1 file changed

+115
-100
lines changed

1 file changed

+115
-100
lines changed

posthog/client.py

Lines changed: 115 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,9 @@ def __init__(
257257
self.sse_response = None
258258
self.sse_connected = False
259259
self._sse_lock = threading.Lock()
260+
self._flags_lock = (
261+
threading.Lock()
262+
) # Protects feature_flags and feature_flags_by_key
260263

261264
self.capture_exception_code_variables = capture_exception_code_variables
262265
self.code_variables_mask_patterns = (
@@ -1222,19 +1225,20 @@ def _update_flag_state(
12221225
self, data: FlagDefinitionCacheData, old_flags_by_key: Optional[dict] = None
12231226
) -> None:
12241227
"""Update internal flag state from cache data and invalidate evaluation cache if changed."""
1225-
self.feature_flags = data["flags"]
1226-
self.group_type_mapping = data["group_type_mapping"]
1227-
self.cohorts = data["cohorts"]
1228-
1229-
# Invalidate evaluation cache if flag definitions changed
1230-
if (
1231-
self.flag_cache
1232-
and old_flags_by_key is not None
1233-
and old_flags_by_key != (self.feature_flags_by_key or {})
1234-
):
1235-
old_version = self.flag_definition_version
1236-
self.flag_definition_version += 1
1237-
self.flag_cache.invalidate_version(old_version)
1228+
with self._flags_lock:
1229+
self.feature_flags = data["flags"]
1230+
self.group_type_mapping = data["group_type_mapping"]
1231+
self.cohorts = data["cohorts"]
1232+
1233+
# Invalidate evaluation cache if flag definitions changed
1234+
if (
1235+
self.flag_cache
1236+
and old_flags_by_key is not None
1237+
and old_flags_by_key != (self.feature_flags_by_key or {})
1238+
):
1239+
old_version = self.flag_definition_version
1240+
self.flag_definition_version += 1
1241+
self.flag_cache.invalidate_version(old_version)
12381242

12391243
def _load_feature_flags(self):
12401244
should_fetch = True
@@ -1260,8 +1264,10 @@ def _load_feature_flags(self):
12601264
self.log.debug(
12611265
"[FEATURE FLAGS] Using cached flag definitions from external cache"
12621266
)
1267+
with self._flags_lock:
1268+
old_flags_copy = self.feature_flags_by_key or {}
12631269
self._update_flag_state(
1264-
cached_data, old_flags_by_key=self.feature_flags_by_key or {}
1270+
cached_data, old_flags_by_key=old_flags_copy
12651271
)
12661272
self._last_feature_flag_poll = datetime.now(tz=tzutc())
12671273
return
@@ -1285,7 +1291,8 @@ def _fetch_feature_flags_from_api(self):
12851291
"""Fetch feature flags from the PostHog API."""
12861292
try:
12871293
# Store old flags to detect changes
1288-
old_flags_by_key: dict[str, dict] = self.feature_flags_by_key or {}
1294+
with self._flags_lock:
1295+
old_flags_by_key: dict[str, dict] = self.feature_flags_by_key or {}
12891296

12901297
response = get(
12911298
self.personal_api_key,
@@ -1744,30 +1751,33 @@ def _locally_evaluate_flag(
17441751
self.load_feature_flags()
17451752
response = None
17461753

1747-
if self.feature_flags:
1748-
assert self.feature_flags_by_key is not None, (
1749-
"feature_flags_by_key should be initialized when feature_flags is set"
1750-
)
1751-
# Local evaluation
1752-
flag = self.feature_flags_by_key.get(key)
1753-
if flag:
1754-
try:
1755-
response = self._compute_flag_locally(
1756-
flag,
1757-
distinct_id,
1758-
groups=groups,
1759-
person_properties=person_properties,
1760-
group_properties=group_properties,
1761-
)
1762-
self.log.debug(
1763-
f"Successfully computed flag locally: {key} -> {response}"
1764-
)
1765-
except (RequiresServerEvaluation, InconclusiveMatchError) as e:
1766-
self.log.debug(f"Failed to compute flag {key} locally: {e}")
1767-
except Exception as e:
1768-
self.log.exception(
1769-
f"[FEATURE FLAGS] Error while computing variant locally: {e}"
1770-
)
1754+
flag = None
1755+
with self._flags_lock:
1756+
if self.feature_flags:
1757+
assert self.feature_flags_by_key is not None, (
1758+
"feature_flags_by_key should be initialized when feature_flags is set"
1759+
)
1760+
# Local evaluation - copy flag to avoid holding lock during computation
1761+
flag = self.feature_flags_by_key.get(key)
1762+
1763+
if flag:
1764+
try:
1765+
response = self._compute_flag_locally(
1766+
flag,
1767+
distinct_id,
1768+
groups=groups,
1769+
person_properties=person_properties,
1770+
group_properties=group_properties,
1771+
)
1772+
self.log.debug(
1773+
f"Successfully computed flag locally: {key} -> {response}"
1774+
)
1775+
except (RequiresServerEvaluation, InconclusiveMatchError) as e:
1776+
self.log.debug(f"Failed to compute flag {key} locally: {e}")
1777+
except Exception as e:
1778+
self.log.exception(
1779+
f"[FEATURE FLAGS] Error while computing variant locally: {e}"
1780+
)
17711781
return response
17721782

17731783
def get_feature_flag_payload(
@@ -1935,21 +1945,22 @@ def _compute_payload_locally(
19351945
) -> Optional[str]:
19361946
payload = None
19371947

1938-
if self.feature_flags_by_key is None:
1939-
return payload
1940-
1941-
flag_definition = self.feature_flags_by_key.get(key)
1942-
if flag_definition:
1943-
flag_filters = flag_definition.get("filters") or {}
1944-
flag_payloads = flag_filters.get("payloads") or {}
1945-
# For boolean flags, convert True to "true"
1946-
# For multivariate flags, use the variant string as-is
1947-
lookup_value = (
1948-
"true"
1949-
if isinstance(match_value, bool) and match_value
1950-
else str(match_value)
1951-
)
1952-
payload = flag_payloads.get(lookup_value, None)
1948+
with self._flags_lock:
1949+
if self.feature_flags_by_key is None:
1950+
return payload
1951+
1952+
flag_definition = self.feature_flags_by_key.get(key)
1953+
if flag_definition:
1954+
flag_filters = flag_definition.get("filters") or {}
1955+
flag_payloads = flag_filters.get("payloads") or {}
1956+
# For boolean flags, convert True to "true"
1957+
# For multivariate flags, use the variant string as-is
1958+
lookup_value = (
1959+
"true"
1960+
if isinstance(match_value, bool) and match_value
1961+
else str(match_value)
1962+
)
1963+
payload = flag_payloads.get(lookup_value, None)
19531964
return payload
19541965

19551966
def get_all_flags(
@@ -2372,53 +2383,57 @@ def _process_flag_update(self, flag_data):
23722383

23732384
is_deleted = flag_data.get("deleted", False)
23742385

2375-
# Handle flag deletion
2376-
if is_deleted:
2377-
self.log.debug(f"[FEATURE FLAGS] Deleting flag: {flag_key}")
2378-
if self.feature_flags_by_key and flag_key in self.feature_flags_by_key:
2379-
del self.feature_flags_by_key[flag_key]
2380-
2381-
# Also remove from the array
2382-
if self.feature_flags:
2383-
self.feature_flags = [
2384-
f for f in self.feature_flags if f.get("key") != flag_key
2385-
]
2386+
with self._flags_lock:
2387+
# Handle flag deletion
2388+
if is_deleted:
2389+
self.log.debug(f"[FEATURE FLAGS] Deleting flag: {flag_key}")
2390+
if (
2391+
self.feature_flags_by_key
2392+
and flag_key in self.feature_flags_by_key
2393+
):
2394+
del self.feature_flags_by_key[flag_key]
2395+
2396+
# Also remove from the array
2397+
if self.feature_flags:
2398+
self.feature_flags = [
2399+
f for f in self.feature_flags if f.get("key") != flag_key
2400+
]
2401+
2402+
# Invalidate cache for this flag
2403+
if self.flag_cache:
2404+
old_version = self.flag_definition_version
2405+
self.flag_definition_version += 1
2406+
self.flag_cache.invalidate_version(old_version)
23862407

2387-
# Invalidate cache for this flag
2388-
if self.flag_cache:
2389-
old_version = self.flag_definition_version
2390-
self.flag_definition_version += 1
2391-
self.flag_cache.invalidate_version(old_version)
2392-
2393-
else:
2394-
# Update or add flag
2395-
self.log.debug(f"[FEATURE FLAGS] Updating flag: {flag_key}")
2396-
2397-
if self.feature_flags_by_key is None:
2398-
self.feature_flags_by_key = {}
2399-
2400-
if self.feature_flags is None:
2401-
self.feature_flags = []
2402-
2403-
# Update the lookup table
2404-
self.feature_flags_by_key[flag_key] = flag_data
2405-
2406-
# Update or add to the array
2407-
flag_exists = False
2408-
for i, f in enumerate(self.feature_flags):
2409-
if f.get("key") == flag_key:
2410-
self.feature_flags[i] = flag_data
2411-
flag_exists = True
2412-
break
2413-
2414-
if not flag_exists:
2415-
self.feature_flags.append(flag_data)
2416-
2417-
# Invalidate cache when flag definitions change
2418-
if self.flag_cache:
2419-
old_version = self.flag_definition_version
2420-
self.flag_definition_version += 1
2421-
self.flag_cache.invalidate_version(old_version)
2408+
else:
2409+
# Update or add flag
2410+
self.log.debug(f"[FEATURE FLAGS] Updating flag: {flag_key}")
2411+
2412+
if self.feature_flags_by_key is None:
2413+
self.feature_flags_by_key = {}
2414+
2415+
if self.feature_flags is None:
2416+
self.feature_flags = []
2417+
2418+
# Update the lookup table
2419+
self.feature_flags_by_key[flag_key] = flag_data
2420+
2421+
# Update or add to the array
2422+
flag_exists = False
2423+
for i, f in enumerate(self.feature_flags):
2424+
if f.get("key") == flag_key:
2425+
self.feature_flags[i] = flag_data
2426+
flag_exists = True
2427+
break
2428+
2429+
if not flag_exists:
2430+
self.feature_flags.append(flag_data)
2431+
2432+
# Invalidate cache when flag definitions change
2433+
if self.flag_cache:
2434+
old_version = self.flag_definition_version
2435+
self.flag_definition_version += 1
2436+
self.flag_cache.invalidate_version(old_version)
24222437

24232438
# Call the user's callback if provided
24242439
if self.on_feature_flags_update:

0 commit comments

Comments
 (0)