|
| 1 | +from collections import Counter, defaultdict |
| 2 | +import logging |
| 3 | +import re |
| 4 | +import typing |
| 5 | + |
| 6 | + |
| 7 | +logger = logging.getLogger(__name__) |
| 8 | +RECOGNIZED_FEATURES = ('encrypted', 'replicated') |
| 9 | +ERRONEOUS = object() # use this as key in volume_type_lookup for erroneous volume types |
| 10 | + |
| 11 | + |
| 12 | +def _extract_feature_list(description, pattern=re.compile(r"\[scs:([^\[\]]*)\]")): |
| 13 | + """Extract feature-list-like prefix |
| 14 | +
|
| 15 | + If given `description` starts with a feature-list-like prefix, return list of features, |
| 16 | + otherwise None. To be more precise, we look for a string of this form: |
| 17 | +
|
| 18 | + `[scs:`feat1`, `...`, `...featN`]` |
| 19 | +
|
| 20 | + where N >= 1 and featJ is a string that doesn't contain any comma or brackets. We return |
| 21 | + the list [feat1, ..., featN] of substrings. |
| 22 | + """ |
| 23 | + if not description: |
| 24 | + # The description can be None or empty - we need to catch this here |
| 25 | + return |
| 26 | + match = pattern.match(description) |
| 27 | + if not match: |
| 28 | + return |
| 29 | + fs = match.group(1) |
| 30 | + if not fs: |
| 31 | + return [] |
| 32 | + return [f.strip() for f in fs.split(",")] |
| 33 | + |
| 34 | + |
| 35 | +def _test_feature_list(type_name: str, fl: typing.List[str], recognized=RECOGNIZED_FEATURES) -> typing.List[str]: |
| 36 | + """Test given list of features and return list of errors""" |
| 37 | + if not fl: |
| 38 | + # either None (no feature list) or empty feature list: nothing to check |
| 39 | + return |
| 40 | + errors = [] |
| 41 | + if fl != sorted(fl): |
| 42 | + errors.append(f"{type_name}: feature list not sorted") |
| 43 | + ctr = Counter(fl) |
| 44 | + duplicates = [key for key, c in ctr.items() if c > 1] |
| 45 | + if duplicates: |
| 46 | + errors.append(f"{type_name}: duplicate features: {', '.join(duplicates)}") |
| 47 | + unrecognized = [f for f in ctr if f not in recognized] |
| 48 | + if unrecognized: |
| 49 | + errors.append(f"{type_name}: unrecognized features: {', '.join(unrecognized)}") |
| 50 | + return errors |
| 51 | + |
| 52 | + |
| 53 | +def compute_volume_type_lookup(volume_types): |
| 54 | + # collect volume types according to features |
| 55 | + by_feature = defaultdict(list) |
| 56 | + for typ in volume_types: |
| 57 | + fl = _extract_feature_list(typ.description) |
| 58 | + if fl is None: |
| 59 | + continue |
| 60 | + logger.debug(f"{typ.name}: feature list {fl!r}") |
| 61 | + errors = _test_feature_list(typ.name, fl) |
| 62 | + if errors: |
| 63 | + by_feature[ERRONEOUS].extend(errors) |
| 64 | + for feat in fl: |
| 65 | + by_feature[feat].append(typ.name) |
| 66 | + return by_feature |
| 67 | + |
| 68 | + |
| 69 | +def compute_scs_0114_syntax_check(volume_type_lookup): |
| 70 | + errors = volume_type_lookup.get(ERRONEOUS, ()) |
| 71 | + for line in errors: |
| 72 | + logger.error(line) |
| 73 | + return not errors |
| 74 | + |
| 75 | + |
| 76 | +def compute_scs_0114_aspect_type(volume_type_lookup, aspect): |
| 77 | + applicable = volume_type_lookup[aspect] |
| 78 | + if not applicable: |
| 79 | + logger.error(f"no volume type having aspect {aspect}") |
| 80 | + return bool(applicable) |
0 commit comments