Skip to content

Commit 5bda502

Browse files
committed
Updated sync and api classes
1 parent 93a9fdb commit 5bda502

File tree

6 files changed

+417
-193
lines changed

6 files changed

+417
-193
lines changed

splitio/api/commons.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def record_telemetry(status_code, elapsed, metric_name, telemetry_runtime_produc
5757
class FetchOptions(object):
5858
"""Fetch Options object."""
5959

60-
def __init__(self, cache_control_headers=False, change_number=None, sets=None, spec=SPEC_VERSION):
60+
def __init__(self, cache_control_headers=False, change_number=None, rbs_change_number=None, sets=None, spec=SPEC_VERSION):
6161
"""
6262
Class constructor.
6363
@@ -72,6 +72,7 @@ def __init__(self, cache_control_headers=False, change_number=None, sets=None, s
7272
"""
7373
self._cache_control_headers = cache_control_headers
7474
self._change_number = change_number
75+
self._rbs_change_number = rbs_change_number
7576
self._sets = sets
7677
self._spec = spec
7778

@@ -85,6 +86,11 @@ def change_number(self):
8586
"""Return change number."""
8687
return self._change_number
8788

89+
@property
90+
def rbs_change_number(self):
91+
"""Return change number."""
92+
return self._rbs_change_number
93+
8894
@property
8995
def sets(self):
9096
"""Return sets."""
@@ -103,14 +109,19 @@ def __eq__(self, other):
103109
if self._change_number != other._change_number:
104110
return False
105111

112+
if self._rbs_change_number != other._rbs_change_number:
113+
return False
114+
106115
if self._sets != other._sets:
107116
return False
117+
108118
if self._spec != other._spec:
109119
return False
120+
110121
return True
111122

112123

113-
def build_fetch(change_number, fetch_options, metadata):
124+
def build_fetch(change_number, fetch_options, metadata, rbs_change_number=None):
114125
"""
115126
Build fetch with new flags if that is the case.
116127
@@ -123,11 +134,16 @@ def build_fetch(change_number, fetch_options, metadata):
123134
:param metadata: Metadata Headers.
124135
:type metadata: dict
125136
137+
:param rbs_change_number: Last known timestamp of a rule based segment modification.
138+
:type rbs_change_number: int
139+
126140
:return: Objects for fetch
127141
:rtype: dict, dict
128142
"""
129143
query = {'s': fetch_options.spec} if fetch_options.spec is not None else {}
130144
query['since'] = change_number
145+
if rbs_change_number is not None:
146+
query['rbSince'] = rbs_change_number
131147
extra_headers = metadata
132148
if fetch_options is None:
133149
return query, extra_headers

splitio/api/splits.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,24 @@ def __init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer):
3131
self._telemetry_runtime_producer = telemetry_runtime_producer
3232
self._client.set_telemetry_data(HTTPExceptionsAndLatencies.SPLIT, self._telemetry_runtime_producer)
3333

34-
def fetch_splits(self, change_number, fetch_options):
34+
def fetch_splits(self, change_number, rbs_change_number, fetch_options):
3535
"""
3636
Fetch feature flags from backend.
3737
3838
:param change_number: Last known timestamp of a split modification.
3939
:type change_number: int
4040
41+
:param rbs_change_number: Last known timestamp of a rule based segment modification.
42+
:type rbs_change_number: int
43+
4144
:param fetch_options: Fetch options for getting feature flag definitions.
4245
:type fetch_options: splitio.api.commons.FetchOptions
4346
4447
:return: Json representation of a splitChanges response.
4548
:rtype: dict
4649
"""
4750
try:
48-
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata)
51+
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata, rbs_change_number)
4952
response = self._client.get(
5053
'sdk',
5154
'splitChanges',
@@ -86,12 +89,15 @@ def __init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer):
8689
self._telemetry_runtime_producer = telemetry_runtime_producer
8790
self._client.set_telemetry_data(HTTPExceptionsAndLatencies.SPLIT, self._telemetry_runtime_producer)
8891

