Skip to content

Commit 6611a43

Browse files
committed
Update sync and tests
1 parent 58d5ddd commit 6611a43

File tree

3 files changed

+44
-28
lines changed

3 files changed

+44
-28
lines changed

setup.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
'flake8',
99
'pytest==7.0.1',
1010
'pytest-mock==3.11.1',
11-
'coverage',
11+
'coverage==7.0.0',
1212
'pytest-cov==4.1.0',
1313
'importlib-metadata==6.7',
1414
'tomli==1.2.3',
@@ -17,7 +17,8 @@
1717
'pytest-asyncio==0.21.0',
1818
'aiohttp>=3.8.4',
1919
'aiofiles>=23.1.0',
20-
'requests-kerberos>=0.15.0'
20+
'requests-kerberos>=0.15.0',
21+
'urllib3==2.2.0'
2122
]
2223

2324
INSTALL_REQUIRES = [

splitio/sync/split.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_st
8686
"""
8787
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage)
8888

89-
def _fetch_until(self, fetch_options, till=None):
89+
def _fetch_until(self, fetch_options, till=None, rbs_till=None):
9090
"""
9191
Hit endpoint, update storage and return when since==till.
9292
@@ -109,7 +109,7 @@ def _fetch_until(self, fetch_options, till=None):
109109
if rbs_change_number is None:
110110
rbs_change_number = -1
111111

112-
if till is not None and till < change_number and till < rbs_change_number:
112+
if (till is not None and till < change_number) or (rbs_till is not None and rbs_till < rbs_change_number):
113113
# the passed till is less than change_number, no need to perform updates
114114
return change_number, rbs_change_number, segment_list
115115

@@ -135,7 +135,7 @@ def _fetch_until(self, fetch_options, till=None):
135135
if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:
136136
return feature_flag_changes.get('ff')['t'], feature_flag_changes.get('rbs')['t'], segment_list
137137

138-
def _attempt_feature_flag_sync(self, fetch_options, till=None):
138+
def _attempt_feature_flag_sync(self, fetch_options, till=None, rbs_till=None):
139139
"""
140140
Hit endpoint, update storage and return True if sync is complete.
141141
@@ -153,9 +153,9 @@ def _attempt_feature_flag_sync(self, fetch_options, till=None):
153153
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
154154
while True:
155155
remaining_attempts -= 1
156-
change_number, rbs_change_number, segment_list = self._fetch_until(fetch_options, till)
156+
change_number, rbs_change_number, segment_list = self._fetch_until(fetch_options, till, rbs_till)
157157
final_segment_list.update(segment_list)
158-
if till is None or (till <= change_number and till <= rbs_change_number):
158+
if (till is None or till <= change_number) and (rbs_till is None or rbs_till <= rbs_change_number):
159159
return True, remaining_attempts, change_number, rbs_change_number, final_segment_list
160160

161161
elif remaining_attempts <= 0:
@@ -176,7 +176,7 @@ def _get_config_sets(self):
176176

177177
return ','.join(self._feature_flag_storage.flag_set_filter.sorted_flag_sets)
178178

179-
def synchronize_splits(self, till=None):
179+
def synchronize_splits(self, till=None, rbs_till=None):
180180
"""
181181
Hit endpoint, update storage and return True if sync is complete.
182182
@@ -186,15 +186,15 @@ def synchronize_splits(self, till=None):
186186
final_segment_list = set()
187187
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
188188
successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = self._attempt_feature_flag_sync(fetch_options,
189-
till)
189+
till, rbs_till)
190190
final_segment_list.update(segment_list)
191191
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
192192
if successful_sync: # succedeed sync
193193
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
194194
return final_segment_list
195195

