Skip to content

Commit f3e9137

Browse files
committed
Update rb segment matcher
1 parent 3eff00c commit f3e9137

File tree

13 files changed

+326
-156
lines changed

13 files changed

+326
-156
lines changed

splitio/engine/evaluator.py

Lines changed: 51 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from splitio.optional.loaders import asyncio
1212

1313
CONTROL = 'control'
14-
EvaluationContext = namedtuple('EvaluationContext', ['flags', 'segment_memberships', 'segment_rbs_memberships', 'segment_rbs_conditions', 'excluded_rbs_segments'])
14+
EvaluationContext = namedtuple('EvaluationContext', ['flags', 'segment_memberships', 'rbs_segments'])
1515

1616
_LOGGER = logging.getLogger(__name__)
1717

@@ -115,59 +115,24 @@ def context_for(self, key, feature_names):
115115
:rtype: EvaluationContext
116116
"""
117117
pending = set(feature_names)
118+
pending_rbs = set()
118119
splits = {}
120+
rb_segments = {}
119121
pending_memberships = set()
120-
pending_rbs_memberships = set()
121-
while pending:
122+
while pending or pending_rbs:
122123
fetched = self._flag_storage.fetch_many(list(pending))
123-
features = filter_missing(fetched)
124-
splits.update(features)
125-
pending = set()
126-
for feature in features.values():
127-
cf, cs, crbs = get_dependencies(feature)
128-
for rbs in crbs:
129-
rbs_cf, rbs_cs, rbs_crbs = get_dependencies(self._rbs_segment_storage.get(rbs))
130-
cf.extend(rbs_cf)
131-
cs.extend(rbs_cs)
132-
crbs.extend(rbs_crbs)
133-
134-
pending.update(filter(lambda f: f not in splits, cf))
135-
pending_memberships.update(cs)
136-
pending_rbs_memberships.update(crbs)
137-
138-
rbs_segment_memberships = {}
139-
rbs_segment_conditions = {}
140-
excluded_rbs_segments = set()
141-
key_membership = False
142-
segment_memberhsip = False
143-
for rbs_segment in pending_rbs_memberships:
144-
rbs_segment_obj = self._rbs_segment_storage.get(rbs_segment)
145-
pending_memberships.update(rbs_segment_obj.get_condition_segment_names())
146-
147-
key_membership = key in rbs_segment_obj.excluded.get_excluded_keys()
148-
segment_memberhsip = False
149-
for excluded_segment in rbs_segment_obj.excluded.get_excluded_segments():
150-
if excluded_segment.type == SegmentType.STANDARD and self._segment_storage.segment_contains(excluded_segment.name, key):
151-
segment_memberhsip = True
152-
153-
if excluded_segment.type == SegmentType.RULE_BASED:
154-
rbs_segment = self._rbs_segment_storage.get(excluded_segment.name)
155-
if rbs_segment is not None:
156-
excluded_rbs_segments.add(rbs_segment)
157-
158-
rbs_segment_memberships.update({rbs_segment: segment_memberhsip or key_membership})
159-
if not (segment_memberhsip or key_membership):
160-
rbs_segment_conditions.update({rbs_segment: [condition for condition in rbs_segment_obj.conditions]})
161-
124+
fetched_rbs = self._rbs_segment_storage.fetch_many(list(pending_rbs))
125+
features, rbsegments, splits, rb_segments = update_objects(fetched, fetched_rbs, splits, rb_segments)
126+
pending, pending_memberships, pending_rbs = get_pending_objects(features, splits, rbsegments, rb_segments, pending_memberships)
127+
162128
return EvaluationContext(
163129
splits,
164130
{ segment: self._segment_storage.segment_contains(segment, key)
165131
for segment in pending_memberships
166132
},
167-
rbs_segment_memberships,
168-
rbs_segment_conditions,
169-
excluded_rbs_segments
133+
rb_segments
170134
)
135+
171136

172137
class AsyncEvaluationDataFactory:
173138

@@ -186,72 +151,36 @@ async def context_for(self, key, feature_names):
186151
:rtype: EvaluationContext
187152
"""
188153
pending = set(feature_names)
154+
pending_rbs = set()
189155
splits = {}
156+
rb_segments = {}
190157
pending_memberships = set()
191-
pending_rbs_memberships = set()
192-
while pending:
158+
while pending or pending_rbs:
193159
fetched = await self._flag_storage.fetch_many(list(pending))
194-
features = filter_missing(fetched)
195-
splits.update(features)
196-
pending = set()
197-
for feature in features.values():
198-
cf, cs, crbs = get_dependencies(feature)
199-
for rbs in crbs:
200-
rbs_cf, rbs_cs, rbs_crbs = get_dependencies(await self._rbs_segment_storage.get(rbs))
201-
cf.extend(rbs_cf)
202-
cs.extend(rbs_cs)
203-
crbs.extend(rbs_crbs)
204-
205-
pending.update(filter(lambda f: f not in splits, cf))
206-
pending_memberships.update(cs)
207-
pending_rbs_memberships.update(crbs)
208-
209-
rbs_segment_memberships = {}
210-
rbs_segment_conditions = {}
211-
excluded_rbs_segments = set()
212-
key_membership = False
213-
segment_memberhsip = False
214-
for rbs_segment in pending_rbs_memberships:
215-
rbs_segment_obj = await self._rbs_segment_storage.get(rbs_segment)
216-
pending_memberships.update(rbs_segment_obj.get_condition_segment_names())
217-
218-
key_membership = key in rbs_segment_obj.excluded.get_excluded_keys()
219-
segment_memberhsip = False
220-
for excluded_segment in rbs_segment_obj.excluded.get_excluded_segments():
221-
if excluded_segment.type == SegmentType.STANDARD and await self._segment_storage.segment_contains(excluded_segment.name, key):
222-
segment_memberhsip = True
223-
224-
if excluded_segment.type == SegmentType.RULE_BASED:
225-
rbs_segment = await self._rbs_segment_storage.get(excluded_segment.name)
226-
if rbs_segment is not None:
227-
excluded_rbs_segments.add(rbs_segment)
228-
229-
rbs_segment_memberships.update({rbs_segment: segment_memberhsip or key_membership})
230-
if not (segment_memberhsip or key_membership):
231-
rbs_segment_conditions.update({rbs_segment: [condition for condition in rbs_segment_obj.conditions]})
160+
fetched_rbs = await self._rbs_segment_storage.fetch_many(list(pending_rbs))
161+
features, rbsegments, splits, rb_segments = update_objects(fetched, fetched_rbs, splits, rb_segments)
162+
pending, pending_memberships, pending_rbs = get_pending_objects(features, splits, rbsegments, rb_segments, pending_memberships)
232163

