Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions cassis/cas.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
TYPE_NAME_FS_LIST,
TYPE_NAME_SOFA,
FeatureStructure,
Annotation,
Type,
TypeCheckError,
TypeSystem,
TypeSystemMode,
is_annotation,
)

_validator_optional_string = validators.optional(validators.instance_of(str))
Expand Down Expand Up @@ -171,13 +173,14 @@ def type_index(self) -> Dict[str, SortedKeyList]:
return self._indices

def add_annotation_to_index(self, annotation: FeatureStructure) -> None:
    """Adds a feature structure to the type index for this view.

    Args:
        annotation: The feature structure to index. It is filed under the
            name of its type (`annotation.type.name`).
    """
    # `_indices` maps a type name to the collection holding that type's instances.
    self._indices[annotation.type.name].add(annotation)

def get_all_annotations(self) -> List[FeatureStructure]:
"""Gets all the annotations in this view.
"""Gets all the FeatureStructure in this view.

Returns:
A list of all annotations in this view.
A list of all FeatureStructure in this view.

"""
result = []
Expand Down Expand Up @@ -334,6 +337,8 @@ def add(self, annotation: FeatureStructure, keep_id: Optional[bool] = True):
if hasattr(annotation, "sofa"):
annotation.sofa = self.get_sofa()

# Add to the index. The view index accepts any FeatureStructure;
# `_sort_func` will duck-type annotation-like objects when sorting.
self._current_view.add_annotation_to_index(annotation)

@deprecation.deprecated(details="Use add()")
Expand Down Expand Up @@ -387,7 +392,7 @@ def remove_annotation(self, annotation: FeatureStructure):
self.remove(annotation)

@deprecation.deprecated(details="Use annotation.get_covered_text()")
def get_covered_text(self, annotation: FeatureStructure) -> str:
def get_covered_text(self, annotation: Annotation) -> str:
"""Gets the text that is covered by `annotation`.

Args:
Expand All @@ -413,7 +418,7 @@ def select(self, type_: Union[Type, str]) -> List[FeatureStructure]:
t = type_ if isinstance(type_, Type) else self.typesystem.get_type(type_)
return self._get_feature_structures(t)

def select_covered(self, type_: Union[Type, str], covering_annotation: FeatureStructure) -> List[FeatureStructure]:
def select_covered(self, type_: Union[Type, str], covering_annotation: Annotation) -> List[Annotation]:
"""Returns a list of covered annotations.

Return all annotations that are covered
Expand All @@ -439,7 +444,7 @@ def select_covered(self, type_: Union[Type, str], covering_annotation: FeatureSt
result.append(annotation)
return result

def select_covering(self, type_: Union[Type, str], covered_annotation: FeatureStructure) -> List[FeatureStructure]:
def select_covering(self, type_: Union[Type, str], covered_annotation: Annotation) -> List[FeatureStructure]:
"""Returns a list of annotations that cover the given annotation.