196196
with_cdn_bypass = FetchOptions(True, change_number, rbs_change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
197-
without_cdn_successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = self._attempt_feature_flag_sync(with_cdn_bypass, till)
197+
without_cdn_successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = self._attempt_feature_flag_sync(with_cdn_bypass, till, rbs_till)
198198
final_segment_list.update(segment_list)
199199
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
200200
if without_cdn_successful_sync:
@@ -233,7 +233,7 @@ def __init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_st
233233
"""
234234
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage, rule_based_segment_storage)
235235

236-
async def _fetch_until(self, fetch_options, till=None):
236+
async def _fetch_until(self, fetch_options, till=None, rbs_till=None):
237237
"""
238238
Hit endpoint, update storage and return when since==till.
239239
@@ -256,7 +256,7 @@ async def _fetch_until(self, fetch_options, till=None):
256256
if rbs_change_number is None:
257257
rbs_change_number = -1
258258

259-
if till is not None and till < change_number and till < rbs_change_number:
259+
if (till is not None and till < change_number) or (rbs_till is not None and till < rbs_change_number):
260260
# the passed till is less than change_number, no need to perform updates
261261
return change_number, rbs_change_number, segment_list
262262

@@ -282,7 +282,7 @@ async def _fetch_until(self, fetch_options, till=None):
282282
if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:
283283
return feature_flag_changes.get('ff')['t'], feature_flag_changes.get('rbs')['t'], segment_list
284284

285-
async def _attempt_feature_flag_sync(self, fetch_options, till=None):
285+
async def _attempt_feature_flag_sync(self, fetch_options, till=None, rbs_till=None):
286286
"""
287287
Hit endpoint, update storage and return True if sync is complete.
288288
@@ -300,9 +300,9 @@ async def _attempt_feature_flag_sync(self, fetch_options, till=None):
300300
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
301301
while True:
302302
remaining_attempts -= 1
303-
change_number, rbs_change_number, segment_list = await self._fetch_until(fetch_options, till)
303+
change_number, rbs_change_number, segment_list = await self._fetch_until(fetch_options, till, rbs_till)
304304
final_segment_list.update(segment_list)
305-
if till is None or (till <= change_number and till <= rbs_change_number):
305+
if (till is None or till <= change_number) and (rbs_till is None or rbs_till <= rbs_change_number):
306306
return True, remaining_attempts, change_number, rbs_change_number, final_segment_list
307307

308308
elif remaining_attempts <= 0:
@@ -311,7 +311,7 @@ async def _attempt_feature_flag_sync(self, fetch_options, till=None):
311311
how_long = self._backoff.get()
312312
await asyncio.sleep(how_long)
313313

314-
async def synchronize_splits(self, till=None):
314+
async def synchronize_splits(self, till=None, rbs_till=None):
315315
"""
316316
Hit endpoint, update storage and return True if sync is complete.
317317
@@ -321,15 +321,15 @@ async def synchronize_splits(self, till=None):
321321
final_segment_list = set()
322322
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
323323
successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = await self._attempt_feature_flag_sync(fetch_options,
324-
till)
324+
till, rbs_till)
325325
final_segment_list.update(segment_list)
326326
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
327327
if successful_sync: # succedeed sync
328328
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
329329
return final_segment_list
330330