233164
segment_names = list(pending_memberships)
234165
segment_memberships = await asyncio.gather(*[
235166
self._segment_storage.segment_contains(segment, key)
236167
for segment in segment_names
237168
])
169+
238170
return EvaluationContext(
239171
splits,
240172
dict(zip(segment_names, segment_memberships)),
241-
rbs_segment_memberships,
242-
rbs_segment_conditions,
243-
excluded_rbs_segments
173+
rb_segments
244174
)
245175

246-
247-
def get_dependencies(feature):
176+
def get_dependencies(object):
248177
"""
249178
:rtype: tuple(list, list)
250179
"""
251180
feature_names = []
252181
segment_names = []
253182
rbs_segment_names = []
254-
for condition in feature.conditions:
183+
for condition in object.conditions:
255184
for matcher in condition.matchers:
256185
if isinstance(matcher,RuleBasedSegmentMatcher):
257186
rbs_segment_names.append(matcher._rbs_segment_name)
@@ -264,3 +193,34 @@ def get_dependencies(feature):
264193

265194
def filter_missing(features):
266195
return {k: v for (k, v) in features.items() if v is not None}
196+
197+
def get_pending_objects(features, splits, rbsegments, rb_segments, pending_memberships):
198+
pending = set()
199+
pending_rbs = set()
200+
for feature in features.values():
201+
cf, cs, crbs = get_dependencies(feature)
202+
pending.update(filter(lambda f: f not in splits, cf))
203+
pending_memberships.update(cs)
204+
pending_rbs.update(filter(lambda f: f not in rb_segments, crbs))
205+
206+
for rb_segment in rbsegments.values():
207+
cf, cs, crbs = get_dependencies(rb_segment)
208+
pending.update(filter(lambda f: f not in splits, cf))
209+
pending_memberships.update(cs)
210+
for excluded_segment in rb_segment.excluded.get_excluded_segments():
211+
if excluded_segment.type == SegmentType.STANDARD:
212+
pending_memberships.add(excluded_segment.name)
213+
else:
214+
pending_rbs.update(filter(lambda f: f not in rb_segments, [excluded_segment.name]))
215+
pending_rbs.update(filter(lambda f: f not in rb_segments, crbs))
216+
217+
return pending, pending_memberships, pending_rbs
218+
219+
def update_objects(fetched, fetched_rbs, splits, rb_segments):
220+
features = filter_missing(fetched)
221+
rbsegments = filter_missing(fetched_rbs)
222+
splits.update(features)
223+
rb_segments.update(rbsegments)
224+
225+
return features, rbsegments, splits, rb_segments
226+

