Skip to content
This repository was archived by the owner on Nov 8, 2024. It is now read-only.

Commit e3d0ac8

Browse files
authored
Implement poller and in-memory cache (#3)
* Implement poller and in-memory cache * fix linter errors * adjust error handling of http client * include license in manifest.in * load version from source code instead of file * fix polling bug * use LRU cache instead of TTL cache
1 parent d8fc8ad commit e3d0ac8

16 files changed

+433
-62
lines changed

MANIFEST.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
include requirements.txt
22
include README.md
33
include requirements-test.txt
4-
include VERSION.txt
4+
include LICENSE

VERSION.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

eppo_client/__init__.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
from typing import Optional
2+
from eppo_client.client import EppoClient
3+
from eppo_client.config import Config
4+
from eppo_client.configuration_requestor import (
5+
ExperimentConfigurationDto,
6+
ExperimentConfigurationRequestor,
7+
)
8+
from eppo_client.configuration_store import ConfigurationStore
9+
from eppo_client.constants import MAX_CACHE_ENTRIES
10+
from eppo_client.http_client import HttpClient, SdkParams
11+
from eppo_client.read_write_lock import ReadWriteLock
12+
13+
__version__ = "0.0.1"
14+
15+
__client: Optional[EppoClient] = None
16+
__lock = ReadWriteLock()
17+
18+
19+
def init(config: Config) -> EppoClient:
    """Initializes a global Eppo client instance

    This method should be called once on application startup.
    If invoked more than once, it will re-initialize the global client instance.
    Use the :func:`eppo_client.get_instance()` method to access the client instance.

    :param config: client configuration containing the API Key
    :type config: Config
    :return: the newly created global client instance
    :rtype: EppoClient
    """
    config._validate()
    sdk_params = SdkParams(
        apiKey=config.api_key, sdkName="python", sdkVersion=__version__
    )
    http_client = HttpClient(base_url=config.base_url, sdk_params=sdk_params)
    config_store: ConfigurationStore[ExperimentConfigurationDto] = ConfigurationStore(
        max_size=MAX_CACHE_ENTRIES
    )
    config_requestor = ExperimentConfigurationRequestor(
        http_client=http_client, config_store=config_store
    )
    global __client
    # Acquire before entering the try block: if acquisition itself fails, the
    # finally clause must not release a lock that was never taken.
    __lock.acquire_write()
    try:
        if __client:
            # if a client was already initialized, stop the background processes of the old client
            __client._shutdown()
        __client = EppoClient(config_requestor=config_requestor)
        return __client
    finally:
        __lock.release_write()
51+
52+
53+
def get_instance() -> EppoClient:
    """Used to access an initialized client instance

    Use this method to get a client instance for assigning variants.
    This method may only be called after invocation of :func:`eppo_client.init()`, otherwise it throws an exception.

    :return: a shared client instance
    :rtype: EppoClient
    :raises Exception: if no client has been initialized yet
    """
    # Acquire before entering the try block so that the finally clause only
    # ever releases a read lock that is actually held.
    __lock.acquire_read()
    try:
        if __client:
            return __client
        raise Exception("init() must be called before get_instance()")
    finally:
        __lock.release_read()

eppo_client/client.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,21 @@
33
ExperimentConfigurationDto,
44
ExperimentConfigurationRequestor,
55
)
6+
from eppo_client.constants import POLL_INTERVAL_MILLIS, POLL_JITTER_MILLIS
7+
from eppo_client.poller import Poller
68
from eppo_client.shard import get_shard, is_in_shard_range
79
from eppo_client.validation import validate_not_blank
810

911

1012
class EppoClient:
11-
def __init__(self, config_requestor: ExperimentConfigurationRequestor) -> None:
13+
def __init__(self, config_requestor: ExperimentConfigurationRequestor):
1214
self.__config_requestor = config_requestor
15+
self.__poller = Poller(
16+
interval_millis=POLL_INTERVAL_MILLIS,
17+
jitter_millis=POLL_JITTER_MILLIS,
18+
callback=config_requestor.fetch_and_store_configurations,
19+
)
20+
self.__poller.start()
1321

