Skip to content

Commit 09b9e46

Browse files
feat: Add real-time feature flags support via SSE
1 parent b6dbff1 commit 09b9e46

File tree

3 files changed

+665
-0
lines changed

3 files changed

+665
-0
lines changed

posthog/client.py

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ def __init__(
192192
capture_exception_code_variables=False,
193193
code_variables_mask_patterns=None,
194194
code_variables_ignore_patterns=None,
195+
realtime_flags=False,
196+
on_feature_flags_update=None,
195197
):
196198
"""
197199
Initialize a new PostHog client instance.
@@ -248,6 +250,10 @@ def __init__(
248250
self.exception_capture = None
249251
self.privacy_mode = privacy_mode
250252
self.enable_local_evaluation = enable_local_evaluation
253+
self.realtime_flags = realtime_flags
254+
self.on_feature_flags_update = on_feature_flags_update
255+
self.sse_connection = None # type: Optional[Any]
256+
self.sse_connected = False
251257

252258
self.capture_exception_code_variables = capture_exception_code_variables
253259
self.code_variables_mask_patterns = (
@@ -1190,6 +1196,10 @@ def join(self):
11901196
except Exception as e:
11911197
self.log.error(f"[FEATURE FLAGS] Cache provider shutdown error: {e}")
11921198

1199+
# Close SSE connection
1200+
if self.sse_connection:
1201+
self._close_sse_connection()
1202+
11931203
def shutdown(self):
11941204
"""
11951205
Flush all messages and cleanly shutdown the client. Call this before the process ends in serverless environments to avoid data loss.
@@ -1315,6 +1325,10 @@ def _fetch_feature_flags_from_api(self):
13151325
self.log.error(f"[FEATURE FLAGS] Cache provider store error: {e}")
13161326
# Flags are already in memory, so continue normally
13171327

