Skip to content

Commit d98b58a

Browse files
authored
feat: SSE (#75)
* sse init * init sse work * sse wip * sse refetch config * sse refetch config and fix delay * update requirements * lint * update versions used in testing * fix import * Use optional * ignore ldeventsource in mypy * Fix callable args * Fix callable args * reformat
1 parent bfdba6a commit d98b58a

File tree

10 files changed

+133
-14
lines changed

10 files changed

+133
-14
lines changed

.github/workflows/lint.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ jobs:
1414
- name: Set up Python
1515
uses: actions/setup-python@v5
1616
with:
17-
python-version: 3.8
17+
python-version: 3.12
1818
cache: 'pip'
1919

2020
- name: Install dependencies

.github/workflows/release.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ jobs:
3131
runs-on: ubuntu-latest
3232
strategy:
3333
matrix:
34-
python-version: [ "3.8" ]
34+
python-version: [ "3.12" ]
3535

3636
steps:
3737
# Check out the repo with credentials that can bypass branch protection, and fetch git history instead of just latest commit

.github/workflows/test_examples.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
runs-on: ubuntu-latest
99
strategy:
1010
matrix:
11-
python-version: [ "3.8", "3.12" ]
11+
python-version: [ "3.12" ]
1212

1313
steps:
1414
- uses: actions/checkout@v4

.github/workflows/unit_test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
runs-on: ${{matrix.os}}
99
strategy:
1010
matrix:
11-
python-version: [ "3.8", "3.12" ]
11+
python-version: [ "3.12" ]
1212
os: [ubuntu-latest, windows-latest]
1313

1414
steps:

devcycle_python_sdk/managers/config_manager.py

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
1+
import json
2+
import logging
13
import threading
24
import time
3-
import logging
4-
import json
5+
from datetime import datetime
56
from typing import Optional
67

7-
from devcycle_python_sdk.options import DevCycleLocalOptions
8-
from devcycle_python_sdk.api.local_bucketing import LocalBucketing
8+
import ld_eventsource.actions
9+
910
from devcycle_python_sdk.api.config_client import ConfigAPIClient
11+
from devcycle_python_sdk.api.local_bucketing import LocalBucketing
1012
from devcycle_python_sdk.exceptions import (
1113
CloudClientUnauthorizedError,
1214
CloudClientError,
1315
)
16+
from wsgiref.handlers import format_date_time
17+
from devcycle_python_sdk.options import DevCycleLocalOptions
18+
from devcycle_python_sdk.managers.sse_manager import SSEManager
1419

1520
logger = logging.getLogger(__name__)
1621

@@ -27,7 +32,9 @@ def __init__(
2732
self._sdk_key = sdk_key
2833
self._options = options
2934
self._local_bucketing = local_bucketing
30-
35+
self._sse_manager: Optional[SSEManager] = None
36+
self._sse_polling_interval = 1000 * 60 * 15 * 60
37+
self._sse_connected = False
3138
self._config: Optional[dict] = None
3239
self._config_etag: Optional[str] = None
3340
self._config_lastmodified: Optional[str] = None
@@ -41,10 +48,15 @@ def __init__(
4148
def is_initialized(self) -> bool:
4249
return self._config is not None
4350

44-
def _get_config(self):
51+
def _get_config(self, last_modified: Optional[float] = None):
4552
try:
53+
lm_header = self._config_lastmodified
54+
if last_modified is not None:
55+
lm_timestamp = datetime.fromtimestamp(last_modified)
56+
lm_header = format_date_time(time.mktime(lm_timestamp.timetuple()))
57+
4658
new_config, new_etag, new_lastmodified = self._config_api_client.get_config(
47-
config_etag=self._config_etag, last_modified=self._config_lastmodified
59+
config_etag=self._config_etag, last_modified=lm_header
4860
)
4961

5062
# Abort early if the last modified is before the sent one.
@@ -68,6 +80,14 @@ def _get_config(self):
6880

6981
json_config = json.dumps(self._config)
7082
self._local_bucketing.store_config(json_config)
83+
if self._options.enable_beta_realtime_updates:
84+
if self._sse_manager is None:
85+
self._sse_manager = SSEManager(
86+
self.sse_state,
87+
self.sse_error,
88+
self.sse_message,
89+
)
90+
self._sse_manager.update(self._config)
7191

7292
if (
7393
trigger_on_client_initialized
@@ -98,7 +118,32 @@ def run(self):
98118
logger.warning(
99119
f"DevCycle: Error polling for config changes: {str(e)}"
100120
)
101-
time.sleep(self._options.config_polling_interval_ms / 1000.0)
121+
if self._sse_connected:
122+
time.sleep(self._sse_polling_interval / 1000.0)
123+
else:
124+
time.sleep(self._options.config_polling_interval_ms / 1000.0)
125+
126+
def sse_message(self, message: ld_eventsource.actions.Event):
127+
if self._sse_connected is False:
128+
self._sse_connected = True
129+
logger.info("DevCycle: Connected to SSE stream")
130+
logger.info(f"DevCycle: Received message: {message.data}")
131+
sse_message = json.loads(message.data)
132+
dvc_data = json.loads(sse_message.get("data"))
133+
if (
134+
dvc_data.get("type") == "refetchConfig"
135+
or dvc_data.get("type") == ""
136+
or dvc_data.get("type") is None
137+
):
138+
logger.info("DevCycle: Received refetchConfig message - updating config")
139+
self._get_config(dvc_data["lastModified"])
140+
141+
def sse_error(self, error: ld_eventsource.actions.Fault):
142+
logger.warning(f"DevCycle: Received SSE error: {error}")
143+
144+
def sse_state(self, state: ld_eventsource.actions.Start):
145+
self._sse_connected = True
146+
logger.info("DevCycle: Connected to SSE stream")
102147

103148
def close(self):
104149
self._polling_enabled = False
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import threading
2+
3+
import ld_eventsource
4+
import ld_eventsource.actions
5+
import ld_eventsource.config
6+
from typing import Callable
7+
8+
9+
class SSEManager:
10+
def __init__(
11+
self,
12+
handlestate: Callable[[ld_eventsource.actions.Start], None],
13+
handleerror: Callable[[ld_eventsource.actions.Fault], None],
14+
handlemessage: Callable[[ld_eventsource.actions.Event], None],
15+
):
16+
self.client: ld_eventsource.SSEClient = None
17+
self.url = ""
18+
self.handlestate = handlestate
19+
self.handleerror = handleerror
20+
self.handlemessage = handlemessage
21+
22+
self.read_thread = threading.Thread(
23+
target=self.read_events,
24+
args=(self.handlestate, self.handleerror, self.handlemessage),
25+
)
26+
27+
def read_events(
28+
self,
29+
handlestate: Callable[[ld_eventsource.actions.Start], None],
30+
handleerror: Callable[[ld_eventsource.actions.Fault], None],
31+
handlemessage: Callable[[ld_eventsource.actions.Event], None],
32+
):
33+
self.client.start()
34+
for event in self.client.all:
35+
if isinstance(event, ld_eventsource.actions.Start):
36+
handlestate(event)
37+
elif isinstance(event, ld_eventsource.actions.Fault):
38+
handleerror(event)
39+
elif isinstance(event, ld_eventsource.actions.Event):
40+
handlemessage(event)
41+
42+
def update(self, config: dict):
43+
if self.use_new_config(config["sse"]):
44+
self.url = config["sse"]["hostname"] + config["sse"]["path"]
45+
if self.client is not None:
46+
self.client.close()
47+
if self.read_thread.is_alive():
48+
self.read_thread.join()
49+
self.client = ld_eventsource.SSEClient(
50+
connect=ld_eventsource.config.ConnectStrategy.http(self.url),
51+
error_strategy=ld_eventsource.config.ErrorStrategy.CONTINUE,
52+
)
53+
self.read_thread = threading.Thread(
54+
target=self.read_events,
55+
args=(self.handlestate, self.handleerror, self.handlemessage),
56+
)
57+
self.read_thread.start()
58+
59+
def use_new_config(self, config: dict) -> bool:
60+
new_url = config["hostname"] + config["path"]
61+
if self.url == "" or self.url is None and new_url != "":
62+
return True
63+
return self.url != new_url

devcycle_python_sdk/options.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def __init__(
4545
event_retry_delay_ms: int = 200, # milliseconds
4646
disable_automatic_event_logging: bool = False,
4747
disable_custom_event_logging: bool = False,
48+
enable_beta_realtime_updates: bool = False,
4849
):
4950
self.events_api_uri = events_api_uri
5051
self.config_cdn_uri = config_cdn_uri
@@ -60,6 +61,7 @@ def __init__(
6061
self.on_client_initialized = on_client_initialized
6162
self.event_request_timeout_ms = event_request_timeout_ms
6263
self.event_retry_delay_ms = event_retry_delay_ms
64+
self.enable_beta_realtime_updates = enable_beta_realtime_updates
6365

6466
if self.flush_event_queue_size >= self.max_event_queue_size:
6567
logger.warning(

example/local_bucketing_client_example.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def main():
2121

2222
# create an instance of the DevCycle Client object
2323
server_sdk_key = os.environ["DEVCYCLE_SERVER_SDK_KEY"]
24-
options = DevCycleLocalOptions()
24+
options = DevCycleLocalOptions(enable_beta_realtime_updates=True)
2525
client = DevCycleLocalClient(server_sdk_key, options)
2626

2727
# Wait for DevCycle to initialize and load the configuration

pyproject.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ ignore_missing_imports = true
2828
module = 'test.openfeature.*'
2929
ignore_errors = true
3030

31+
[[tool.mypy.overrides]]
32+
module = 'ld_eventsource.*'
33+
ignore_errors = true
34+
ignore_missing_imports = true
35+
36+
3137
[[tool.mypy.overrides]]
3238
module = 'openfeature.*'
3339
ignore_errors = true

requirements.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,7 @@ urllib3 >= 1.15.1
33
requests ~= 2.31
44
wasmtime == 23.0.0
55
protobuf >= 4.23.3
6-
openfeature-sdk >= 0.7.0
6+
openfeature-sdk >= 0.7.0
7+
launchdarkly-eventsource >= 1.2.0
8+
responses~=0.23.1
9+
dataclasses~=0.6

0 commit comments

Comments
 (0)