89-
async def fetch_splits(self, change_number, fetch_options):
92+
async def fetch_splits(self, change_number, rbs_change_number, fetch_options):
9093
"""
9194
Fetch feature flags from backend.
9295
9396
:param change_number: Last known timestamp of a split modification.
9497
:type change_number: int
98+
99+
:param rbs_change_number: Last known timestamp of a rule based segment modification.
100+
:type rbs_change_number: int
95101
96102
:param fetch_options: Fetch options for getting feature flag definitions.
97103
:type fetch_options: splitio.api.commons.FetchOptions
@@ -100,7 +106,7 @@ async def fetch_splits(self, change_number, fetch_options):
100106
:rtype: dict
101107
"""
102108
try:
103-
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata)
109+
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata, rbs_change_number)
104110
response = await self._client.get(
105111
'sdk',
106112
'splitChanges',

splitio/sync/split.py

Lines changed: 63 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010
from splitio.api import APIException, APIUriException
1111
from splitio.api.commons import FetchOptions
1212
from splitio.client.input_validator import validate_flag_sets
13-
from splitio.models import splits
13+
from splitio.models import splits, rule_based_segments
1414
from splitio.util.backoff import Backoff
1515
from splitio.util.time import get_current_epoch_time_ms
16-
from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async
16+
from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async, \
17+
update_rule_based_segment_storage, update_rule_based_segment_storage_async
1718
from splitio.sync import util
1819
from splitio.optional.loaders import asyncio, aiofiles
1920

@@ -32,7 +33,7 @@
3233
class SplitSynchronizerBase(object):
3334
"""Feature Flag changes synchronizer."""
3435

35-
def __init__(self, feature_flag_api, feature_flag_storage):
36+
def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage):
3637
"""
3738
Class constructor.
3839
@@ -44,6 +45,7 @@ def __init__(self, feature_flag_api, feature_flag_storage):
4445
"""
4546
self._api = feature_flag_api
4647
self._feature_flag_storage = feature_flag_storage
48+
self._rule_based_segment_storage = rule_based_segment_storage
4749
self._backoff = Backoff(
4850
_ON_DEMAND_FETCH_BACKOFF_BASE,
4951
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT)
@@ -53,6 +55,11 @@ def feature_flag_storage(self):
5355
"""Return Feature_flag storage object"""
5456
return self._feature_flag_storage
5557

58+
@property
59+
def rule_based_segment_storage(self):
60+
"""Return rule base segment storage object"""
61+
return self._rule_based_segment_storage
62+
5663
def _get_config_sets(self):
5764
"""
5865
Get all filter flag sets cnverrted to string, if no filter flagsets exist return None
@@ -67,7 +74,7 @@ def _get_config_sets(self):
6774
class SplitSynchronizer(SplitSynchronizerBase):
6875
"""Feature Flag changes synchronizer."""
6976

70-
def __init__(self, feature_flag_api, feature_flag_storage):
77+
def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage):
7178
"""
7279
Class constructor.
7380
@@ -77,7 +84,7 @@ def __init__(self, feature_flag_api, feature_flag_storage):
7784
:param feature_flag_storage: Feature Flag Storage.
7885
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
7986
"""
80-
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage)
87+
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage)
8188

8289
def _fetch_until(self, fetch_options, till=None):
8390
"""
@@ -97,12 +104,17 @@ def _fetch_until(self, fetch_options, till=None):
97104
change_number = self._feature_flag_storage.get_change_number()
98105
if change_number is None:
99106
change_number = -1
100-
if till is not None and till < change_number:
107+
108+
rbs_change_number = self._rule_based_segment_storage.get_change_number()
109+
if rbs_change_number is None:
110+
rbs_change_number = -1
111+
112+
if till is not None and till < change_number and till < rbs_change_number:
101113
# the passed till is less than change_number, no need to perform updates
102-
return change_number, segment_list
114+
return change_number, rbs_change_number, segment_list
103115