1422
def assign(self, subject: str, experiment_key: str) -> Optional[str]:
1523
"""Maps a subject to a variation for a given experiment
@@ -24,7 +32,7 @@ def assign(self, subject: str, experiment_key: str) -> Optional[str]:
2432
if (
2533
experiment_config is None
2634
or not experiment_config.enabled
27-
or not self.is_in_experiment_sample(
35+
or not self._is_in_experiment_sample(
2836
subject, experiment_key, experiment_config
2937
)
3038
):
@@ -42,7 +50,13 @@ def assign(self, subject: str, experiment_key: str) -> Optional[str]:
4250
None,
4351
)
4452

45-
def is_in_experiment_sample(
53+
def _shutdown(self):
54+
"""Stops all background processes used by the client
55+
Do not use the client after calling this method.
56+
"""
57+
self.__poller.stop()
58+
59+
def _is_in_experiment_sample(
4660
self,
4761
subject: str,
4862
experiment_key: str,
Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1-
from typing import List, Optional
1+
import logging
2+
from typing import Dict, List, Optional, cast
23
from eppo_client.base_model import SdkBaseModel
4+
from eppo_client.configuration_store import ConfigurationStore
5+
from eppo_client.http_client import HttpClient, HttpRequestError
36

47
from eppo_client.shard import ShardRange
58

9+
logger = logging.getLogger(__name__)
10+
611

712
class VariationDto(SdkBaseModel):
813
name: str
@@ -14,12 +19,40 @@ class ExperimentConfigurationDto(SdkBaseModel):
1419
percent_exposure: float
1520
enabled: bool
1621
variations: List[VariationDto]
17-
name: str
22+
name: Optional[str]
23+
24+
25+
RAC_ENDPOINT = "/randomized_assignment/config"
1826

1927

2028
class ExperimentConfigurationRequestor:
    """Fetches experiment configurations over HTTP and serves them from a store."""

    def __init__(
        self,
        http_client: HttpClient,
        config_store: ConfigurationStore[ExperimentConfigurationDto],
    ):
        """
        :param http_client: client used to request the configuration endpoint
        :param config_store: store that fetched configurations are written to
            and read from
        """
        self.__http_client = http_client
        self.__config_store = config_store

    def get_configuration(
        self, experiment_key: str
    ) -> Optional[ExperimentConfigurationDto]:
        """Looks up the stored configuration for an experiment.

        :param experiment_key: key of the experiment to look up
        :return: whatever the store returns for the key
        :raises ValueError: if the HTTP client reports that it is unauthorized
        """
        if self.__http_client.is_unauthorized():
            raise ValueError("Unauthorized: please check your API key")
        return self.__config_store.get_configuration(experiment_key)

    def fetch_and_store_configurations(self) -> Dict[str, ExperimentConfigurationDto]:
        """Requests all experiment configurations and writes them to the store.

        :return: the parsed configurations keyed by experiment key, or an empty
            dict when the request failed with a recoverable error
        :raises HttpRequestError: when the request failed with an error that is
            not recoverable
        """
        try:
            raw_configs = cast(
                dict, self.__http_client.get(RAC_ENDPOINT).get("experiments", {})
            )
            # Build a fresh mapping instead of overwriting values of the
            # response dict while iterating over it; the result then only ever
            # contains parsed DTOs.
            configs = {
                exp_key: ExperimentConfigurationDto(**exp_config)
                for exp_key, exp_config in raw_configs.items()
            }
            self.__config_store.set_configurations(configs)
            return configs
        except HttpRequestError as e:
            logger.error("Error retrieving assignment configurations: %s", e)
            if e.is_recoverable():
                return {}
            raise  # caught by the polling task; causes assignment polling to stop

eppo_client/configuration_store.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from typing import Dict, Optional, TypeVar, Generic
2+
from cachetools import LRUCache
3+
4+
from eppo_client.read_write_lock import ReadWriteLock
5+
6+
T = TypeVar("T")
7+
8+
9+
class ConfigurationStore(Generic[T]):
    """Bounded in-memory store of configurations backed by an LRU cache."""

    def __init__(self, max_size: int):
        """
        :param max_size: maximum number of entries kept in the cache
        """
        self.__cache: LRUCache = LRUCache(maxsize=max_size)
        self.__lock = ReadWriteLock()

    def get_configuration(self, key: str) -> Optional[T]:
        """Returns the configuration stored under ``key``, or ``None`` if absent.

        :param key: key the configuration was stored under
        """
        # An LRU lookup updates the cache's recency bookkeeping, so it is a
        # mutation and needs exclusive access rather than a shared read lock.
        # The lock is taken before the try block so that the finally clause
        # never releases a lock that was not acquired.
        self.__lock.acquire_write()
        try:
            return self.__cache[key]
        except KeyError:
            return None  # key does not exist
        finally:
            self.__lock.release_write()

    def set_configurations(self, configs: Dict[str, T]):
        """Stores every entry of ``configs`` in the cache.

        :param configs: mapping of key to configuration to store
        """
        self.__lock.acquire_write()
        try:
            for key, config in configs.items():
                self.__cache[key] = config
        finally:
            self.__lock.release_write()

eppo_client/constants.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Configuration cache: upper bound on the number of cached entries.
# The value is arbitrary; the caching library requires a max limit.
MAX_CACHE_ENTRIES = 1000

# Poller: all values below are expressed in milliseconds.
SECOND_MILLIS = 1000
MINUTE_MILLIS = SECOND_MILLIS * 60
POLL_JITTER_MILLIS = SECOND_MILLIS * 30
POLL_INTERVAL_MILLIS = MINUTE_MILLIS * 5

eppo_client/http_client.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
from typing import Any
2+
from requests.exceptions import Timeout
3+
from requests.adapters import HTTPAdapter, Retry
4+
from http import HTTPStatus
5+
6+
import requests
7+
8+
from eppo_client.base_model import SdkBaseModel
9+
10+
11+
class SdkParams(SdkBaseModel):
    # Field names are intentionally camelCase rather than snake_case:
    # that's what the backend endpoint expects.
    apiKey: str
    sdkName: str
    sdkVersion: str
16+
17+
18+
class HttpRequestError(Exception):
    """Error that carries the HTTP status code associated with a failed request."""

    def __init__(self, message: str, status_code: int):
        """
        :param message: human-readable description of the failure
        :param status_code: HTTP status code associated with the failure
        """
        super().__init__(message)
        self.status_code = status_code

    def is_recoverable(self) -> bool:
        """Tells whether the failure is considered recoverable.

        Status codes outside the 4xx range are always recoverable; within the
        4xx range only 429 (too many requests) and 408 (request timeout) are.
        """
        code = self.status_code
        if not 400 <= code < 500:
            return True
        return code in (HTTPStatus.TOO_MANY_REQUESTS, HTTPStatus.REQUEST_TIMEOUT)
30+
31+
32+
REQUEST_TIMEOUT_SECONDS = 2
33+
# Retry reference: https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#module-urllib3.util.retry
34+
# NOTE: because a urllib3 Retry object (rather than a plain integer) is passed to the adapter,
35+
# retries cover not only failed DNS lookups and connection timeouts but also read errors on idempotent requests such as GET.
36+
MAX_RETRIES = Retry(total=3, backoff_factor=1)
37+
38+
39+
class HttpClient:
    """Thin wrapper around a ``requests`` session for calling the backend."""

    def __init__(self, base_url: str, sdk_params: SdkParams):
        """
        :param base_url: prefix that every requested resource is appended to
        :param sdk_params: parameters sent as the query string of each request
        """
        self.__base_url = base_url
        self.__sdk_params = sdk_params
        self.__is_unauthorized = False
        # Requests to https:// URLs go through an adapter configured with MAX_RETRIES.
        session = requests.Session()
        session.mount("https://", HTTPAdapter(max_retries=MAX_RETRIES))
        self.__session = session

    def is_unauthorized(self) -> bool:
        """Tells whether the most recent response had status 401 (unauthorized)."""
        return self.__is_unauthorized

    def get(self, resource: str) -> Any:
        """Issues a GET request and returns the decoded JSON body.

        :param resource: path appended to the base URL
        :return: the JSON-decoded response body
        :raises HttpRequestError: if the response status is not 200, or if the
            request times out (reported with status 408)
        """
        url = self.__base_url + resource
        try:
            response = self.__session.get(
                url,
                params=self.__sdk_params.dict(),
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            status = response.status_code
            # Remember the authorization outcome of the latest response.
            self.__is_unauthorized = status == HTTPStatus.UNAUTHORIZED
            if status == HTTPStatus.OK:
                return response.json()
            raise self._get_http_error(status, resource)
        except Timeout:
            raise self._get_http_error(HTTPStatus.REQUEST_TIMEOUT, resource)

    def _get_http_error(self, status_code: int, resource: str) -> HttpRequestError:
        """Builds the error describing a failed request for ``resource``."""
        message = "HTTP {} error while requesting resource {}".format(
            status_code, resource
        )
        return HttpRequestError(message, status_code=status_code)

eppo_client/poller.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import logging
2+
from multiprocessing import Event
3+
from random import randrange
4+
from threading import Thread
5+
from typing import Callable
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
class Poller:
    """Repeatedly invokes a callback on a daemon thread until stopped.

    Polling also ends when the callback raises an exception.
    """

    def __init__(self, interval_millis: int, jitter_millis: int, callback: Callable):
        """
        :param interval_millis: base wait between two callback invocations, in
            milliseconds
        :param jitter_millis: exclusive upper bound of the random amount that
            is subtracted from the interval on each wait; values of 0 or less
            disable jitter
        :param callback: callable invoked without arguments on every poll
        """
        self.__jitter_millis = jitter_millis
        self.__interval = interval_millis
        self.__stop_event = Event()
        self.__callback = callback
        self.__thread = Thread(target=self.poll, daemon=True)

    def start(self):
        """Starts the background polling thread."""
        self.__thread.start()

    def stop(self):
        """Signals the polling loop to stop; also wakes it up if it is waiting."""
        self.__stop_event.set()

    def is_stopped(self):
        """Returns True once :meth:`stop` has been called."""
        return self.__stop_event.is_set()

    def poll(self):
        """Runs the polling loop: invoke the callback, then wait, until stopped."""
        while not self.is_stopped():
            try:
                self.__callback()
            except Exception as e:
                # Any error raised by the callback ends polling for good.
                logger.error("Unexpected error running poll task: " + str(e))
                break
            self._wait_for_interval()

    def _wait_for_interval(self):
        """Waits for the interval minus a random jitter, or until stopped."""
        # randrange(0, n) raises ValueError for n <= 0, which would kill the
        # polling thread after its first callback; treat that as "no jitter".
        jitter = randrange(0, self.__jitter_millis) if self.__jitter_millis > 0 else 0
        # Never wait a negative amount when the jitter exceeds the interval.
        interval_with_jitter = max(self.__interval - jitter, 0)
        self.__stop_event.wait(interval_with_jitter / 1000)

eppo_client/read_write_lock.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import threading
2+
3+
# Copied from: https://www.oreilly.com/library/view/python-cookbook/0596001673/ch06s04.html
4+
5+
6+
class ReadWriteLock:
    """A lock object that allows many simultaneous "read locks", but
    only one "write lock." """

    def __init__(self):
        # The condition's underlying lock doubles as the write lock: a writer
        # keeps holding it between acquire_write() and release_write().
        self._read_ready = threading.Condition(threading.Lock())
        # Number of read locks currently held.
        self._readers = 0

    def acquire_read(self):
        """Acquire a read lock. Blocks only if a thread has
        acquired the write lock."""
        with self._read_ready:
            self._readers += 1

    def release_read(self):
        """Release a read lock."""
        with self._read_ready:
            self._readers -= 1
            if not self._readers:
                # Last reader gone: wake up writers waiting in acquire_write().
                # notify_all() replaces the deprecated notifyAll() alias.
                self._read_ready.notify_all()

    def acquire_write(self):
        """Acquire a write lock. Blocks until there are no
        acquired read or write locks."""
        self._read_ready.acquire()
        # wait() releases the underlying lock while blocked and re-acquires it
        # before returning, so the lock is held again once the loop exits.
        while self._readers > 0:
            self._read_ready.wait()

    def release_write(self):
        """Release a write lock."""
        self._read_ready.release()

0 commit comments

Comments
 (0)