Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
315 changes: 312 additions & 3 deletions cassis/cas.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from cassis.typesystem import (
FEATURE_BASE_NAME_HEAD,
FEATURE_BASE_NAME_LANGUAGE,
TYPE_NAME_ARRAY_BASE,
TYPE_NAME_DOCUMENT_ANNOTATION,
TYPE_NAME_ANNOTATION,
TYPE_NAME_FS_ARRAY,
Expand All @@ -24,6 +25,7 @@
TypeCheckError,
TypeSystem,
TypeSystemMode,
load_typesystem,
)

_validator_optional_string = validators.optional(validators.instance_of(str))
Expand Down Expand Up @@ -903,9 +905,8 @@ def _find_all_fs(
elif feature.rangeType.name == TYPE_NAME_FS_LIST and hasattr(feature_value, FEATURE_BASE_NAME_HEAD):
v = feature_value
while hasattr(v, FEATURE_BASE_NAME_HEAD):
if not v.head or v.head.xmiID in all_fs:
continue
openlist.append(v.head)
if v.head and v.head.xmiID not in all_fs:
openlist.append(v.head)
v = v.tail
# For primitive arrays / lists, we do not need to handle the elements
continue
Expand Down Expand Up @@ -937,6 +938,314 @@ def _copy(self) -> "Cas":
result._xmi_id_generator = self._xmi_id_generator
return result

def deep_copy(self, copy_typesystem: bool = False) -> "Cas":
"""
Create and return a deep copy of this CAS object.
All feature structures, views, and sofas are copied. If `copy_typesystem` is True, the typesystem is also deep-copied;
otherwise, the original typesystem is shared between the original and the copy.
Args:
copy_typesystem (bool): Whether to copy the original typesystem or not. If True, the typesystem is deep-copied.
Returns:
Cas: A deep copy of this CAS object.
"""
ts = self.typesystem
if copy_typesystem:
ts = self.typesystem.to_xml()
ts = load_typesystem(ts)

# Determine document language only if a DocumentAnnotation already
# exists on the original CAS. Calling `self.document_language` would
# implicitly create a DocumentAnnotation via
# `get_document_annotation()`, which we avoid during copying.
document_language = None
existing_doc_ann = list(self.select(TYPE_NAME_DOCUMENT_ANNOTATION))
if existing_doc_ann:
# Use the stored language value (may be None)
document_language = existing_doc_ann[0].get(FEATURE_BASE_NAME_LANGUAGE)

Comment thread
reckart marked this conversation as resolved.
Outdated
cas_copy = Cas(
ts,
document_language=document_language,
lenient=self._lenient,
sofa_mime=self.sofa_mime,
)

cas_copy._views = {}
cas_copy._sofas = {}

def _collect_fs_list_references(fs_list: FeatureStructure) -> List[Optional[int]]:
referenced_list = []
current = fs_list

while hasattr(current, FEATURE_BASE_NAME_HEAD):
head = current.head
if head is None:
referenced_list.append(None)
elif hasattr(head, "xmiID") and head.xmiID is not None:
referenced_list.append(head.xmiID)
else:
warnings.warn("FSList item without xmiID encountered during deep copy; preserving as None in copy.")
referenced_list.append(None)
Comment on lines +966 to +973

current = current.tail

return referenced_list

def _build_fs_list(referenced_list: List[Optional[int]]) -> FeatureStructure:
current = ts.get_type("uima.cas.EmptyFSList")()

for reference_id in reversed(referenced_list):
node = ts.get_type("uima.cas.NonEmptyFSList")()
node.tail = current
node.head = all_copied_fs.get(reference_id) if reference_id is not None else None
current = node

return current

for sofa in self.sofas:
sofa_copy = Sofa(
sofaID=sofa.sofaID,
sofaNum=sofa.sofaNum,
type=ts.get_type(sofa.type.name),
xmiID=sofa.xmiID,
)
sofa_copy.mimeType = sofa.mimeType
sofa_copy.sofaArray = sofa.sofaArray
Comment thread
reckart marked this conversation as resolved.
sofa_copy.sofaString = sofa.sofaString
sofa_copy.sofaURI = sofa.sofaURI

cas_copy._sofas[sofa_copy.sofaID] = sofa_copy
cas_copy._views[sofa_copy.sofaID] = View(sofa=sofa_copy)
Comment thread
reckart marked this conversation as resolved.

# Set the current view to the `_InitialView` entry in the copied CAS.
# (`Cas.__init__` creates an `_InitialView`; here we point the current
# view at that entry in the `cas_copy._views` mapping so subsequent
# `add()` calls index into the initial view by default.)
cas_copy._current_view = cas_copy._views["_InitialView"]

references = dict()
referenced_arrays = dict()
referenced_fs_arrays = dict()
referenced_primitive_arrays = dict()
referenced_lists = dict()
# for primitive lists (e.g. IntegerList) we collect primitive head values
referenced_primitive_lists = dict()

all_copied_fs = dict()
referenced_view = {}

for view in self.views:
for member in view.get_all_annotations():
if hasattr(member, "xmiID") and member.xmiID is not None:
referenced_view[member.xmiID] = view.sofa.sofaID
Comment thread
reckart marked this conversation as resolved.
Outdated

for fs in self._find_all_fs():
Comment thread
reckart marked this conversation as resolved.
Outdated
t = ts.get_type(fs.type.name)
Comment thread
reckart marked this conversation as resolved.
Outdated
fs_copy = t()

if t.name == TYPE_NAME_FS_ARRAY and fs.elements is not None:
referenced_list = []
for item in fs.elements:
if item is None:
referenced_list.append(None)
elif hasattr(item, "xmiID") and item.xmiID is not None:
referenced_list.append(item.xmiID)
else:
warnings.warn(
f"Standalone FSArray {fs.xmiID} contains an unidentifiable item; preserving as None in copy."
)
referenced_list.append(None)
Comment on lines +1044 to +1053
Comment on lines +1044 to +1053

referenced_fs_arrays[fs.xmiID] = referenced_list
elif t.supertype.name == TYPE_NAME_ARRAY_BASE and fs.elements is not None:
referenced_primitive_arrays[fs.xmiID] = list(fs.elements)

for feature in t.all_features:
if t.supertype.name == TYPE_NAME_ARRAY_BASE and feature.name == "elements":
continue

if ts.is_primitive(feature.rangeType):
fs_copy[feature.name] = fs.get(feature.name)
elif ts.is_primitive_collection(feature.rangeType):
val = fs.get(feature.name)
if val is None:
continue

if feature.multipleReferencesAllowed and hasattr(val, "xmiID") and val.xmiID is not None:
references.setdefault(feature.name, [])
references[feature.name].append((fs.xmiID, val.xmiID))
continue

# Distinguish primitive arrays (have `elements`) from primitive lists (use head/tail)
if ts.is_array(feature.rangeType):
fs_copy[feature.name] = ts.get_type(feature.rangeType.name)()
# shallow-copy the elements list to avoid sharing the same list object
fs_copy[feature.name].elements = list(val.elements)
elif ts.is_list(feature.rangeType):
Comment thread
reckart marked this conversation as resolved.
# collect primitive values from head/tail style lists
current = val
prim_list = []
while hasattr(current, FEATURE_BASE_NAME_HEAD):
head = getattr(current, FEATURE_BASE_NAME_HEAD)
prim_list.append(head)
current = current.tail

# store the primitive list values along with the declared range type name
referenced_primitive_lists.setdefault(fs.xmiID, {})
referenced_primitive_lists[fs.xmiID][feature.name] = (
feature.rangeType.name,
prim_list,
)
Comment thread
reckart marked this conversation as resolved.
elif ts.is_array(feature.rangeType):
val = fs[feature.name]
if val is None:
continue

# If the array itself may be shared (multipleReferencesAllowed), preserve
# its identity by treating it like any other FS reference and wiring it
# up later via `references`. Only inline-copy arrays when they are not
# declared shareable.
if feature.multipleReferencesAllowed and hasattr(val, "xmiID") and val.xmiID is not None:
references.setdefault(feature.name, [])
references[feature.name].append((fs.xmiID, val.xmiID))
else:
fs_copy[feature.name] = ts.get_type(TYPE_NAME_FS_ARRAY)()
# collect referenced xmiIDs for mapping later and preserve None placeholders
referenced_list = []
for item in val.elements:
if item is None:
referenced_list.append(None)
elif hasattr(item, "xmiID") and item.xmiID is not None:
referenced_list.append(item.xmiID)
else:
warnings.warn(
f"Array feature '{feature.name}' of FS {fs.xmiID} contains an unidentifiable item; preserving as None in copy."
)
referenced_list.append(None)
Comment on lines +1113 to +1120
referenced_arrays.setdefault(fs.xmiID, {})
referenced_arrays[fs.xmiID][feature.name] = referenced_list
elif ts.is_list(feature.rangeType):
val = fs[feature.name]
if val is None:
continue

if feature.multipleReferencesAllowed and hasattr(val, "xmiID") and val.xmiID is not None:
references.setdefault(feature.name, [])
references[feature.name].append((fs.xmiID, val.xmiID))
else:
referenced_lists.setdefault(fs.xmiID, {})
referenced_lists[fs.xmiID][feature.name] = _collect_fs_list_references(val)
elif feature.rangeType.name == TYPE_NAME_SOFA:
# ignore sofa references
pass
else:
val = fs[feature.name]
# If the original feature value is None, preserve it without warning
if val is None:
continue
if hasattr(val, "xmiID") and val.xmiID is not None:
references.setdefault(feature.name, [])
references[feature.name].append((fs.xmiID, val.xmiID))
else:
warnings.warn(
f'Original non-primitive feature "{feature.name}" was not copied from feature structure {fs.xmiID}.'
)
Comment thread
reckart marked this conversation as resolved.

Comment thread
reckart marked this conversation as resolved.
fs_copy.xmiID = fs.xmiID
all_copied_fs[fs_copy.xmiID] = fs_copy

# set references to single objects
for feature, pairs in references.items():
for current_ID, reference_ID in pairs:
try:
all_copied_fs[current_ID][feature] = all_copied_fs[reference_ID]
except KeyError:
warnings.warn(
f"Reference {reference_ID} not found for feature '{feature}' of feature structure {current_ID}"
)

# set references for objects in arrays
for current_ID, arrays in referenced_arrays.items():
for feature, referenced_list in arrays.items():
elements = []
for reference_ID in referenced_list:
if reference_ID is None:
elements.append(None)
continue
try:
elements.append(all_copied_fs[reference_ID])
except KeyError:
warnings.warn(
f"Reference {reference_ID} not found for array feature '{feature}' of feature structure {current_ID}; inserting None."
)
elements.append(None)
all_copied_fs[current_ID][feature].elements = elements

for current_ID, referenced_list in referenced_fs_arrays.items():
elements = []
for reference_ID in referenced_list:
if reference_ID is None:
elements.append(None)
continue
try:
elements.append(all_copied_fs[reference_ID])
except KeyError:
warnings.warn(
f"Reference {reference_ID} not found for standalone FSArray {current_ID}; inserting None."
)
elements.append(None)
all_copied_fs[current_ID].elements = elements

for current_ID, elements in referenced_primitive_arrays.items():
all_copied_fs[current_ID].elements = list(elements)

# rebuild FSList features from copied members
for current_ID, lists in referenced_lists.items():
for feature, referenced_list in lists.items():
all_copied_fs[current_ID][feature] = _build_fs_list(referenced_list)

# rebuild primitive head/tail lists (e.g. IntegerList, FloatList, StringList)
for current_ID, lists in referenced_primitive_lists.items():
for feature, (list_type_name, primitive_values) in lists.items():
# derive Empty/NonEmpty concrete type names from the abstract list type
suffix = list_type_name.split(".")[-1]
empty_name = f"uima.cas.Empty{suffix}"
nonempty_name = f"uima.cas.NonEmpty{suffix}"

current = ts.get_type(empty_name)()
for value in reversed(primitive_values):
node = ts.get_type(nonempty_name)()
node.tail = current
node.head = value
current = node

all_copied_fs[current_ID][feature] = current

# ensure Sofa.sofaArray references point to the copied feature structures
for sofa_id, sofa_copy in cas_copy._sofas.items():
orig_sofa_array = sofa_copy.sofaArray
if hasattr(orig_sofa_array, "xmiID") and orig_sofa_array.xmiID in all_copied_fs:
sofa_copy.sofaArray = all_copied_fs[orig_sofa_array.xmiID]

# Add only original view members back to the copied indices. Referenced
# feature structures that were not indexed in any original view remain
# reachable transitively and will still be serialized by `_find_all_fs()`.
feature_structures = sorted(all_copied_fs.values(), key=lambda f: f.xmiID, reverse=False)
for item in feature_structures:
if not hasattr(item, "xmiID") or item.xmiID is None:
continue

view_name = referenced_view.get(item.xmiID)
if view_name is None:
continue

cas_copy._current_view = cas_copy._views[view_name]
cas_copy.add(item, keep_id=True)

Comment thread
reckart marked this conversation as resolved.
cas_copy._xmi_id_generator = IdGenerator(initial_id=self._xmi_id_generator._next_id)
cas_copy._sofa_num_generator = IdGenerator(initial_id=self._sofa_num_generator._next_id)
return cas_copy


def _sort_func(a: FeatureStructure) -> Tuple[int, int, int]:
d = a.__slots__
Expand Down
Loading
Loading