104116
try:
105-
feature_flag_changes = self._api.fetch_splits(change_number, fetch_options)
117+
feature_flag_changes = self._api.fetch_splits(change_number, rbs_change_number, fetch_options)
106118
except APIException as exc:
107119
if exc._status_code is not None and exc._status_code == 414:
108120
_LOGGER.error('Exception caught: the amount of flag sets provided are big causing uri length error.')
@@ -112,15 +124,16 @@ def _fetch_until(self, fetch_options, till=None):
112124
_LOGGER.error('Exception raised while fetching feature flags')
113125
_LOGGER.debug('Exception information: ', exc_info=True)
114126
raise exc
115-
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
116-
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
117-
if feature_flag_changes['till'] == feature_flag_changes['since']:
118-
return feature_flag_changes['till'], segment_list
119-
120-
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
121-
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
122-
if feature_flag_changes['till'] == feature_flag_changes['since']:
123-
return feature_flag_changes['till'], segment_list
127+
128+
fetched_rule_based_segments = [(rule_based_segments.from_raw(rule_based_segment)) for rule_based_segment in feature_flag_changes.get('rbs').get('d', [])]
129+
rbs_segment_list = update_rule_based_segment_storage(self._rule_based_segment_storage, fetched_rule_based_segments, feature_flag_changes.get('rbs')['t'])
130+
131+
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('ff').get('d', [])]
132+
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'])
133+
segment_list.update(rbs_segment_list)
134+
135+
if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:
136+
return feature_flag_changes.get('ff')['t'], feature_flag_changes.get('rbs')['t'], segment_list
124137

125138
def _attempt_feature_flag_sync(self, fetch_options, till=None):
126139
"""
@@ -140,13 +153,13 @@ def _attempt_feature_flag_sync(self, fetch_options, till=None):
140153
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
141154
while True:
142155
remaining_attempts -= 1
143-
change_number, segment_list = self._fetch_until(fetch_options, till)
156+
change_number, rbs_change_number, segment_list = self._fetch_until(fetch_options, till)
144157
final_segment_list.update(segment_list)
145-
if till is None or till <= change_number:
146-
return True, remaining_attempts, change_number, final_segment_list
158+
if till is None or (till <= change_number and till <= rbs_change_number):
159+
return True, remaining_attempts, change_number, rbs_change_number, final_segment_list
147160

148161
elif remaining_attempts <= 0:
149-
return False, remaining_attempts, change_number, final_segment_list
162+
return False, remaining_attempts, change_number, rbs_change_number, final_segment_list
150163

151164
how_long = self._backoff.get()
152165
time.sleep(how_long)
@@ -172,16 +185,16 @@ def synchronize_splits(self, till=None):
172185
"""
173186
final_segment_list = set()
174187
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
175-
successful_sync, remaining_attempts, change_number, segment_list = self._attempt_feature_flag_sync(fetch_options,
188+
successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = self._attempt_feature_flag_sync(fetch_options,
176189
till)
177190
final_segment_list.update(segment_list)
178191
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
179192
if successful_sync: # succedeed sync
180193
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
181194
return final_segment_list
182195

183-
with_cdn_bypass = FetchOptions(True, change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
184-
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = self._attempt_feature_flag_sync(with_cdn_bypass, till)
196+
with_cdn_bypass = FetchOptions(True, change_number, rbs_change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
197+
without_cdn_successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = self._attempt_feature_flag_sync(with_cdn_bypass, till)
185198
final_segment_list.update(segment_list)
186199
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
187200
if without_cdn_successful_sync:
@@ -208,7 +221,7 @@ def kill_split(self, feature_flag_name, default_treatment, change_number):
208221
class SplitSynchronizerAsync(SplitSynchronizerBase):
209222
"""Feature Flag changes synchronizer async."""
210223

211-
def __init__(self, feature_flag_api, feature_flag_storage):
224+
def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage):
212225
"""
213226
Class constructor.
214227
@@ -218,7 +231,7 @@ def __init__(self, feature_flag_api, feature_flag_storage):
218231
:param feature_flag_storage: Feature Flag Storage.
219232
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
220233
"""
221-
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage)
234+
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage)
222235