331331
with_cdn_bypass = FetchOptions(True, change_number, rbs_change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
332-
without_cdn_successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = await self._attempt_feature_flag_sync(with_cdn_bypass, till)
332+
without_cdn_successful_sync, remaining_attempts, change_number, rbs_change_number, segment_list = await self._attempt_feature_flag_sync(with_cdn_bypass, till, rbs_till)
333333
final_segment_list.update(segment_list)
334334
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
335335
if without_cdn_successful_sync:

tests/sync/test_splits_synchronizer.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -322,34 +322,44 @@ def rbs_change_number_mock():
322322
rbs_change_number_mock._calls += 1
323323
if rbs_change_number_mock._calls == 1:
324324
return -1
325-
return 12345 # Return proper cn for CDN Bypass
325+
elif change_number_mock._calls >= 2 and change_number_mock._calls <= 3:
326+
return 555
327+
elif change_number_mock._calls <= 9:
328+
return 555
329+
return 666 # Return proper cn for CDN Bypass
326330

327331
change_number_mock._calls = 0
328332
rbs_change_number_mock._calls = 0
329333
storage.get_change_number.side_effect = change_number_mock
330334
rbs_storage.get_change_number.side_effect = rbs_change_number_mock
331335

332336
api = mocker.Mock()
333-
337+
rbs_1 = copy.deepcopy(json_body['rbs']['d'])
334338
def get_changes(*args, **kwargs):
335339
get_changes.called += 1
340+
# pytest.set_trace()
336341
if get_changes.called == 1:
337342
return { 'ff': { 'd': self.splits, 's': -1, 't': 123 },
338-
'rbs': {"t": 123, "s": -1, "d": []}}
343+
'rbs': {"t": 555, "s": -1, "d": rbs_1}}
339344
elif get_changes.called == 2:
340345
return { 'ff': { 'd': [], 's': 123, 't': 123 },
341-
'rbs': {"t": 123, "s": 123, "d": []}}
346+
'rbs': {"t": 555, "s": 555, "d": []}}
342347
elif get_changes.called == 3:
343348
return { 'ff': { 'd': [], 's': 123, 't': 1234 },
344-
'rbs': {"t": 123, "s": 123, "d": []}}
349+
'rbs': {"t": 555, "s": 555, "d": []}}
345350
elif get_changes.called >= 4 and get_changes.called <= 6:
346351
return { 'ff': { 'd': [], 's': 1234, 't': 1234 },
347-
'rbs': {"t": 123, "s": 123, "d": []}}
352+
'rbs': {"t": 555, "s": 555, "d": []}}
348353
elif get_changes.called == 7:
349354
return { 'ff': { 'd': [], 's': 1234, 't': 12345 },
350-
'rbs': {"t": 123, "s": 123, "d": []}}
355+
'rbs': {"t": 555, "s": 555, "d": []}}
356+
elif get_changes.called == 8:
357+
return { 'ff': { 'd': [], 's': 12345, 't': 12345 },
358+
'rbs': {"t": 555, "s": 555, "d": []}}
359+
rbs_1[0]['excluded']['keys'] = ['[email protected]']
351360
return { 'ff': { 'd': [], 's': 12345, 't': 12345 },
352-
'rbs': {"t": 123, "s": 123, "d": []}}
361+
'rbs': {"t": 666, "s": 666, "d": rbs_1}}
362+
353363
get_changes.called = 0
354364
api.fetch_splits.side_effect = get_changes
355365

@@ -377,12 +387,17 @@ def intersect(sets):
377387
split_synchronizer.synchronize_splits(12345)
378388
assert api.fetch_splits.mock_calls[3][1][0] == 1234
379389
assert api.fetch_splits.mock_calls[3][1][2].cache_control_headers == True
380-
assert len(api.fetch_splits.mock_calls) == 10 # 2 ok + BACKOFF(2 since==till + 2 re-attempts) + CDN(2 since==till)
390+
assert len(api.fetch_splits.mock_calls) == 8 # 2 ok + BACKOFF(2 since==till + 2 re-attempts) + CDN(2 since==till)
381391

382392
inserted_split = storage.update.mock_calls[0][1][0][0]
383393
assert isinstance(inserted_split, Split)
384394
assert inserted_split.name == 'some_name'
385395

396+
split_synchronizer._backoff = Backoff(1, 0.1)
397+
split_synchronizer.synchronize_splits(None, 666)
398+
inserted_rbs = rbs_storage.update.mock_calls[8][1][0][0]
399+
assert inserted_rbs.excluded.get_excluded_keys() == ['[email protected]']
400+
386401
def test_sync_flag_sets_with_config_sets(self, mocker):
387402
"""Test split sync with flag sets."""
388403
storage = InMemorySplitStorage(['set1', 'set2'])
@@ -723,7 +738,7 @@ def intersect(sets):
723738
split_synchronizer._backoff = Backoff(1, 0.1)
724739
await split_synchronizer.synchronize_splits(12345)
725740
assert (12345, True, 1234) == (self.change_number_3, self.fetch_options_3.cache_control_headers, self.fetch_options_3.change_number)
726-
assert get_changes.called == 10 # 2 ok + BACKOFF(2 since==till + 2 re-attempts) + CDN(2 since==till)
741+
assert get_changes.called == 8 # 2 ok + BACKOFF(2 since==till + 2 re-attempts) + CDN(2 since==till)
727742

728743
inserted_split = self.parsed_split[0]
729744
assert isinstance(inserted_split, Split)

0 commit comments

Comments
 (0)