Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/linkml_map/compiler/graphviz_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ def compile(

# Define the class nodes with fields in UML format using HTML-like labels
# for precise control over the stacking of the fields
for target_cn, cd in specification.class_derivations.items():
for cd in specification.class_derivations:
target_cn = cd.name
source_cn = cd.populated_from
if source_cn is None:
source_cn = cd.name
Expand Down
2 changes: 1 addition & 1 deletion src/linkml_map/compiler/python_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def _compile_header(self, specification: TransformationSpecification) -> str:
def _compile_iterator(self, specification: TransformationSpecification) -> Iterator[str]:
specification = deepcopy(specification)
induce_missing_values(specification, self.source_schemaview)
for cd in specification.class_derivations.values():
for cd in specification.class_derivations:
yield from self._compiled_class_derivations_iter(cd)

def _compiled_class_derivations_iter(self, cd: ClassDerivation) -> Iterator[str]:
Expand Down
2 changes: 1 addition & 1 deletion src/linkml_map/compiler/sql_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class SQLCompiler(Compiler):

def compile(self, specification: TransformationSpecification) -> CompiledSpecification:
compiled = CompiledSpecification()
for cd in specification.class_derivations.values():
for cd in specification.class_derivations:
self.compile_class(compiled, cd, specification)
return compiled

Expand Down
2 changes: 1 addition & 1 deletion src/linkml_map/compiler/templates/markdown.j2
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# {{ spec.title }}

## Class Mappings
{% for cd in spec.class_derivations.values() %}
{% for cd in spec.class_derivations %}

### {{ cd.name }} `<-` {{ cd.populated_from }}

Expand Down
15 changes: 14 additions & 1 deletion src/linkml_map/datamodel/transformer_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class TransformationSpecification(SpecificationComponent):
source_schema: Optional[str] = Field(default=None, description="""name of the schema that describes the source (input) objects""", json_schema_extra = { "linkml_meta": {'alias': 'source_schema', 'domain_of': ['TransformationSpecification']} })
target_schema: Optional[str] = Field(default=None, description="""name of the schema that describes the target (output) objects""", json_schema_extra = { "linkml_meta": {'alias': 'target_schema', 'domain_of': ['TransformationSpecification']} })
source_schema_patches: Optional[Any] = Field(default=None, description="""Schema patches to apply to the source schema before transformation. Useful for adding foreign key relationships to auto-generated schemas. Uses LinkML schema YAML structure (classes, slots, attributes, etc.).""", json_schema_extra = { "linkml_meta": {'alias': 'source_schema_patches', 'domain_of': ['TransformationSpecification']} })
class_derivations: Optional[Dict[str, ClassDerivation]] = Field(default_factory=dict, description="""Instructions on how to derive a set of classes in the target schema from classes in the source schema.""", json_schema_extra = { "linkml_meta": {'alias': 'class_derivations',
class_derivations: Optional[List[ClassDerivation]] = Field(default_factory=list, description="""Instructions on how to derive a set of classes in the target schema from classes in the source schema.""", json_schema_extra = { "linkml_meta": {'alias': 'class_derivations',
'domain_of': ['TransformationSpecification', 'ObjectDerivation']} })
enum_derivations: Optional[Dict[str, EnumDerivation]] = Field(default_factory=dict, description="""Instructions on how to derive a set of enums in the target schema""", json_schema_extra = { "linkml_meta": {'alias': 'enum_derivations', 'domain_of': ['TransformationSpecification']} })
slot_derivations: Optional[Dict[str, SlotDerivation]] = Field(default_factory=dict, description="""Instructions on how to derive a set of top level slots in the target schema""", json_schema_extra = { "linkml_meta": {'alias': 'slot_derivations',
Expand All @@ -197,6 +197,19 @@ class TransformationSpecification(SpecificationComponent):
'domain_of': ['SpecificationComponent'],
'slot_uri': 'rdfs:comment'} })

@field_validator('class_derivations', mode='before')
@classmethod
def coerce_class_derivations(cls, v):
    """
    Accept dict input for backward compatibility and convert to list.

    Legacy specifications keyed class derivations by target class name;
    the model now stores them as a list. When a dict is supplied, each
    dict-valued entry has its key folded in as ``name`` (only if ``name``
    is not already present); non-dict values pass through untouched.
    Non-dict input is returned unchanged.
    """
    if not isinstance(v, dict):
        return v
    coerced = []
    for name, derivation in v.items():
        if isinstance(derivation, dict):
            # Preserve an explicit 'name' if the author already set one.
            derivation.setdefault('name', name)
        coerced.append(derivation)
    return coerced


class ElementDerivation(SpecificationComponent):
"""
Expand Down
1 change: 1 addition & 0 deletions src/linkml_map/datamodel/transformer_model.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ classes:
range: ClassDerivation
multivalued: true
inlined: true
inlined_as_list: true
enum_derivations:
description: >-
Instructions on how to derive a set of enums in the target schema
Expand Down
6 changes: 3 additions & 3 deletions src/linkml_map/inference/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ def induce_missing_values(
:param source_schemaview:
:return:
"""
for cd in specification.class_derivations.values():
for cd in specification.class_derivations:
if not cd.populated_from:
cd.populated_from = cd.name
for cd in specification.class_derivations.values():
for cd in specification.class_derivations:
for sd in cd.slot_derivations.values():
if sd.object_derivations:
#skip inference for object derivations, inferencese come from class derivation later
Expand Down Expand Up @@ -53,6 +53,6 @@ def induce_missing_values(
)
source_induced_slot_range = source_induced_slot.range

for range_cd in specification.class_derivations.values():
for range_cd in specification.class_derivations:
if range_cd.populated_from == source_induced_slot_range:
sd.range = range_cd.name
4 changes: 2 additions & 2 deletions src/linkml_map/inference/inverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ def invert(self, spec: TransformationSpecification) -> TransformationSpecificati
"""
logger.info("Inverting specification")
inverted_spec = TransformationSpecification()
for cd in spec.class_derivations.values():
for cd in spec.class_derivations:
inverted_cd = self.invert_class_derivation(cd, spec)
inverted_spec.class_derivations[inverted_cd.name] = inverted_cd
inverted_spec.class_derivations.append(inverted_cd)
for ed in spec.enum_derivations.values():
inverted_ed = self.invert_enum_derivation(ed, spec)
inverted_spec.enum_derivations[inverted_ed.name] = inverted_ed
Expand Down
2 changes: 1 addition & 1 deletion src/linkml_map/inference/schema_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def derive_schema(
target_schema.imports.append(im)
for prefix in source_schema.prefixes.values():
target_schema.prefixes[prefix.prefix_prefix] = prefix
for class_derivation in specification.class_derivations.values():
for class_derivation in specification.class_derivations:
class_definition = self._derive_class(class_derivation)
target_schema.classes[class_definition.name] = class_definition
for enum_derivation in specification.enum_derivations.values():
Expand Down
1 change: 1 addition & 0 deletions src/linkml_map/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def set_transformer_specification(
self.transformer_specification = specification
elif isinstance(specification, dict):
# TODO: centralize this code
Transformer._preprocess_class_derivations(specification)
normalizer = ReferenceValidator(
package_schemaview("linkml_map.datamodel.transformer_model")
)
Expand Down
46 changes: 29 additions & 17 deletions src/linkml_map/transformer/object_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,34 @@ def index(self, source_obj: Any, target: Optional[str] = None) -> None:
else:
self.object_index = ObjectIndex(source_obj, schemaview=self.source_schemaview)

def _resolve_source_type(
self, source_type: Optional[str], sv: Optional[SchemaView]
) -> Optional[str]:
"""
Resolve the source type when not explicitly provided.

:param source_type: Explicitly provided source type, or None.
:param sv: Source schema view, may be None.
:return: Resolved source type name.
"""
if source_type is None and sv is None:
# TODO: use smarter method
source_type = self.specification.class_derivations[0].name
if source_type is None and sv is not None:
source_types = [c.name for c in sv.all_classes().values() if c.tree_root]
if len(source_types) == 1:
source_type = source_types[0]
elif len(source_types) > 1:
msg = "No source type specified and multiple root classes found"
raise ValueError(msg)
elif len(source_types) == 0:
if len(sv.all_classes()) == 1:
source_type = next(iter(sv.all_classes().keys()))
else:
msg = "No source type specified and no root classes found"
raise ValueError(msg)
return source_type

# Developer Note:
# This method has grown large. When modifying it, consider extracting to
# private methods and adding tests using the scaffold-based testing pattern.
Expand All @@ -168,23 +196,7 @@ def map_object(
:return: transformed data, either as type target_type or a dictionary
"""
sv = self.source_schemaview
# EXTRACT: _resolve_source_type(sv, source_obj) -> str
if source_type is None and sv is None:
# TODO: use smarter method
source_type = next(iter(self.specification.class_derivations.values())).name
if source_type is None and sv is not None:
source_types = [c.name for c in sv.all_classes().values() if c.tree_root]
if len(source_types) == 1:
source_type = source_types[0]
elif len(source_types) > 1:
msg = "No source type specified and multiple root classes found"
raise ValueError(msg)
elif len(source_types) == 0:
if len(sv.all_classes()) == 1:
source_type = next(iter(sv.all_classes().keys()))
else:
msg = "No source type specified and no root classes found"
raise ValueError(msg)
source_type = self._resolve_source_type(source_type, sv)

if source_type in sv.all_types():
if target_type:
Expand Down
61 changes: 55 additions & 6 deletions src/linkml_map/transformer/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def load_transformer_specification(self, path: Union[str, Path]) -> None:
"""
with open(path) as f:
obj = yaml.safe_load(f)
self._preprocess_class_derivations(obj)
# necessary to expand first
normalizer = ReferenceValidator(
package_schemaview("linkml_map.datamodel.transformer_model")
Expand All @@ -128,18 +129,57 @@ def normalize_transform_spec(
"""
obj = normalizer.normalize(obj)

class_derivations = obj.get("class_derivations", {})
for class_name, class_spec in class_derivations.items():
class_derivations = obj.get("class_derivations", [])
if isinstance(class_derivations, dict):
cd_iter = class_derivations.values()
else:
cd_iter = class_derivations
for class_spec in cd_iter:
if not isinstance(class_spec, dict):
continue
slot_derivations = class_spec.get("slot_derivations", {})
for slot, slot_spec in slot_derivations.items():
# Check for nested object_derivations
object_derivations = slot_spec.get("object_derivations", [])
for i, od in enumerate(object_derivations):
# Recursively normalize each nested class_derivation block
od_normalized = self.normalize_transform_spec(od, normalizer)
# ObjectDerivation.class_derivations stays as dict (no inlined_as_list),
# but the normalizer may convert it to list. Convert back.
od_cd = od_normalized.get("class_derivations")
if isinstance(od_cd, list):
od_normalized["class_derivations"] = {
item["name"]: item for item in od_cd if isinstance(item, dict)
}
object_derivations[i] = od_normalized
return obj

@staticmethod
def _preprocess_class_derivations(obj: dict[str, Any]) -> None:
"""
Pre-process class_derivations before ReferenceValidator normalization.

Handles two cases:
1. Dict format with None values (e.g. ``Entity:`` with no body) — replace
with empty dicts so ReferenceValidator.ensure_list doesn't choke.
2. List format with compact keys (e.g. ``- Condition: {populated_from: x}``)
— unwrap to ``{name: Condition, populated_from: x}`` so Pydantic can
validate.
"""
cd = obj.get("class_derivations")
if isinstance(cd, dict):
for k, v in cd.items():
if v is None:
cd[k] = {}
elif isinstance(cd, list):
for i, item in enumerate(cd):
if isinstance(item, dict) and len(item) == 1:
key, val = next(iter(item.items()))
if key != "name" and isinstance(val, (dict, type(None))):
expanded = val if val is not None else {}
expanded.setdefault("name", key)
cd[i] = expanded

def create_transformer_specification(self, obj: dict[str, Any]) -> None:
"""
Create specification from a dict.
Expand All @@ -149,6 +189,7 @@ def create_transformer_specification(self, obj: dict[str, Any]) -> None:
:param path:
:return:
"""
self._preprocess_class_derivations(obj)
normalizer = ReferenceValidator(
package_schemaview("linkml_map.datamodel.transformer_model")
)
Expand Down Expand Up @@ -181,7 +222,7 @@ def _get_class_derivation(self, target_class_name: str) -> ClassDerivation:
spec = self.derived_specification
matching_tgt_class_derivs = [
deriv
for deriv in spec.class_derivations.values()
for deriv in spec.class_derivations
if deriv.populated_from == target_class_name
or (not deriv.populated_from and target_class_name == deriv.name)
]
Expand All @@ -205,19 +246,27 @@ def _get_class_derivation(self, target_class_name: str) -> ClassDerivation:
setattr(cd, k, v)
return cd

def _find_class_derivation_by_name(self, name: str) -> ClassDerivation:
"""Look up a class derivation by name from the specification."""
for cd in self.specification.class_derivations:
if cd.name == name:
return cd
msg = f"No class derivation named '{name}'"
raise KeyError(msg)

def _class_derivation_ancestors(self, cd: ClassDerivation) -> dict[str, ClassDerivation]:
"""
Return a map of all class derivations that are ancestors of the given class derivation.

:param cd:
:return:
"""
spec = self.specification
ancestors = {}
parents = cd.mixins + ([cd.is_a] if cd.is_a else [])
for parent in parents:
ancestors[parent] = spec.class_derivations[parent]
ancestors.update(self._class_derivation_ancestors(spec.class_derivations[parent]))
parent_cd = self._find_class_derivation_by_name(parent)
ancestors[parent] = parent_cd
ancestors.update(self._class_derivation_ancestors(parent_cd))
return ancestors

def _get_enum_derivation(self, target_enum_name: str) -> EnumDerivation:
Expand Down
2 changes: 2 additions & 0 deletions src/linkml_map/utils/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
from linkml_runtime.utils.introspection import package_schemaview

from linkml_map.datamodel.transformer_model import TransformationSpecification
from linkml_map.transformer.transformer import Transformer


def load_specification(path: Union[Path, str]) -> TransformationSpecification:
if isinstance(path, Path):
path = str(path)
with open(path) as f:
obj = yaml.safe_load(f)
Transformer._preprocess_class_derivations(obj)
# necessary to expand first
normalizer = ReferenceValidator(
package_schemaview("linkml_map.datamodel.transformer_model")
Expand Down
40 changes: 40 additions & 0 deletions templates/pydantic/class.py.jinja
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{#- Renders one generated Pydantic class: bases, optional docstring, optional
    linkml_meta ClassVar, then attributes and validators (or `pass` when both
    are absent). The trailing special-case injects a backward-compatibility
    field_validator into TransformationSpecification so dict-form
    class_derivations are coerced to a list; it must stay in sync with the
    hand-written validator in transformer_model.py — TODO confirm the two
    copies are kept identical. Whitespace-control markers ({%- / -%}) shape
    the emitted Python; do not reflow them. -#}
class {{ name }}({% if bases is string %}{{ bases }}{% else %}{{ bases | join(', ') }}{% endif %}):
{% if description %}
"""
{{ description | indent(width=4) }}
"""
{% endif -%}
{% if meta %}
linkml_meta: ClassVar[LinkMLMeta] = LinkMLMeta({{ meta | pprint | indent(width=8) }})

{% endif %}
{% if attributes or validators %}
{% if attributes %}
{% for attr in attributes.values() %}
{{ attr }}
{% endfor -%}
{% endif %}
{% if validators %}
{% for validator in validators.values() %}

{{ validator }}
{% endfor -%}
{% endif %}
{% else %}
pass
{% endif %}
{% if name == 'TransformationSpecification' %}

@field_validator('class_derivations', mode='before')
@classmethod
def coerce_class_derivations(cls, v):
    """Accept dict input for backward compatibility and convert to list."""
    if isinstance(v, dict):
        result = []
        for key, cd in v.items():
            if isinstance(cd, dict):
                cd.setdefault('name', key)
            result.append(cd)
        return result
    return v
{% endif %}
Loading