1328+
# Setup SSE connection if realtime_flags is enabled
1329+
if self.realtime_flags and not self.sse_connected:
1330+
self._setup_sse_connection()
1331+
13181332
except APIError as e:
13191333
if e.status == 401:
13201334
self.log.error(
@@ -2220,6 +2234,180 @@ def _add_local_person_and_group_properties(
22202234

22212235
return all_person_properties, all_group_properties
22222236

2237+
def _setup_sse_connection(self):
2238+
"""
2239+
Establish a real-time connection using Server-Sent Events to receive feature flag updates.
2240+
"""
2241+
if not self.personal_api_key:
2242+
self.log.warning(
2243+
"[FEATURE FLAGS] Cannot establish real-time connection without personal_api_key"
2244+
)
2245+
return
2246+
2247+
if self.sse_connected:
2248+
self.log.debug("[FEATURE FLAGS] SSE connection already established")
2249+
return
2250+
2251+
try:
2252+
import threading
2253+
import json
2254+
2255+
# Use requests with stream=True for SSE
2256+
import requests
2257+
2258+
url = f"{self.host}/flags/definitions/stream?api_key={self.api_key}"
2259+
headers = {
2260+
"Authorization": f"Bearer {self.personal_api_key}",
2261+
"Accept": "text/event-stream",
2262+
}
2263+
2264+
def sse_listener():
2265+
"""Background thread to listen for SSE messages"""
2266+
try:
2267+
with requests.get(
2268+
url, headers=headers, stream=True, timeout=None
2269+
) as response:
2270+
if response.status_code != 200:
2271+
self.log.warning(
2272+
f"[FEATURE FLAGS] SSE connection failed with status {response.status_code}"
2273+
)
2274+
self.sse_connected = False
2275+
return
2276+
2277+
self.sse_connected = True
2278+
self.log.debug("[FEATURE FLAGS] SSE connection established")
2279+
2280+
# Process the stream line by line
2281+
for line in response.iter_lines():
2282+
if not line:
2283+
continue
2284+
2285+
line = line.decode("utf-8")
2286+
2287+
# SSE format: "data: {...}"
2288+
if line.startswith("data: "):
2289+
data_str = line[6:] # Remove "data: " prefix
2290+
try:
2291+
flag_data = json.loads(data_str)
2292+
self._process_flag_update(flag_data)
2293+
except json.JSONDecodeError as e:
2294+
self.log.warning(
2295+
f"[FEATURE FLAGS] Failed to parse SSE message: {e}"
2296+
)
2297+
except Exception as e:
2298+
self.log.warning(
2299+
f"[FEATURE FLAGS] SSE connection error: {e}. Reconnecting in 5 seconds..."
2300+
)
2301+
self.sse_connected = False
2302+
2303+
# Attempt to reconnect after 5 seconds if realtime_flags is still enabled
2304+
if self.realtime_flags:
2305+
import time
2306+
2307+
time.sleep(5)
2308+
self._setup_sse_connection()
2309+
2310+
# Start the SSE listener in a daemon thread
2311+
sse_thread = threading.Thread(target=sse_listener, daemon=True)
2312+
sse_thread.start()
2313+
self.sse_connection = sse_thread
2314+
2315+
except ImportError:
2316+
self.log.warning(
2317+
"[FEATURE FLAGS] requests library required for real-time flags"
2318+
)
2319+
except Exception as e:
2320+
self.log.exception(f"[FEATURE FLAGS] Failed to setup SSE connection: {e}")
2321+
2322+
def _close_sse_connection(self):
2323+
"""
2324+
Close the active SSE connection.
2325+
"""
2326+
if self.sse_connection:
2327+
self.log.debug("[FEATURE FLAGS] Closing SSE connection")
2328+
# Note: We can't directly stop the thread, but setting sse_connected to False
2329+
# will prevent reconnection attempts
2330+
self.sse_connected = False
2331+
self.sse_connection = None
2332+
2333+
def _process_flag_update(self, flag_data):
2334+
"""
2335+
Process incoming flag updates from SSE messages.
2336+
2337+
Args:
2338+
flag_data: The flag data from the SSE message
2339+
"""
2340+
try:
2341+
flag_key = flag_data.get("key")
2342+
if not flag_key:
2343+
self.log.warning("[FEATURE FLAGS] Received flag update without key")
2344+
return
2345+
2346+
is_deleted = flag_data.get("deleted", False)
2347+
2348+
# Handle flag deletion
2349+
if is_deleted:
2350+
self.log.debug(f"[FEATURE FLAGS] Deleting flag: {flag_key}")
2351+
if self.feature_flags_by_key and flag_key in self.feature_flags_by_key:
2352+
del self.feature_flags_by_key[flag_key]
2353+
2354+
# Also remove from the array
2355+
if self.feature_flags:
2356+
self.feature_flags = [
2357+
f for f in self.feature_flags if f.get("key") != flag_key
2358+
]
2359+
2360+
# Invalidate cache for this flag
2361+
if self.flag_cache:
2362+
old_version = self.flag_definition_version
2363+
self.flag_definition_version += 1
2364+
self.flag_cache.invalidate_version(old_version)
2365+
2366+
else:
2367+
# Update or add flag
2368+
self.log.debug(f"[FEATURE FLAGS] Updating flag: {flag_key}")
2369+
2370+
if self.feature_flags_by_key is None:
2371+
self.feature_flags_by_key = {}
2372+
2373+
if self.feature_flags is None:
2374+
self.feature_flags = []
2375+
2376+
# Update the lookup table
2377+
self.feature_flags_by_key[flag_key] = flag_data
2378+
2379+
# Update or add to the array
2380+
flag_exists = False
2381+
for i, f in enumerate(self.feature_flags):
2382+
if f.get("key") == flag_key:
2383+
self.feature_flags[i] = flag_data
2384+
flag_exists = True
2385+
break
2386+
2387+
if not flag_exists:
2388+
self.feature_flags.append(flag_data)
2389+
2390+
# Invalidate cache when flag definitions change
2391+
if self.flag_cache:
2392+
old_version = self.flag_definition_version
2393+
self.flag_definition_version += 1
2394+
self.flag_cache.invalidate_version(old_version)
2395+
2396+
# Call the user's callback if provided
2397+
if self.on_feature_flags_update:
2398+
try:
2399+
self.on_feature_flags_update(
2400+
flag_key=flag_key,
2401+
flag_data=flag_data,
2402+
)
2403+
except Exception as e:
2404+
self.log.exception(
2405+
f"[FEATURE FLAGS] Error in on_feature_flags_update callback: {e}"
2406+
)
2407+
2408+
except Exception as e:
2409+
self.log.exception(f"[FEATURE FLAGS] Error processing flag update: {e}")
2410+
22232411

22242412
def stringify_id(val):
22252413
if val is None:

0 commit comments

Comments
 (0)