Return all annotations that cover the given annotation. This can potentially be slow.
Expand All @@ -465,7 +470,7 @@ def select_covering(self, type_: Union[Type, str], covered_annotation: FeatureSt
if c_begin >= annotation.begin and c_end <= annotation.end:
yield annotation

def select_all(self) -> List[FeatureStructure]:
def select_all(self) -> List[Annotation]:
"""Finds all feature structures in this Cas

Returns:
Expand Down Expand Up @@ -834,8 +839,8 @@ def _copy(self) -> "Cas":


def _sort_func(a: FeatureStructure) -> Tuple[int, int, int]:
    """Compute the sort key for a feature structure.

    Args:
        a: The feature structure to compute the key for.

    Returns:
        A `(begin, end, tie_breaker)` tuple. Annotation-like feature
        structures use their own offsets; everything else uses
        `sys.maxsize` for both offsets so that it sorts after annotations.
        The tie breaker is the `xmiID` when one is set and the object
        identity (`id(a)`) otherwise.
    """
    # Resolve the tie breaker once; it is the same for both kinds of feature structure.
    xmi_id = getattr(a, "xmiID", None)
    tie_breaker = xmi_id if xmi_id is not None else id(a)

    if is_annotation(a):
        return a.begin, a.end, tie_breaker

    # Non-annotation feature structures are sorted after annotations using large sentinels
    return sys.maxsize, sys.maxsize, tie_breaker
54 changes: 50 additions & 4 deletions cassis/typesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,23 @@ def __repr__(self):
return str(self)


# NOTE(review): `eq=True`/`order=True` ask attrs to generate comparison methods
# for this class while `hash=False` asks it not to generate `__hash__` -
# confirm that instances stay hashable via the base class as intended.
@attr.s(slots=True, hash=False, eq=True, order=True, repr=False)
class Annotation(FeatureStructure):
    """Concrete base class for annotation instances.

    Generated types that represent (subtypes of) `uima.tcas.Annotation` will
    inherit from this class so that static typing can rely on a nominal base
    providing `begin` and `end`.

    Both offsets default to 0 when they are not passed to the constructor.
    """

    # Start offset of the annotation; 0 unless given.
    # NOTE(review): presumably counted in Unicode code points on the Python side - confirm.
    begin: int = attr.ib(default=0)
    # End offset of the annotation; 0 unless given.
    end: int = attr.ib(default=0)


def is_annotation(fs: FeatureStructure) -> bool:
    """Duck-type check for annotation-like feature structures.

    Args:
        fs: The object to inspect.

    Returns:
        True when `fs` exposes both a `begin` and an `end` attribute and
        both hold `int` values, False otherwise.
    """
    begin = getattr(fs, "begin", None)
    if not isinstance(begin, int):
        # Missing or non-integer begin: no need to look at `end` at all.
        return False
    return isinstance(getattr(fs, "end", None), int)


@attr.s(slots=True, eq=False, order=False, repr=False)
class Feature:
"""A feature defines one attribute of a feature structure"""
Expand Down Expand Up @@ -572,15 +589,44 @@ class Type:
def __attrs_post_init__(self):
"""Build the constructor that can create feature structures of this type"""
name = _string_to_valid_classname(self.name)
fields = {feature.name: attr.ib(default=None, repr=(feature.name != "sofa")) for feature in self.all_features}

# Determine whether this type is (transitively) a subtype of uima.tcas.Annotation
def _is_annotation_type(t: "Type") -> bool:
cur = t
while cur is not None:
if cur.name == TYPE_NAME_ANNOTATION:
return True
cur = cur.supertype
return False

# When inheriting from our concrete Annotation base, do not redeclare
# the 'begin' and 'end' features as fields; they are already present.
fields = {}
for feature in self.all_features:
if feature.name in {"begin", "end"} and _is_annotation_type(self):
# skip - Annotation base provides these
continue
fields[feature.name] = attr.ib(default=None, repr=(feature.name != "sofa"))
fields["type"] = attr.ib(default=self)

# We assign this to a lambda to make it lazy
# When creating large type systems, almost no types are used so
# creating them on the fly is on average better
self._constructor_fn = lambda: attr.make_class(
name, fields, bases=(FeatureStructure,), slots=True, eq=False, order=False
)
bases = (Annotation,) if _is_annotation_type(self) else (FeatureStructure,)

def _make_fs_class():
cls = attr.make_class(name, fields, bases=bases, slots=True, eq=False, order=False)
# Ensure generated FS classes are hashable. When a class defines an
# __eq__ (inherited or generated) but no __hash__, Python makes
# instances unhashable. We want FeatureStructure-based instances to
# be usable as dict/set keys (they are keyed by xmiID), so assign the
# base FeatureStructure.__hash__ implementation to the generated
# class if it doesn't already provide one.
if getattr(cls, "__hash__", None) is None:
cls.__hash__ = FeatureStructure.__hash__
return cls

self._constructor_fn = _make_fs_class

def __call__(self, **kwargs) -> FeatureStructure:
"""Creates a feature structure of this type
Expand Down
113 changes: 50 additions & 63 deletions cassis/util.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import csv
from collections import defaultdict
from functools import cmp_to_key
from io import IOBase, StringIO
from typing import Dict, Iterable, Set
from typing import Any, Dict, Iterable, Set

from cassis import Cas
from cassis.typesystem import FEATURE_BASE_NAME_SOFA, TYPE_NAME_ANNOTATION, FeatureStructure, Type, is_array
from cassis.typesystem import (
FEATURE_BASE_NAME_SOFA,
TYPE_NAME_ANNOTATION,
FeatureStructure,
Type,
is_annotation,
is_array,
)

_EXCLUDED_FEATURES = {FEATURE_BASE_NAME_SOFA}
_NULL_VALUE = "<NULL>"
Expand Down Expand Up @@ -74,7 +80,7 @@ def _render_feature_structure(
) -> []:
row_data = [fs_id_to_anchor.get(fs.xmiID)]

if max_covered_text > 0 and _is_annotation_fs(fs):
if max_covered_text > 0 and is_annotation(fs):
covered_text = fs.get_covered_text()
if covered_text and len(covered_text) >= max_covered_text:
prefix = covered_text[0 : (max_covered_text // 2)]
Expand Down Expand Up @@ -143,7 +149,19 @@ def _generate_anchors(
for t in types_sorted:
type_ = cas.typesystem.get_type(t)
feature_structures = all_feature_structures_by_type[type_.name]
feature_structures.sort(key=cmp_to_key(lambda a, b: _compare_fs(type_, a, b)))
# Sort deterministically using a stable key function. We avoid using
# the comparator-based approach to prevent unpredictable comparisons
# between mixed types during lexicographic tuple comparisons.
feature_structures.sort(
key=lambda fs: (
0,
fs.begin,
fs.end,
str(_feature_structure_hash(type_, fs)),
)
if is_annotation(fs)
else (1, None, None, str(_feature_structure_hash(type_, fs)))
)

for fs in feature_structures:
add_index_mark = mark_indexed and fs in indexed_feature_structures
Expand All @@ -159,7 +177,7 @@ def _generate_anchors(
def _generate_anchor(fs: FeatureStructure, add_index_mark: bool) -> str:
anchor = fs.type.name.rsplit(".", 2)[-1] # Get the short type name (no package)

if _is_annotation_fs(fs):
if is_annotation(fs):
anchor += f"[{fs.begin}-{fs.end}]"

if add_index_mark:
Expand All @@ -171,7 +189,7 @@ def _generate_anchor(fs: FeatureStructure, add_index_mark: bool) -> str:
return anchor


def _is_primitive_value(value: any) -> bool:
def _is_primitive_value(value: Any) -> bool:
return type(value) in (int, float, bool, str)


Expand All @@ -182,65 +200,34 @@ def _is_array_fs(fs: FeatureStructure) -> bool:
return is_array(fs.type)


def _is_annotation_fs(fs: FeatureStructure) -> bool:
    """Duck-type check: does `fs` carry integer `begin` and `end` offsets?

    Returns:
        True only if both attributes exist on `fs` and both are `int`s.
    """
    start = getattr(fs, "begin", None)
    if not isinstance(start, int):
        return False
    stop = getattr(fs, "end", None)
    return isinstance(stop, int)


def _compare_fs(type_: Type, a: FeatureStructure, b: FeatureStructure) -> int:
    """Three-way comparison of two feature structures for sorting.

    Ordering rules, applied in this sequence:

    1. Identical objects compare equal.
    2. Annotation-like feature structures sort before all others.
    3. Two annotations are ordered by ascending `begin`, then by descending
       `end`.
    4. Remaining ties are broken by the content hash computed for `type_`.

    Args:
        type_: The type whose features are used for the content hash.
        a: Left-hand feature structure.
        b: Right-hand feature structure.

    Returns:
        A negative number if `a` sorts before `b`, a positive number if it
        sorts after `b`, and 0 if both are considered equal.
    """
    if a is b:
        return 0

    # Duck-typing check whether something is an annotation - if yes, try sorting by offsets
    fs_a_is_annotation = _is_annotation_fs(a)
    fs_b_is_annotation = _is_annotation_fs(b)
    if fs_a_is_annotation != fs_b_is_annotation:
        # The result must depend on which side is the annotation; returning
        # the same value for (a, b) and (b, a) would break the comparator
        # contract and make the sort order input-dependent.
        return -1 if fs_a_is_annotation else 1

    if fs_a_is_annotation:
        begin_cmp = a.begin - b.begin
        if begin_cmp != 0:
            return begin_cmp

        # Reversed operands: for the same begin, the larger end sorts first.
        end_cmp = b.end - a.end
        if end_cmp != 0:
            return end_cmp

    # Doing arithmetics on the hash value as we have done with the offsets does not work because the hashes do not
    # provide a global order. Hence, we map all results to 0, -1 and 1 here.
    fs_hash_a = _feature_structure_hash(type_, a)
    fs_hash_b = _feature_structure_hash(type_, b)
    if fs_hash_a == fs_hash_b:
        return 0
    return -1 if fs_hash_a < fs_hash_b else 1


def _feature_structure_hash(type_: Type, fs: FeatureStructure):
hash_ = 0
# For backward compatibility keep a function that returns a stable string
# representation of the FS contents. This is used as a deterministic
# tie-breaker when sorting. We avoid returning complex nested tuples to
# keep comparisons simple and stable across original and deserialized CASes.
def _render_val(v):
if v is None:
return "<NULL>"
if type(v) in (int, float, bool, str):
return str(v)
if _is_array_fs(v):
# Join element representations with '|'
return "[" + ",".join(_render_val(e) for e in (v.elements or [])) + "]"
# Feature structure reference
try:
if is_annotation(v):
return f"{v.type.name}@{v.begin}-{v.end}"
else:
return f"{v.type.name}"
except Exception:
return str(v)

if _is_array_fs(fs):
return len(fs.elements) if fs.elements else 0
return _render_val(fs.elements or [])

# Should be possible to get away with not sorting here assuming that all_features returns the features always in
# the same order
parts: list[str] = []
Copy link

Copilot AI Nov 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent type hint syntax. The file imports Dict and Any from typing (line 4) but uses the built-in list[str] syntax here instead of List[str] from typing. For consistency with the rest of the file and better compatibility, either add List to the imports and use List[str], or update all type hints to use PEP 585 built-in generics throughout the file.

Copilot uses AI. Check for mistakes.
for feature in type_.all_features:
if feature.name == FEATURE_BASE_NAME_SOFA:
continue

feature_value = getattr(fs, feature.name)

if _is_array_fs(feature_value):
if feature_value.elements is not None:
for element in feature_value.elements:
hash_ = _feature_value_hash(feature_value, hash_)
else:
hash_ = _feature_value_hash(feature_value, hash_)
return hash_


def _feature_value_hash(feature_value: any, hash_: int):
# Note we do not recurse further into arrays here because that could lead to endless loops!
if type(feature_value) in (int, float, bool, str):
return hash_ + hash(feature_value)
else:
# If we get here, it is a feature structure reference... we cannot really recursively
# go into it to calculate a recursive hash... so we just check if the value is non-null
return hash_ * (-1 if feature_value is None else 1)
parts.append(_render_val(getattr(fs, feature.name)))
return "|".join(parts)
18 changes: 12 additions & 6 deletions cassis/xmi.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,13 +619,19 @@ def _serialize_feature_structure(self, cas: Cas, root: etree.Element, fs: Featur
continue

# Map back from offsets in Unicode codepoints to UIMA UTF-16 based offsets
if (
ts.is_instance_of(fs.type.name, TYPE_NAME_ANNOTATION)
and feature_name == FEATURE_BASE_NAME_BEGIN
or feature_name == FEATURE_BASE_NAME_END
# Ensure we only convert begin/end for annotation instances. Parentheses are
# required because `and` has higher precedence than `or` and we must not
# attempt conversion for the END feature on non-annotations.
if ts.is_instance_of(fs.type.name, TYPE_NAME_ANNOTATION) and (
feature_name == FEATURE_BASE_NAME_BEGIN or feature_name == FEATURE_BASE_NAME_END
):
sofa: Sofa = fs.sofa
value = sofa._offset_converter.python_to_external(value)
# Be defensive: only perform offset conversion if the sofa and its
# offset converter have been initialized. In some workflows (e.g. a
# freshly constructed CAS without sofa strings) the converter may
# not exist yet and conversion is not possible.
sofa = getattr(fs, "sofa", None)
if sofa is not None and getattr(sofa, "_offset_converter", None) is not None:
value = sofa._offset_converter.python_to_external(value)

if ts.is_instance_of(feature.rangeType, TYPE_NAME_STRING_ARRAY) and not feature.multipleReferencesAllowed:
if value.elements is not None: # Compare to none as not to skip if elements is empty!
Expand Down
33 changes: 33 additions & 0 deletions tests/test_cas.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,10 +597,43 @@ def test_covered_text_on_non_annotation():
top.get_covered_text()


def test_add_non_annotation_and_select():
    """Create a non-annotation type, add an instance and verify select returns it."""
    cas = Cas()

    # Create a type that does not define annotation offsets (begin/end).
    # The supertype has to be given explicitly: without it the new type would
    # be derived from `uima.tcas.Annotation` and inherit begin/end after all.
    NonAnnotation = cas.typesystem.create_type("test.NonAnnotation", supertypeName="uima.cas.TOP")

    # Instantiate and add to CAS
    fs = NonAnnotation()
    cas.add(fs)

    # Should be retrievable by select using the type name
    selected = list(cas.select("test.NonAnnotation"))
    assert selected == [fs]

    # And visible via select_all
    assert fs in cas.select_all()


def test_covered_text_on_annotation_without_sofa():
    """Asking a freshly created annotation for its covered text must raise AnnotationHasNoSofa."""
    cas = Cas()
    annotation_type = cas.typesystem.get_type(TYPE_NAME_ANNOTATION)
    detached = annotation_type()

    with pytest.raises(AnnotationHasNoSofa):
        detached.get_covered_text()


def test_runtime_generated_annotation_is_detected_and_shown_in_anchor():
    """A subtype of uima.tcas.Annotation created at runtime is rendered with its offsets."""
    typesystem = TypeSystem()
    # New annotation subtype, declared explicitly below uima.tcas.Annotation
    custom_type = typesystem.create_type("my.pkg.MyAnnotation", supertypeName="uima.tcas.Annotation")

    cas = Cas(typesystem)
    # begin/end must be accepted by the generated constructor
    instance = custom_type(begin=5, end=10)
    cas.add(instance)

    rendered = cas_to_comparable_text(cas)
    assert "MyAnnotation[5-10]" in rendered
Loading