splitio/models/grammar/matchers/rule_based_segment.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Rule based segment matcher classes."""
22
from splitio.models.grammar.matchers.base import Matcher
3+
from splitio.models.rule_based_segments import SegmentType
34

45
class RuleBasedSegmentMatcher(Matcher):
56

@@ -29,15 +30,15 @@ def _match(self, key, attributes=None, context=None):
2930
if self._rbs_segment_name == None:
3031
return False
3132

32-
# Check if rbs segment has exclusions
33-
if context['ec'].segment_rbs_memberships.get(self._rbs_segment_name):
34-
return False
35-
36-
for rbs_segment in context['ec'].excluded_rbs_segments:
37-
if self._match_conditions(rbs_segment.conditions, key, attributes, context):
38-
return True
33+
rb_segment = context['ec'].rbs_segments.get(self._rbs_segment_name)
3934

40-
return self._match_conditions(context['ec'].segment_rbs_conditions.get(self._rbs_segment_name), key, attributes, context):
35+
if key in rb_segment.excluded.get_excluded_keys():
36+
return False
37+
38+
if self._match_dep_rb_segments(rb_segment.excluded.get_excluded_segments(), key, attributes, context):
39+
return False
40+
41+
return self._match_conditions(rb_segment.conditions, key, attributes, context)
4142

4243
def _add_matcher_specific_properties_to_json(self):
4344
"""Return UserDefinedSegment specific properties."""
@@ -51,3 +52,23 @@ def _match_conditions(self, rbs_segment_conditions, key, attributes, context):
5152
for parsed_condition in rbs_segment_conditions:
5253
if parsed_condition.matches(key, attributes, context):
5354
return True
55+
56+
return False
57+
58+
def _match_dep_rb_segments(self, excluded_rb_segments, key, attributes, context):
59+
for excluded_rb_segment in excluded_rb_segments:
60+
if excluded_rb_segment.type == SegmentType.STANDARD:
61+
if context['ec'].segment_memberships[excluded_rb_segment.name]:
62+
return True
63+
else:
64+
excluded_segment = context['ec'].rbs_segments.get(excluded_rb_segment.name)
65+
if key in excluded_segment.excluded.get_excluded_keys():
66+
return True
67+
68+
if self._match_dep_rb_segments(excluded_segment.excluded.get_excluded_segments(), key, attributes, context):
69+
return True
70+
71+
if self._match_conditions(excluded_segment.conditions, key, attributes, context):
72+
return True
73+
74+
return False

splitio/storage/inmemmory.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,9 @@ def contains(self, segment_names):
230230
"""
231231
with self._lock:
232232
return set(segment_names).issubset(self._rule_based_segments.keys())
233+
234+
def fetch_many(self, segment_names):
235+
return {rb_segment_name: self.get(rb_segment_name) for rb_segment_name in segment_names}
233236

234237
class InMemoryRuleBasedSegmentStorageAsync(RuleBasedSegmentsStorage):
235238
"""InMemory implementation of a feature flag storage base."""
@@ -354,6 +357,9 @@ async def contains(self, segment_names):
354357
async with self._lock:
355358
return set(segment_names).issubset(self._rule_based_segments.keys())
356359

360+
async def fetch_many(self, segment_names):
361+
return {rb_segment_name: await self.get(rb_segment_name) for rb_segment_name in segment_names}
362+
357363
class InMemorySplitStorageBase(SplitStorage):
358364
"""InMemory implementation of a feature flag storage base."""
359365

splitio/storage/pluggable.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,25 @@ def get_segment_names(self):
177177
_LOGGER.error('Error getting rule based segments names from storage')
178178
_LOGGER.debug('Error: ', exc_info=True)
179179
return None
180+
181+
def fetch_many(self, rb_segment_names):
182+
"""
183+
Retrieve rule based segments.
184+
185+
:param rb_segment_names: Names of the rule based segments to fetch.
186+
:type rb_segment_names: list(str)
187+
188+
:return: A dict with rule based segment objects parsed from queue.
189+
:rtype: dict(rb_segment_names, splitio.models.rile_based_segment.RuleBasedSegment)
190+
"""
191+
try:
192+
prefix_added = [self._prefix.format(segment_name=rb_segment_name) for rb_segment_name in rb_segment_names]
193+
return {rb_segment['name']: rule_based_segments.from_raw(rb_segment) for rb_segment in self._pluggable_adapter.get_many(prefix_added)}
194+
195+
except Exception:
196+
_LOGGER.error('Error getting rule based segments from storage')
197+
_LOGGER.debug('Error: ', exc_info=True)
198+
return None
180199

