99from enum import Enum
1010
1111from splitio .models .splits import from_raw
12+ from splitio .models .rule_based_segments import from_raw as rbs_from_raw
1213from splitio .models .telemetry import UpdateFromSSE
1314from splitio .push import SplitStorageException
1415from splitio .push .parser import UpdateType
1516from splitio .optional .loaders import asyncio
16- from splitio .util .storage_helper import update_feature_flag_storage , update_feature_flag_storage_async
17+ from splitio .util .storage_helper import update_feature_flag_storage , update_feature_flag_storage_async , \
18+ update_rule_based_segment_storage , update_rule_based_segment_storage_async
1719
1820_LOGGER = logging .getLogger (__name__ )
1921
@@ -25,9 +27,9 @@ class CompressionMode(Enum):
2527 ZLIB_COMPRESSION = 2
2628
2729_compression_handlers = {
28- CompressionMode .NO_COMPRESSION : lambda event : base64 .b64decode (event .feature_flag_definition ),
29- CompressionMode .GZIP_COMPRESSION : lambda event : gzip .decompress (base64 .b64decode (event .feature_flag_definition )).decode ('utf-8' ),
30- CompressionMode .ZLIB_COMPRESSION : lambda event : zlib .decompress (base64 .b64decode (event .feature_flag_definition )).decode ('utf-8' ),
30+ CompressionMode .NO_COMPRESSION : lambda event : base64 .b64decode (event .object_definition ),
31+ CompressionMode .GZIP_COMPRESSION : lambda event : gzip .decompress (base64 .b64decode (event .object_definition )).decode ('utf-8' ),
32+ CompressionMode .ZLIB_COMPRESSION : lambda event : zlib .decompress (base64 .b64decode (event .object_definition )).decode ('utf-8' ),
3133}
3234
3335class WorkerBase (object , metaclass = abc .ABCMeta ):
@@ -45,10 +47,19 @@ def start(self):
4547 def stop (self ):
4648 """Stop worker."""
4749
48- def _get_feature_flag_definition (self , event ):
49- """return feature flag definition in event."""
50+ def _get_object_definition (self , event ):
51+ """return feature flag or rule based segment definition in event."""
5052 cm = CompressionMode (event .compression ) # will throw if the number is not defined in compression mode
5153 return _compression_handlers [cm ](event )
54+
55+ def _get_referenced_rbs (self , feature_flag ):
56+ referenced_rbs = set ()
57+ for condition in feature_flag .conditions :
58+ for matcher in condition .matchers :
59+ raw_matcher = matcher .to_json ()
60+ if raw_matcher ['matcherType' ] == 'IN_RULE_BASED_SEGMENT' :
61+ referenced_rbs .add (raw_matcher ['userDefinedSegmentMatcherData' ]['segmentName' ])
62+ return referenced_rbs
5263
5364class SegmentWorker (WorkerBase ):
5465 """Segment Worker for processing updates."""
@@ -173,7 +184,7 @@ class SplitWorker(WorkerBase):
173184
174185 _centinel = object ()
175186
176- def __init__ (self , synchronize_feature_flag , synchronize_segment , feature_flag_queue , feature_flag_storage , segment_storage , telemetry_runtime_producer ):
187+ def __init__ (self , synchronize_feature_flag , synchronize_segment , feature_flag_queue , feature_flag_storage , segment_storage , telemetry_runtime_producer , rule_based_segment_storage ):
177188 """
178189 Class constructor.
179190
@@ -189,6 +200,8 @@ def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_q
189200 :type segment_storage: splitio.storage.inmemory.InMemorySegmentStorage
190201 :param telemetry_runtime_producer: Telemetry runtime producer instance
191202 :type telemetry_runtime_producer: splitio.engine.telemetry.TelemetryRuntimeProducer
203+ :param rule_based_segment_storage: Rule based segment Storage.
204+ :type rule_based_segment_storage: splitio.storage.InMemoryRuleBasedStorage
192205 """
193206 self ._feature_flag_queue = feature_flag_queue
194207 self ._handler = synchronize_feature_flag
@@ -198,6 +211,7 @@ def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_q
198211 self ._feature_flag_storage = feature_flag_storage
199212 self ._segment_storage = segment_storage
200213 self ._telemetry_runtime_producer = telemetry_runtime_producer
214+ self ._rule_based_segment_storage = rule_based_segment_storage
201215
202216 def is_running (self ):
203217 """Return whether the working is running."""
@@ -206,25 +220,40 @@ def is_running(self):
206220 def _apply_iff_if_needed (self , event ):
207221 if not self ._check_instant_ff_update (event ):
208222 return False
209-
210223 try :
211- new_feature_flag = from_raw (json .loads (self ._get_feature_flag_definition (event )))
212- segment_list = update_feature_flag_storage (self ._feature_flag_storage , [new_feature_flag ], event .change_number )
213- for segment_name in segment_list :
214- if self ._segment_storage .get (segment_name ) is None :
215- _LOGGER .debug ('Fetching new segment %s' , segment_name )
216- self ._segment_handler (segment_name , event .change_number )
217-
218- self ._telemetry_runtime_producer .record_update_from_sse (UpdateFromSSE .SPLIT_UPDATE )
224+ if event .update_type == UpdateType .SPLIT_UPDATE :
225+ new_feature_flag = from_raw (json .loads (self ._get_object_definition (event )))
226+ segment_list = update_feature_flag_storage (self ._feature_flag_storage , [new_feature_flag ], event .change_number )
227+ for segment_name in segment_list :
228+ if self ._segment_storage .get (segment_name ) is None :
229+ _LOGGER .debug ('Fetching new segment %s' , segment_name )
230+ self ._segment_handler (segment_name , event .change_number )
231+
232+ referenced_rbs = self ._get_referenced_rbs (new_feature_flag )
233+ if len (referenced_rbs ) > 0 and not self ._rule_based_segment_storage .contains (referenced_rbs ):
234+ _LOGGER .debug ('Fetching new rule based segment(s) %s' , referenced_rbs )
235+ self ._handler (None , event .change_number )
236+ self ._telemetry_runtime_producer .record_update_from_sse (UpdateFromSSE .SPLIT_UPDATE )
237+ else :
238+ new_rbs = rbs_from_raw (json .loads (self ._get_object_definition (event )))
239+ segment_list = update_rule_based_segment_storage (self ._rule_based_segment_storage , [new_rbs ], event .change_number )
240+ for segment_name in segment_list :
241+ if self ._segment_storage .get (segment_name ) is None :
242+ _LOGGER .debug ('Fetching new segment %s' , segment_name )
243+ self ._segment_handler (segment_name , event .change_number )
244+ self ._telemetry_runtime_producer .record_update_from_sse (UpdateFromSSE .RBS_UPDATE )
219245 return True
220-
246+
221247 except Exception as e :
222248 raise SplitStorageException (e )
223249
224250 def _check_instant_ff_update (self , event ):
225251 if event .update_type == UpdateType .SPLIT_UPDATE and event .compression is not None and event .previous_change_number == self ._feature_flag_storage .get_change_number ():
226252 return True
227253
254+ if event .update_type == UpdateType .RB_SEGMENT_UPDATE and event .compression is not None and event .previous_change_number == self ._rule_based_segment_storage .get_change_number ():
255+ return True
256+
228257 return False
229258
230259 def _run (self ):
@@ -239,8 +268,13 @@ def _run(self):
239268 try :
240269 if self ._apply_iff_if_needed (event ):
241270 continue
242-
243- sync_result = self ._handler (event .change_number )
271+ till = None
272+ rbs_till = None
273+ if event .update_type == UpdateType .SPLIT_UPDATE :
274+ till = event .change_number
275+ else :
276+ rbs_till = event .change_number
277+ sync_result = self ._handler (till , rbs_till )
244278 if not sync_result .success and sync_result .error_code is not None and sync_result .error_code == 414 :
245279 _LOGGER .error ("URI too long exception caught, sync failed" )
246280
@@ -279,7 +313,7 @@ class SplitWorkerAsync(WorkerBase):
279313
280314 _centinel = object ()
281315
282- def __init__ (self , synchronize_feature_flag , synchronize_segment , feature_flag_queue , feature_flag_storage , segment_storage , telemetry_runtime_producer ):
316+ def __init__ (self , synchronize_feature_flag , synchronize_segment , feature_flag_queue , feature_flag_storage , segment_storage , telemetry_runtime_producer , rule_based_segment_storage ):
283317 """
284318 Class constructor.
285319
@@ -295,6 +329,8 @@ def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_q
295329 :type segment_storage: splitio.storage.inmemory.InMemorySegmentStorage
296330 :param telemetry_runtime_producer: Telemetry runtime producer instance
297331 :type telemetry_runtime_producer: splitio.engine.telemetry.TelemetryRuntimeProducer
332+ :param rule_based_segment_storage: Rule based segment Storage.
333+ :type rule_based_segment_storage: splitio.storage.InMemoryRuleBasedStorage
298334 """
299335 self ._feature_flag_queue = feature_flag_queue
300336 self ._handler = synchronize_feature_flag
@@ -303,7 +339,8 @@ def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_q
303339 self ._feature_flag_storage = feature_flag_storage
304340 self ._segment_storage = segment_storage
305341 self ._telemetry_runtime_producer = telemetry_runtime_producer
306-
342+ self ._rule_based_segment_storage = rule_based_segment_storage
343+
307344 def is_running (self ):
308345 """Return whether the working is running."""
309346 return self ._running
@@ -312,23 +349,39 @@ async def _apply_iff_if_needed(self, event):
312349 if not await self ._check_instant_ff_update (event ):
313350 return False
314351 try :
315- new_feature_flag = from_raw (json .loads (self ._get_feature_flag_definition (event )))
316- segment_list = await update_feature_flag_storage_async (self ._feature_flag_storage , [new_feature_flag ], event .change_number )
317- for segment_name in segment_list :
318- if await self ._segment_storage .get (segment_name ) is None :
319- _LOGGER .debug ('Fetching new segment %s' , segment_name )
320- await self ._segment_handler (segment_name , event .change_number )
321-
322- await self ._telemetry_runtime_producer .record_update_from_sse (UpdateFromSSE .SPLIT_UPDATE )
352+ if event .update_type == UpdateType .SPLIT_UPDATE :
353+ new_feature_flag = from_raw (json .loads (self ._get_object_definition (event )))
354+ segment_list = await update_feature_flag_storage_async (self ._feature_flag_storage , [new_feature_flag ], event .change_number )
355+ for segment_name in segment_list :
356+ if await self ._segment_storage .get (segment_name ) is None :
357+ _LOGGER .debug ('Fetching new segment %s' , segment_name )
358+ await self ._segment_handler (segment_name , event .change_number )
359+
360+ referenced_rbs = self ._get_referenced_rbs (new_feature_flag )
361+ if len (referenced_rbs ) > 0 and not await self ._rule_based_segment_storage .contains (referenced_rbs ):
362+ await self ._handler (None , event .change_number )
363+
364+ await self ._telemetry_runtime_producer .record_update_from_sse (UpdateFromSSE .SPLIT_UPDATE )
365+ else :
366+ new_rbs = rbs_from_raw (json .loads (self ._get_object_definition (event )))
367+ segment_list = await update_rule_based_segment_storage_async (self ._rule_based_segment_storage , [new_rbs ], event .change_number )
368+ for segment_name in segment_list :
369+ if await self ._segment_storage .get (segment_name ) is None :
370+ _LOGGER .debug ('Fetching new segment %s' , segment_name )
371+ await self ._segment_handler (segment_name , event .change_number )
372+ await self ._telemetry_runtime_producer .record_update_from_sse (UpdateFromSSE .RBS_UPDATE )
323373 return True
324374
325375 except Exception as e :
326376 raise SplitStorageException (e )
327377
328-
329378 async def _check_instant_ff_update (self , event ):
330379 if event .update_type == UpdateType .SPLIT_UPDATE and event .compression is not None and event .previous_change_number == await self ._feature_flag_storage .get_change_number ():
331380 return True
381+
382+ if event .update_type == UpdateType .RB_SEGMENT_UPDATE and event .compression is not None and event .previous_change_number == await self ._rule_based_segment_storage .get_change_number ():
383+ return True
384+
332385 return False
333386
334387 async def _run (self ):
@@ -343,7 +396,13 @@ async def _run(self):
343396 try :
344397 if await self ._apply_iff_if_needed (event ):
345398 continue
346- await self ._handler (event .change_number )
399+ till = None
400+ rbs_till = None
401+ if event .update_type == UpdateType .SPLIT_UPDATE :
402+ till = event .change_number
403+ else :
404+ rbs_till = event .change_number
405+ await self ._handler (till , rbs_till )
347406 except SplitStorageException as e : # pylint: disable=broad-except
348407 _LOGGER .error ('Exception Updating Feature Flag' )
349408 _LOGGER .debug ('Exception information: ' , exc_info = True )
0 commit comments