diff --git a/cassis/cas.py b/cassis/cas.py index 6ff7925..d64bf66 100644 --- a/cassis/cas.py +++ b/cassis/cas.py @@ -19,10 +19,12 @@ TYPE_NAME_FS_LIST, TYPE_NAME_SOFA, FeatureStructure, + Annotation, Type, TypeCheckError, TypeSystem, TypeSystemMode, + is_annotation, ) _validator_optional_string = validators.optional(validators.instance_of(str)) @@ -171,13 +173,14 @@ def type_index(self) -> Dict[str, SortedKeyList]: return self._indices def add_annotation_to_index(self, annotation: FeatureStructure): + """Adds a feature structure to the type index for this view.""" self._indices[annotation.type.name].add(annotation) def get_all_annotations(self) -> List[FeatureStructure]: - """Gets all the annotations in this view. + """Gets all the FeatureStructure in this view. Returns: - A list of all annotations in this view. + A list of all FeatureStructure in this view. """ result = [] @@ -334,6 +337,8 @@ def add(self, annotation: FeatureStructure, keep_id: Optional[bool] = True): if hasattr(annotation, "sofa"): annotation.sofa = self.get_sofa() + # Add to the index. The view index accepts any FeatureStructure; + # `_sort_func` will duck-type annotation-like objects when sorting. self._current_view.add_annotation_to_index(annotation) @deprecation.deprecated(details="Use add()") @@ -387,7 +392,7 @@ def remove_annotation(self, annotation: FeatureStructure): self.remove(annotation) @deprecation.deprecated(details="Use annotation.get_covered_text()") - def get_covered_text(self, annotation: FeatureStructure) -> str: + def get_covered_text(self, annotation: Annotation) -> str: """Gets the text that is covered by `annotation`. 
Args: @@ -413,7 +418,7 @@ def select(self, type_: Union[Type, str]) -> List[FeatureStructure]: t = type_ if isinstance(type_, Type) else self.typesystem.get_type(type_) return self._get_feature_structures(t) - def select_covered(self, type_: Union[Type, str], covering_annotation: FeatureStructure) -> List[FeatureStructure]: + def select_covered(self, type_: Union[Type, str], covering_annotation: Annotation) -> List[Annotation]: """Returns a list of covered annotations. Return all annotations that are covered @@ -439,7 +444,7 @@ def select_covered(self, type_: Union[Type, str], covering_annotation: FeatureSt result.append(annotation) return result - def select_covering(self, type_: Union[Type, str], covered_annotation: FeatureStructure) -> List[FeatureStructure]: + def select_covering(self, type_: Union[Type, str], covered_annotation: Annotation) -> List[FeatureStructure]: """Returns a list of annotations that cover the given annotation. Return all annotations that are covering. This can be potentially be slow. 
@@ -465,7 +470,7 @@ def select_covering(self, type_: Union[Type, str], covered_annotation: FeatureSt if c_begin >= annotation.begin and c_end <= annotation.end: yield annotation - def select_all(self) -> List[FeatureStructure]: + def select_all(self) -> List[Annotation]: """Finds all feature structures in this Cas Returns: @@ -834,8 +839,8 @@ def _copy(self) -> "Cas": def _sort_func(a: FeatureStructure) -> Tuple[int, int, int]: - d = a.__slots__ - if "begin" in d and "end" in d: - return a.begin, a.end, id(a) - else: - return sys.maxsize, sys.maxsize, id(a) + if is_annotation(a): + return a.begin, a.end, a.xmiID if getattr(a, "xmiID", None) is not None else id(a) + + # Non-annotation feature structures are sorted after annotations using large sentinels + return sys.maxsize, sys.maxsize, a.xmiID if getattr(a, "xmiID", None) is not None else id(a) diff --git a/cassis/typesystem.py b/cassis/typesystem.py index 274b819..a2084ad 100644 --- a/cassis/typesystem.py +++ b/cassis/typesystem.py @@ -500,6 +500,23 @@ def __repr__(self): return str(self) +@attr.s(slots=True, hash=False, eq=True, order=True, repr=False) +class Annotation(FeatureStructure): + """Concrete base class for annotation instances. + + Generated types that represent (subtypes of) `uima.tcas.Annotation` will + inherit from this class so that static typing can rely on a nominal base + providing `begin` and `end`. 
+ """ + + begin: int = attr.ib(default=0) + end: int = attr.ib(default=0) + + +def is_annotation(fs: FeatureStructure) -> bool: + return hasattr(fs, "begin") and isinstance(fs.begin, int) and hasattr(fs, "end") and isinstance(fs.end, int) + + @attr.s(slots=True, eq=False, order=False, repr=False) class Feature: """A feature defines one attribute of a feature structure""" @@ -572,15 +589,44 @@ class Type: def __attrs_post_init__(self): """Build the constructor that can create feature structures of this type""" name = _string_to_valid_classname(self.name) - fields = {feature.name: attr.ib(default=None, repr=(feature.name != "sofa")) for feature in self.all_features} + + # Determine whether this type is (transitively) a subtype of uima.tcas.Annotation + def _is_annotation_type(t: "Type") -> bool: + cur = t + while cur is not None: + if cur.name == TYPE_NAME_ANNOTATION: + return True + cur = cur.supertype + return False + + # When inheriting from our concrete Annotation base, do not redeclare + # the 'begin' and 'end' features as fields; they are already present. + fields = {} + for feature in self.all_features: + if feature.name in {"begin", "end"} and _is_annotation_type(self): + # skip - Annotation base provides these + continue + fields[feature.name] = attr.ib(default=None, repr=(feature.name != "sofa")) fields["type"] = attr.ib(default=self) # We assign this to a lambda to make it lazy # When creating large type systems, almost no types are used so # creating them on the fly is on average better - self._constructor_fn = lambda: attr.make_class( - name, fields, bases=(FeatureStructure,), slots=True, eq=False, order=False - ) + bases = (Annotation,) if _is_annotation_type(self) else (FeatureStructure,) + + def _make_fs_class(): + cls = attr.make_class(name, fields, bases=bases, slots=True, eq=False, order=False) + # Ensure generated FS classes are hashable. 
When a class defines an + # __eq__ (inherited or generated) but no __hash__, Python makes + # instances unhashable. We want FeatureStructure-based instances to + # be usable as dict/set keys (they are keyed by xmiID), so assign the + # base FeatureStructure.__hash__ implementation to the generated + # class if it doesn't already provide one. + if getattr(cls, "__hash__", None) is None: + cls.__hash__ = FeatureStructure.__hash__ + return cls + + self._constructor_fn = _make_fs_class def __call__(self, **kwargs) -> FeatureStructure: """Creates an feature structure of this type diff --git a/cassis/util.py b/cassis/util.py index e199edc..41f389c 100644 --- a/cassis/util.py +++ b/cassis/util.py @@ -1,11 +1,17 @@ import csv from collections import defaultdict -from functools import cmp_to_key from io import IOBase, StringIO -from typing import Dict, Iterable, Set +from typing import Any, Dict, Iterable, Set from cassis import Cas -from cassis.typesystem import FEATURE_BASE_NAME_SOFA, TYPE_NAME_ANNOTATION, FeatureStructure, Type, is_array +from cassis.typesystem import ( + FEATURE_BASE_NAME_SOFA, + TYPE_NAME_ANNOTATION, + FeatureStructure, + Type, + is_annotation, + is_array, +) _EXCLUDED_FEATURES = {FEATURE_BASE_NAME_SOFA} _NULL_VALUE = "" @@ -74,7 +80,7 @@ def _render_feature_structure( ) -> []: row_data = [fs_id_to_anchor.get(fs.xmiID)] - if max_covered_text > 0 and _is_annotation_fs(fs): + if max_covered_text > 0 and is_annotation(fs): covered_text = fs.get_covered_text() if covered_text and len(covered_text) >= max_covered_text: prefix = covered_text[0 : (max_covered_text // 2)] @@ -143,7 +149,19 @@ def _generate_anchors( for t in types_sorted: type_ = cas.typesystem.get_type(t) feature_structures = all_feature_structures_by_type[type_.name] - feature_structures.sort(key=cmp_to_key(lambda a, b: _compare_fs(type_, a, b))) + # Sort deterministically using a stable key function. 
We avoid using + # the comparator-based approach to prevent unpredictable comparisons + # between mixed types during lexicographic tuple comparisons. + feature_structures.sort( + key=lambda fs: ( + 0, + fs.begin, + fs.end, + str(_feature_structure_hash(type_, fs)), + ) + if is_annotation(fs) + else (1, None, None, str(_feature_structure_hash(type_, fs))) + ) for fs in feature_structures: add_index_mark = mark_indexed and fs in indexed_feature_structures @@ -159,7 +177,7 @@ def _generate_anchors( def _generate_anchor(fs: FeatureStructure, add_index_mark: bool) -> str: anchor = fs.type.name.rsplit(".", 2)[-1] # Get the short type name (no package) - if _is_annotation_fs(fs): + if is_annotation(fs): anchor += f"[{fs.begin}-{fs.end}]" if add_index_mark: @@ -171,7 +189,7 @@ def _generate_anchor(fs: FeatureStructure, add_index_mark: bool) -> str: return anchor -def _is_primitive_value(value: any) -> bool: +def _is_primitive_value(value: Any) -> bool: return type(value) in (int, float, bool, str) @@ -182,65 +200,34 @@ def _is_array_fs(fs: FeatureStructure) -> bool: return is_array(fs.type) -def _is_annotation_fs(fs: FeatureStructure) -> bool: - return hasattr(fs, "begin") and isinstance(fs.begin, int) and hasattr(fs, "end") and isinstance(fs.end, int) - - -def _compare_fs(type_: Type, a: FeatureStructure, b: FeatureStructure) -> int: - if a is b: - return 0 - - # duck-typing check if something is a annotation - if yes, try sorting by offets - fs_a_is_annotation = _is_annotation_fs(a) - fs_b_is_annotation = _is_annotation_fs(b) - if fs_a_is_annotation != fs_b_is_annotation: - return -1 - if fs_a_is_annotation and fs_b_is_annotation: - begin_cmp = a.begin - b.begin - if begin_cmp != 0: - return begin_cmp - - begin_cmp = b.end - a.end - if begin_cmp != 0: - return begin_cmp - - # Alternative implementation - # Doing arithmetics on the hash value as we have done with the offsets does not work because the hashes do not - # provide a global order. 
Hence, we map all results to 0, -1 and 1 here. - fs_hash_a = _feature_structure_hash(type_, a) - fs_hash_b = _feature_structure_hash(type_, b) - if fs_hash_a == fs_hash_b: - return 0 - return -1 if fs_hash_a < fs_hash_b else 1 - - def _feature_structure_hash(type_: Type, fs: FeatureStructure): - hash_ = 0 + # For backward compatibility keep a function that returns a stable string + # representation of the FS contents. This is used as a deterministic + # tie-breaker when sorting. We avoid returning complex nested tuples to + # keep comparisons simple and stable across original and deserialized CASes. + def _render_val(v): + if v is None: + return "" + if type(v) in (int, float, bool, str): + return str(v) + if _is_array_fs(v): + # Join element representations with ',' + return "[" + ",".join(_render_val(e) for e in (v.elements or [])) + "]" + # Feature structure reference + try: + if is_annotation(v): + return f"{v.type.name}@{v.begin}-{v.end}" + else: + return f"{v.type.name}" + except Exception: + return str(v) + if _is_array_fs(fs): - return len(fs.elements) if fs.elements else 0 + return _render_val(fs.elements or []) - # Should be possible to get away with not sorting here assuming that all_features returns the features always in - # the same order + parts: list[str] = [] for feature in type_.all_features: if feature.name == FEATURE_BASE_NAME_SOFA: continue - - feature_value = getattr(fs, feature.name) - - if _is_array_fs(feature_value): - if feature_value.elements is not None: - for element in feature_value.elements: - hash_ = _feature_value_hash(feature_value, hash_) - else: - hash_ = _feature_value_hash(feature_value, hash_) - return hash_ - - -def _feature_value_hash(feature_value: any, hash_: int): - # Note we do not recurse further into arrays here because that could lead to endless loops! - if type(feature_value) in (int, float, bool, str): - return hash_ + hash(feature_value) - else: - # If we get here, it is a feature structure reference... 
we cannot really recursively - # go into it to calculate a recursive hash... so we just check if the value is non-null - return hash_ * (-1 if feature_value is None else 1) + parts.append(_render_val(getattr(fs, feature.name))) + return "|".join(parts) diff --git a/cassis/xmi.py b/cassis/xmi.py index 2e1bfe2..1ab4533 100644 --- a/cassis/xmi.py +++ b/cassis/xmi.py @@ -619,13 +619,19 @@ def _serialize_feature_structure(self, cas: Cas, root: etree.Element, fs: Featur continue # Map back from offsets in Unicode codepoints to UIMA UTF-16 based offsets - if ( - ts.is_instance_of(fs.type.name, TYPE_NAME_ANNOTATION) - and feature_name == FEATURE_BASE_NAME_BEGIN - or feature_name == FEATURE_BASE_NAME_END + # Ensure we only convert begin/end for annotation instances. Parentheses are + # required because `and` has higher precedence than `or` and we must not + # attempt conversion for the END feature on non-annotations. + if ts.is_instance_of(fs.type.name, TYPE_NAME_ANNOTATION) and ( + feature_name == FEATURE_BASE_NAME_BEGIN or feature_name == FEATURE_BASE_NAME_END ): - sofa: Sofa = fs.sofa - value = sofa._offset_converter.python_to_external(value) + # Be defensive: only perform offset conversion if the sofa and its + # offset converter have been initialized. In some workflows (e.g. a + # freshly constructed CAS without sofa strings) the converter may + # not exist yet and conversion is not possible. + sofa = getattr(fs, "sofa", None) + if sofa is not None and getattr(sofa, "_offset_converter", None) is not None: + value = sofa._offset_converter.python_to_external(value) if ts.is_instance_of(feature.rangeType, TYPE_NAME_STRING_ARRAY) and not feature.multipleReferencesAllowed: if value.elements is not None: # Compare to none as not to skip if elements is empty! 
diff --git a/tests/test_cas.py b/tests/test_cas.py index 229bbe2..25bbfe4 100644 --- a/tests/test_cas.py +++ b/tests/test_cas.py @@ -597,6 +597,25 @@ def test_covered_text_on_non_annotation(): top.get_covered_text() +def test_add_non_annotation_and_select(): + """Create a non-annotation type, add an instance and verify select returns it.""" + cas = Cas() + + # Create a type that does not define annotation offsets (begin/end) + NonAnnotation = cas.typesystem.create_type("test.NonAnnotation") + + # Instantiate and add to CAS + fs = NonAnnotation() + cas.add(fs) + + # Should be retrievable by select using the type name + selected = list(cas.select("test.NonAnnotation")) + assert selected == [fs] + + # And visible via select_all + assert fs in cas.select_all() + + def test_covered_text_on_annotation_without_sofa(): cas = Cas() Annotation = cas.typesystem.get_type(TYPE_NAME_ANNOTATION) @@ -604,3 +623,17 @@ def test_covered_text_on_annotation_without_sofa(): with pytest.raises(AnnotationHasNoSofa): ann.get_covered_text() + + +def test_runtime_generated_annotation_is_detected_and_shown_in_anchor(): + ts = TypeSystem() + # Create a new annotation subtype (should inherit from Annotation base) + MyAnno = ts.create_type("my.pkg.MyAnnotation", supertypeName="uima.tcas.Annotation") + + cas = Cas(ts) + # Create an instance of the runtime-generated type; ensure we can set begin/end + a = MyAnno(begin=5, end=10) + cas.add(a) + + text = cas_to_comparable_text(cas) + assert "MyAnnotation[5-10]" in text diff --git a/tests/test_files/xmi/cas_with_collections.xmi b/tests/test_files/xmi/cas_with_collections.xmi index b3846f7..4e47463 100644 --- a/tests/test_files/xmi/cas_with_collections.xmi +++ b/tests/test_files/xmi/cas_with_collections.xmi @@ -20,7 +20,7 @@ C - + A B C @@ -48,9 +48,9 @@ - + - + diff --git a/tests/test_files/xmi/cas_with_multiple_references_allowed_string_array.xmi b/tests/test_files/xmi/cas_with_multiple_references_allowed_string_array.xmi index 41577b0..5a96065 
100644 --- a/tests/test_files/xmi/cas_with_multiple_references_allowed_string_array.xmi +++ b/tests/test_files/xmi/cas_with_multiple_references_allowed_string_array.xmi @@ -8,7 +8,7 @@ - + LNC diff --git a/tests/test_files/xmi/cas_with_reserved_names.xmi b/tests/test_files/xmi/cas_with_reserved_names.xmi index a089da0..2634639 100644 --- a/tests/test_files/xmi/cas_with_reserved_names.xmi +++ b/tests/test_files/xmi/cas_with_reserved_names.xmi @@ -3,7 +3,7 @@ xmlns:test="http:///test.ecore" xmi:version="2.0"> - +