181200
class PluggableRuleBasedSegmentsStorageAsync(PluggableRuleBasedSegmentsStorageBase):
182201
"""Pluggable storage for rule based segments."""
@@ -256,6 +275,25 @@ async def get_segment_names(self):
256275
_LOGGER.debug('Error: ', exc_info=True)
257276
return None
258277

278+
async def fetch_many(self, rb_segment_names):
279+
"""
280+
Retrieve rule based segments.
281+
282+
:param rb_segment_names: Names of the rule based segments to fetch.
283+
:type rb_segment_names: list(str)
284+
285+
:return: A dict with rule based segment objects parsed from queue.
286+
:rtype: dict(rb_segment_names, splitio.models.rile_based_segment.RuleBasedSegment)
287+
"""
288+
try:
289+
prefix_added = [self._prefix.format(segment_name=rb_segment_name) for rb_segment_name in rb_segment_names]
290+
return {rb_segment['name']: rule_based_segments.from_raw(rb_segment) for rb_segment in await self._pluggable_adapter.get_many(prefix_added)}
291+
292+
except Exception:
293+
_LOGGER.error('Error getting rule based segments from storage')
294+
_LOGGER.debug('Error: ', exc_info=True)
295+
return None
296+
259297
class PluggableSplitStorageBase(SplitStorage):
260298
"""InMemory implementation of a feature flag storage."""
261299

splitio/storage/redis.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,35 @@ def get_large_segment_names(self):
131131
"""
132132
pass
133133

134+
def fetch_many(self, segment_names):
135+
"""
136+
Retrieve rule based segment.
137+
138+
:param segment_names: Names of the rule based segments to fetch.
139+
:type segment_names: list(str)
140+
141+
:return: A dict with rule based segment objects parsed from redis.
142+
:rtype: dict(segment_name, splitio.models.rule_based_segment.RuleBasedSegment)
143+
"""
144+
to_return = dict()
145+
try:
146+
keys = [self._get_key(segment_name) for segment_name in segment_names]
147+
raw_rbs_segments = self._redis.mget(keys)
148+
_LOGGER.debug("Fetchting rule based segment [%s] from redis" % segment_names)
149+
_LOGGER.debug(raw_rbs_segments)
150+
for i in range(len(raw_rbs_segments)):
151+
rbs_segment = None
152+
try:
153+
rbs_segment = rule_based_segments.from_raw(json.loads(raw_rbs_segments[i]))
154+
except (ValueError, TypeError):
155+
_LOGGER.error('Could not parse rule based segment.')
156+
_LOGGER.debug("Raw rule based segment that failed parsing attempt: %s", raw_rbs_segments[i])
157+
to_return[segment_names[i]] = rbs_segment
158+
except RedisAdapterException:
159+
_LOGGER.error('Error fetching rule based segments from storage')
160+
_LOGGER.debug('Error: ', exc_info=True)
161+
return to_return
162+
134163
class RedisRuleBasedSegmentsStorageAsync(RuleBasedSegmentsStorage):
135164
"""Redis-based storage for rule based segments."""
136165

@@ -246,6 +275,35 @@ async def get_large_segment_names(self):
246275
"""
247276
pass
248277

278+
async def fetch_many(self, segment_names):
279+
"""
280+
Retrieve rule based segment.
281+
282+
:param segment_names: Names of the rule based segments to fetch.
283+
:type segment_names: list(str)
284+
285+
:return: A dict with rule based segment objects parsed from redis.
286+
:rtype: dict(segment_name, splitio.models.rule_based_segment.RuleBasedSegment)
287+
"""
288+
to_return = dict()
289+
try:
290+
keys = [self._get_key(segment_name) for segment_name in segment_names]
291+
raw_rbs_segments = await self._redis.mget(keys)
292+
_LOGGER.debug("Fetchting rule based segment [%s] from redis" % segment_names)
293+
_LOGGER.debug(raw_rbs_segments)
294+
for i in range(len(raw_rbs_segments)):
295+
rbs_segment = None
296+
try:
297+
rbs_segment = rule_based_segments.from_raw(json.loads(raw_rbs_segments[i]))
298+
except (ValueError, TypeError):
299+
_LOGGER.error('Could not parse rule based segment.')
300+
_LOGGER.debug("Raw rule based segment that failed parsing attempt: %s", raw_rbs_segments[i])
301+
to_return[segment_names[i]] = rbs_segment
302+
except RedisAdapterException:
303+
_LOGGER.error('Error fetching rule based segments from storage')
304+
_LOGGER.debug('Error: ', exc_info=True)
305+
return to_return
306+
249307
class RedisSplitStorageBase(SplitStorage):
250308
"""Redis-based storage base for feature flags."""
251309

0 commit comments

Comments
 (0)