223236
async def _fetch_until(self, fetch_options, till=None):
224237
"""
@@ -238,12 +251,17 @@ async def _fetch_until(self, fetch_options, till=None):
238251
change_number = await self._feature_flag_storage.get_change_number()
239252
if change_number is None:
240253
change_number = -1
241-
if till is not None and till < change_number:
254+
255+
rbs_change_number = await self._rule_based_segment_storage.get_change_number()
256+
if rbs_change_number is None:
257+
rbs_change_number = -1
258+
259+
if till is not None and till < change_number and till < rbs_change_number:
242260
# the passed till is less than change_number, no need to perform updates
243-
return change_number, segment_list
261+
return change_number, rbs_change_number, segment_list
244262

245263
try:
246-
feature_flag_changes = await self._api.fetch_splits(change_number, fetch_options)
264+
feature_flag_changes = await self._api.fetch_splits(change_number, rbs_change_number, fetch_options)
247265
except APIException as exc:
248266
if exc._status_code is not None and exc._status_code == 414:
249267
_LOGGER.error('Exception caught: the amount of flag sets provided are big causing uri length error.')
@@ -254,10 +272,15 @@ async def _fetch_until(self, fetch_options, till=None):
254272
_LOGGER.debug('Exception information: ', exc_info=True)
255273
raise exc
256274

257-
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
258-
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
259-
if feature_flag_changes['till'] == feature_flag_changes['since']:
260-
return feature_flag_changes['till'], segment_list
275+
fetched_rule_based_segments = [(rule_based_segments.from_raw(rule_based_segment)) for rule_based_segment in feature_flag_changes.get('rbs').get('d', [])]
276+
rbs_segment_list = await update_rule_based_segment_storage_async(self._rule_based_segment_storage, fetched_rule_based_segments, feature_flag_changes.get('rbs')['t'])
277+
278+
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('ff').get('d', [])]
279+
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'])
280+
segment_list.update(rbs_segment_list)
281+
282+
if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:
283+
return feature_flag_changes.get('ff')['t'], feature_flag_changes.get('rbs')['t'], segment_list
261284

262285
async def _attempt_feature_flag_sync(self, fetch_options, till=None):
263286
"""
@@ -277,13 +300,13 @@ async def _attempt_feature_flag_sync(self, fetch_options, till=None):
277300
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
278301
while True:
279302
remaining_attempts -= 1
280-
change_number, segment_list = await self._fetch_until(fetch_options, till)
303+
change_number, rbs_change_number, segment_list = await self._fetch_until(fetch_options, till)
281304
final_segment_list.update(segment_list)
282-
if till is None or till <= change_number:
283-
return True, remaining_attempts, change_number, final_segment_list
305+
if till is None or (till <= change_number and till <= rbs_change_number):
306+
return True, remaining_attempts, change_number, rbs_change_number, final_segment_list
284307

285308
elif remaining_attempts <= 0:
286-
return False, remaining_attempts, change_number, final_segment_list
309+
return False, remaining_attempts, change_number, rbs_change_number, final_segment_list
287310

288311
how_long = self._backoff.get()
289312
await asyncio.sleep(how_long)
@@ -297,16 +320,16 @@ async def synchronize_splits(self, till=None):
297320
"""
298321
final_segment_list = set()
299322
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
300-
successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_feature_flag_sync(fetch_options,
323+
successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = await self._attempt_feature_flag_sync(fetch_options,
301324
till)
302325
final_segment_list.update(segment_list)
303326
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
304327
if successful_sync: # succedeed sync
305328
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
306329
return final_segment_list
307330

308-
with_cdn_bypass = FetchOptions(True, change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
309-
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_feature_flag_sync(with_cdn_bypass, till)
331+
with_cdn_bypass = FetchOptions(True, change_number, rbs_change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
332+
without_cdn_successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = await self._attempt_feature_flag_sync(with_cdn_bypass, till)
310333
final_segment_list.update(segment_list)
311334
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
312335
if without_cdn_successful_sync:

0 commit comments

Comments
 (0)