Skip to content

Commit 6b39b62

Browse files
committed
feat: Add FlagDefinitionCacheProvider interface
1 parent d72e89a commit 6b39b62

File tree

4 files changed

+899
-23
lines changed

4 files changed

+899
-23
lines changed

posthog/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
InconclusiveMatchError as InconclusiveMatchError,
2323
RequiresServerEvaluation as RequiresServerEvaluation,
2424
)
25+
from posthog.flag_definition_cache import (
26+
FlagDefinitionCacheData as FlagDefinitionCacheData,
27+
FlagDefinitionCacheProvider as FlagDefinitionCacheProvider,
28+
)
2529
from posthog.request import (
2630
disable_connection_reuse as disable_connection_reuse,
2731
enable_keep_alive as enable_keep_alive,

posthog/client.py

Lines changed: 105 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
RequiresServerEvaluation,
2929
match_feature_flag_properties,
3030
)
31+
from posthog.flag_definition_cache import (
32+
FlagDefinitionCacheData,
33+
FlagDefinitionCacheProvider,
34+
)
3135
from posthog.poller import Poller
3236
from posthog.request import (
3337
DEFAULT_HOST,
@@ -184,6 +188,7 @@ def __init__(
184188
before_send=None,
185189
flag_fallback_cache_url=None,
186190
enable_local_evaluation=True,
191+
flag_definition_cache_provider: Optional[FlagDefinitionCacheProvider] = None,
187192
capture_exception_code_variables=False,
188193
code_variables_mask_patterns=None,
189194
code_variables_ignore_patterns=None,
@@ -233,6 +238,7 @@ def __init__(
233238
self.flag_cache = self._initialize_flag_cache(flag_fallback_cache_url)
234239
self.flag_definition_version = 0
235240
self._flags_etag: Optional[str] = None
241+
self._flag_definition_cache_provider = flag_definition_cache_provider
236242
self.disabled = disabled
237243
self.disable_geoip = disable_geoip
238244
self.historical_migration = historical_migration
@@ -357,9 +363,9 @@ def feature_flags(self, flags):
357363
for flag in self._feature_flags
358364
if flag.get("key") is not None
359365
}
360-
assert self.feature_flags_by_key is not None, (
361-
"feature_flags_by_key should be initialized when feature_flags is set"
362-
)
366+
assert (
367+
self.feature_flags_by_key is not None
368+
), "feature_flags_by_key should be initialized when feature_flags is set"
363369

364370
def get_feature_variants(
365371
self,
@@ -1169,17 +1175,25 @@ def join(self):
11691175
posthog.join()
11701176
```
11711177
"""
1172-
for consumer in self.consumers:
1173-
consumer.pause()
1174-
try:
1175-
consumer.join()
1176-
except RuntimeError:
1177-
# consumer thread has not started
1178-
pass
1178+
if self.consumers:
1179+
for consumer in self.consumers:
1180+
consumer.pause()
1181+
try:
1182+
consumer.join()
1183+
except RuntimeError:
1184+
# consumer thread has not started
1185+
pass
11791186

11801187
if self.poller:
11811188
self.poller.stop()
11821189

1190+
# Shut down the cache provider (release locks, clean up)
1191+
if self._flag_definition_cache_provider:
1192+
try:
1193+
self._flag_definition_cache_provider.shutdown()
1194+
except Exception as e:
1195+
self.log.error(f"[FEATURE FLAGS] Cache provider shutdown error: {e}")
1196+
11831197
def shutdown(self):
11841198
"""
11851199
Flush all messages and cleanly shut down the client. Call this before the process ends in serverless environments to avoid data loss.
@@ -1195,7 +1209,71 @@ def shutdown(self):
11951209
if self.exception_capture:
11961210
self.exception_capture.close()
11971211

1212+
def _update_flag_state(
1213+
self, data: FlagDefinitionCacheData, old_flags_by_key: Optional[dict] = None
1214+
) -> None:
1215+
"""Update internal flag state from flag definition data (API response or external cache) and invalidate the evaluation cache if the definitions changed."""
1216+
self.feature_flags = data.get("flags") or []
1217+
self.group_type_mapping = data.get("group_type_mapping") or {}
1218+
self.cohorts = data.get("cohorts") or {}
1219+
1220+
# Invalidate evaluation cache if flag definitions changed
1221+
if (
1222+
self.flag_cache
1223+
and old_flags_by_key is not None
1224+
and old_flags_by_key != (self.feature_flags_by_key or {})
1225+
):
1226+
old_version = self.flag_definition_version
1227+
self.flag_definition_version += 1
1228+
self.flag_cache.invalidate_version(old_version)
1229+
11981230
def _load_feature_flags(self):
1231+
should_fetch = True
1232+
if self._flag_definition_cache_provider:
1233+
try:
1234+
should_fetch = (
1235+
self._flag_definition_cache_provider.should_fetch_flag_definitions()
1236+
)
1237+
except Exception as e:
1238+
self.log.error(
1239+
f"[FEATURE FLAGS] Cache provider should_fetch error: {e}"
1240+
)
1241+
# Fail-safe: fetch from API if cache provider errors
1242+
should_fetch = True
1243+
1244+
# If not fetching, try to get from cache
1245+
if not should_fetch and self._flag_definition_cache_provider:
1246+
try:
1247+
cached_data = (
1248+
self._flag_definition_cache_provider.get_flag_definitions()
1249+
)
1250+
if cached_data:
1251+
self.log.debug(
1252+
"[FEATURE FLAGS] Using cached flag definitions from external cache"
1253+
)
1254+
self._update_flag_state(
1255+
cached_data, old_flags_by_key=self.feature_flags_by_key or {}
1256+
)
1257+
self._last_feature_flag_poll = datetime.now(tz=tzutc())
1258+
return
1259+
else:
1260+
# Emergency fallback: if cache is empty and we have no flags, fetch anyway.
1261+
# There's really no other way of recovering in this case.
1262+
if not self.feature_flags:
1263+
self.log.debug(
1264+
"[FEATURE FLAGS] Cache empty and no flags loaded, falling back to API fetch"
1265+
)
1266+
should_fetch = True
1267+
except Exception as e:
1268+
self.log.error(f"[FEATURE FLAGS] Cache provider get error: {e}")
1269+
# Fail-safe: fetch from API if cache provider errors
1270+
should_fetch = True
1271+
1272+
if should_fetch:
1273+
self._fetch_feature_flags_from_api()
1274+
1275+
def _fetch_feature_flags_from_api(self):
1276+
"""Fetch feature flags from the PostHog API."""
11991277
try:
12001278
# Store old flags to detect changes
12011279
old_flags_by_key: dict[str, dict] = self.feature_flags_by_key or {}
@@ -1225,17 +1303,21 @@ def _load_feature_flags(self):
12251303
)
12261304
return
12271305

1228-
self.feature_flags = response.data["flags"] or []
1229-
self.group_type_mapping = response.data["group_type_mapping"] or {}
1230-
self.cohorts = response.data["cohorts"] or {}
1306+
self._update_flag_state(response.data, old_flags_by_key=old_flags_by_key)
12311307

1232-
# Check if flag definitions changed and update version
1233-
if self.flag_cache and old_flags_by_key != (
1234-
self.feature_flags_by_key or {}
1235-
):
1236-
old_version = self.flag_definition_version
1237-
self.flag_definition_version += 1
1238-
self.flag_cache.invalidate_version(old_version)
1308+
# Store in external cache if provider is configured
1309+
if self._flag_definition_cache_provider:
1310+
try:
1311+
self._flag_definition_cache_provider.on_flag_definitions_received(
1312+
{
1313+
"flags": self.feature_flags or [],
1314+
"group_type_mapping": self.group_type_mapping or {},
1315+
"cohorts": self.cohorts or {},
1316+
}
1317+
)
1318+
except Exception as e:
1319+
self.log.error(f"[FEATURE FLAGS] Cache provider store error: {e}")
1320+
# Flags are already in memory, so continue normally
12391321

12401322
except APIError as e:
12411323
if e.status == 401:
@@ -1649,9 +1731,9 @@ def _locally_evaluate_flag(
16491731
response = None
16501732

16511733
if self.feature_flags:
1652-
assert self.feature_flags_by_key is not None, (
1653-
"feature_flags_by_key should be initialized when feature_flags is set"
1654-
)
1734+
assert (
1735+
self.feature_flags_by_key is not None
1736+
), "feature_flags_by_key should be initialized when feature_flags is set"
16551737
# Local evaluation
16561738
flag = self.feature_flags_by_key.get(key)
16571739
if flag:

posthog/flag_definition_cache.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
"""
2+
Flag Definition Cache Provider interface for multi-worker environments.
3+
4+
EXPERIMENTAL: This API may change in future minor version bumps.
5+
6+
This module provides an interface for external caching of feature flag definitions,
7+
enabling multi-worker environments (Kubernetes, load-balanced servers, serverless
8+
functions) to share flag definitions and reduce API calls.
9+
10+
Usage:
11+
12+
from posthog import Posthog
13+
from posthog.flag_definition_cache import FlagDefinitionCacheProvider
14+
15+
cache = RedisFlagDefinitionCache(redis_client, "my-team")  # your own FlagDefinitionCacheProvider implementation
16+
posthog = Posthog(
17+
"<project_api_key>",
18+
personal_api_key="<personal_api_key>",
19+
flag_definition_cache_provider=cache,
20+
)
21+
"""
22+
23+
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable
24+
25+
from typing_extensions import Required, TypedDict
26+
27+
28+
class FlagDefinitionCacheData(TypedDict):
29+
"""
30+
Data structure for cached flag definitions.
31+
32+
Attributes:
33+
flags: List of feature flag definition dictionaries from the API.
34+
group_type_mapping: Mapping of group type indices to group names.
35+
cohorts: Dictionary of cohort definitions for local evaluation.
36+
"""
37+
38+
flags: Required[List[Dict[str, Any]]]
39+
group_type_mapping: Required[Dict[str, str]]
40+
cohorts: Required[Dict[str, Any]]
41+
42+
43+
@runtime_checkable
44+
class FlagDefinitionCacheProvider(Protocol):
45+
"""
46+
Interface for external caching of feature flag definitions.
47+
48+
Enables multi-worker environments to share flag definitions, reducing API
49+
calls while ensuring all workers have consistent data.
50+
51+
EXPERIMENTAL: This API may change in future minor version bumps.
52+
53+
The four methods handle the complete lifecycle of flag definition caching:
54+
55+
1. `should_fetch_flag_definitions()` - Called before each poll to determine
56+
if this worker should fetch new definitions. Use for distributed lock
57+
coordination to ensure only one worker fetches at a time.
58+
59+
2. `get_flag_definitions()` - Called when `should_fetch_flag_definitions()`
60+
returns False. Returns cached definitions if available.
61+
62+
3. `on_flag_definitions_received()` - Called after successfully fetching
63+
new definitions from the API. Store the data in your external cache
64+
and release any locks.
65+
66+
4. `shutdown()` - Called when the PostHog client shuts down. Release any
67+
distributed locks and clean up resources.
68+
69+
Error Handling:
70+
The client wraps every call to these methods in try/except. Errors will be logged but will
71+
never break flag evaluation. On error:
72+
- `should_fetch_flag_definitions()` errors default to fetching (fail-safe)
73+
- `get_flag_definitions()` errors fall back to API fetch
74+
- `on_flag_definitions_received()` errors are logged but flags remain in memory
75+
- `shutdown()` errors are logged but shutdown continues
76+
"""
77+
78+
def get_flag_definitions(self) -> Optional[FlagDefinitionCacheData]:
79+
"""
80+
Retrieve cached flag definitions.
81+
82+
Returns:
83+
Cached flag definitions if available and valid, None otherwise.
84+
Returning None will trigger a fetch from the API if this worker
85+
has no flags loaded yet.
86+
"""
87+
...
88+
89+
def should_fetch_flag_definitions(self) -> bool:
90+
"""
91+
Determine whether this instance should fetch new flag definitions.
92+
93+
Use this for distributed lock coordination. Only one worker should
94+
return True to avoid thundering herd problems. A typical implementation
95+
uses a distributed lock (e.g., Redis SETNX) that expires after the
96+
poll interval.
97+
98+
Returns:
99+
True if this instance should fetch from the API, False otherwise.
100+
When False, the client will call `get_flag_definitions()` to
101+
retrieve cached data instead.
102+
"""
103+
...
104+
105+
def on_flag_definitions_received(self, data: FlagDefinitionCacheData) -> None:
106+
"""
107+
Called after successfully receiving new flag definitions from PostHog.
108+
109+
Use this to store the data in your external cache and release any
110+
distributed locks acquired in `should_fetch_flag_definitions()`.
111+
112+
Args:
113+
data: The flag definitions to cache, containing flags,
114+
group_type_mapping, and cohorts.
115+
"""
116+
...
117+
118+
def shutdown(self) -> None:
119+
"""
120+
Called when the PostHog client shuts down.
121+
122+
Use this to release any distributed locks and clean up resources.
123+
This method is called even if `should_fetch_flag_definitions()`
124+
returned False, so implementations should handle the case where
125+
no lock was acquired.
126+
"""
127+
...

0 commit comments

Comments
 (0)