Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 103 additions & 7 deletions src/typeagent/knowpro/knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Licensed under the MIT License.

import asyncio
from dataclasses import dataclass

from typechat import Result, TypeChatLanguageModel

Expand Down Expand Up @@ -74,14 +75,109 @@ async def extract_knowledge_from_text_batch(
def merge_concrete_entities(
    entities: list[kplib.ConcreteEntity],
) -> list[kplib.ConcreteEntity]:
    """Merge a list of concrete entities by name, combining types and facets.

    Entities with the same name (case-insensitive) are merged:
    - Names, types, and facet names/values are lowercased for matching
    - Types are combined into a sorted unique list (lowercased)
    - Facets with the same name have their unique values concatenated
      with "; "

    Note:
        This function normalizes all text to lowercase; this is intended to
        match the TypeScript implementation in knowledgeMerge.ts. Facet
        values are converted to strings during merging, so the merged
        facets always carry string values.

    Args:
        entities: The entities to merge. An empty (or falsy) input yields
            an empty list.

    Returns:
        A new list of merged entities sorted by name for deterministic
        ordering. The input entities are not modified.
    """
    if not entities:
        return []

    # Accumulate merged entities keyed by lowercased name.
    merged: dict[str, _MergedEntity] = {}

    for entity in entities:
        name_key = entity.name.lower()
        existing = merged.get(name_key)
        if existing is None:
            # First occurrence: start from an empty accumulator so that the
            # first and later occurrences go through the same merge path.
            existing = merged[name_key] = _MergedEntity(
                name=name_key,
                types=set(),
                facets={},
            )
        existing.types.update(t.lower() for t in entity.type)
        if entity.facets:
            _merge_facets(existing.facets, entity.facets)

    # Convert the accumulators back to ConcreteEntity, sorted by name.
    result: list[kplib.ConcreteEntity] = []
    for merged_entity in sorted(merged.values(), key=lambda e: e.name):
        concrete = kplib.ConcreteEntity(
            name=merged_entity.name,
            type=sorted(merged_entity.types),
        )
        # Only attach facets when there are some, leaving the entity's
        # default facets value untouched otherwise.
        if merged_entity.facets:
            concrete.facets = _merged_to_facets(merged_entity.facets)
        result.append(concrete)

    return result


@dataclass
class _MergedEntity:
"""Internal helper for merging entities."""

name: str
types: set[str]
facets: dict[str, set[str]]


def _facet_value_to_string(value: kplib.Value | None) -> str:
    """Convert a facet value to a lowercase string.

    ``None`` becomes the empty string. Every other value is rendered with
    ``str()`` and lowercased, so non-string values use their ``__str__``
    representation and falsy values such as ``0``, ``0.0`` and ``False``
    are preserved ("0", "0.0", "false") instead of being erased.

    Args:
        value: The facet value to convert, or ``None`` if absent.

    Returns:
        The lowercased string form of ``value``, or "" for ``None``.
    """
    # Test for None explicitly: a plain truthiness check would also discard
    # legitimate falsy values like a numeric zero or a boolean False.
    if value is None:
        return ""
    return str(value).lower()


def _add_facet_to_merged(
    merged: dict[str, set[str]], facet: kplib.Facet
) -> None:
    """Record one facet in ``merged``, in place.

    The facet name is lowercased and used as the key; the stringified value
    is added to that key's set, so repeated values collapse.
    """
    key = facet.name.lower()
    text = _facet_value_to_string(facet.value)
    bucket = merged.get(key)
    if bucket is None:
        # First value seen for this facet name.
        bucket = merged[key] = set()
    bucket.add(text)


def _facets_to_merged(facets: list[kplib.Facet]) -> dict[str, set[str]]:
    """Build a fresh merged-facets dict from a list of Facets.

    Each facet is added through ``_add_facet_to_merged``, which lowercases
    names and values so that merging is case-insensitive.
    """
    by_name: dict[str, set[str]] = {}
    for item in facets:
        _add_facet_to_merged(by_name, item)
    return by_name


def _merge_facets(
    existing: dict[str, set[str]],
    facets: list[kplib.Facet],
) -> None:
    """Fold each facet in ``facets`` into ``existing``, mutating it in place."""
    for item in facets:
        _add_facet_to_merged(existing, item)


# merged_concrete_entities = []
# for merged_entity in merged_entities.values():
# merged_concrete_entities.append(merged_to_concrete_entity(merged_entity))
# return merged_concrete_entities
def _merged_to_facets(merged_facets: dict[str, set[str]]) -> list[kplib.Facet]:
    """Turn a merged facets dict back into a list of Facets.

    One Facet is produced per name, in the dict's iteration order, with its
    values sorted and joined by "; ". Names with an empty value set are
    skipped.
    """
    return [
        kplib.Facet(name=facet_name, value="; ".join(sorted(value_set)))
        for facet_name, value_set in merged_facets.items()
        if value_set
    ]


def merge_topics(topics: list[str]) -> list[str]:
Expand Down
150 changes: 150 additions & 0 deletions tests/test_knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
create_knowledge_extractor,
extract_knowledge_from_text,
extract_knowledge_from_text_batch,
merge_concrete_entities,
merge_topics,
)
from typeagent.knowpro.kplib import ConcreteEntity, Facet


class MockKnowledgeExtractor:
Expand Down Expand Up @@ -81,3 +83,151 @@ def test_merge_topics():
assert "topic1" in merged_topics
assert "topic2" in merged_topics
assert "topic3" in merged_topics


# Tests for merge_concrete_entities


def test_merge_concrete_entities_empty_list() -> None:
    """Merging no entities yields an empty list."""
    assert merge_concrete_entities([]) == []


def test_merge_concrete_entities_single_entity() -> None:
    """A lone entity comes back with its name and types lowercased."""
    merged = merge_concrete_entities(
        [ConcreteEntity(name="Alice", type=["Person"])]
    )

    assert len(merged) == 1
    only = merged[0]
    assert only.name == "alice"
    assert only.type == ["person"]


def test_merge_concrete_entities_distinct() -> None:
    """Entities with different names stay separate (and are lowercased)."""
    alice = ConcreteEntity(name="Alice", type=["Person"])
    bob = ConcreteEntity(name="Bob", type=["Person"])

    merged = merge_concrete_entities([alice, bob])

    assert len(merged) == 2
    assert {entity.name for entity in merged} == {"alice", "bob"}


def test_merge_concrete_entities_same_name_different_case() -> None:
    """Names differing only by case are treated as the same entity."""
    spellings = (
        ("Alice", "Person"),
        ("ALICE", "Employee"),
        ("alice", "Manager"),
    )
    merged = merge_concrete_entities(
        [ConcreteEntity(name=name, type=[kind]) for name, kind in spellings]
    )

    # Matching is case-insensitive, so all three collapse into one entity.
    assert len(merged) == 1
    combined = merged[0]
    assert combined.name == "alice"
    assert combined.type == ["employee", "manager", "person"]


def test_merge_concrete_entities_types_deduplicated_and_sorted() -> None:
    """Merged types are unique, lowercased, and in sorted order."""
    first = ConcreteEntity(name="Alice", type=["Person", "Employee"])
    second = ConcreteEntity(name="Alice", type=["Employee", "Manager"])

    merged = merge_concrete_entities([first, second])

    assert len(merged) == 1
    assert merged[0].type == ["employee", "manager", "person"]


def test_merge_concrete_entities_with_facets() -> None:
    """Facets with different names are all kept on the merged entity."""
    with_age = ConcreteEntity(
        name="Alice",
        type=["Person"],
        facets=[Facet(name="age", value="30")],
    )
    with_department = ConcreteEntity(
        name="Alice",
        type=["Employee"],
        facets=[Facet(name="department", value="Engineering")],
    )

    merged = merge_concrete_entities([with_age, with_department])

    assert len(merged) == 1
    merged_facets = merged[0].facets
    assert merged_facets is not None
    assert {facet.name for facet in merged_facets} == {"age", "department"}


def test_merge_concrete_entities_same_facet_combines_values() -> None:
    """Values of same-named facets are joined into one facet value."""
    merged = merge_concrete_entities(
        [
            ConcreteEntity(
                name="Alice",
                type=["Person"],
                facets=[Facet(name="hobby", value=hobby)],
            )
            for hobby in ("reading", "swimming")
        ]
    )

    assert len(merged) == 1
    merged_facets = merged[0].facets
    assert merged_facets is not None
    hobby_facet = next(f for f in merged_facets if f.name == "hobby")
    assert hobby_facet.value == "reading; swimming"


def test_merge_concrete_entities_facets_deduplicated() -> None:
    """A facet value repeated across entities appears only once."""
    # "reading" is listed twice on purpose to exercise deduplication.
    hobbies = ("reading", "reading", "swimming")
    merged = merge_concrete_entities(
        [
            ConcreteEntity(
                name="Alice",
                type=["Person"],
                facets=[Facet(name="hobby", value=hobby)],
            )
            for hobby in hobbies
        ]
    )

    assert len(merged) == 1
    merged_facets = merged[0].facets
    assert merged_facets is not None
    hobby_facet = next(f for f in merged_facets if f.name == "hobby")
    assert hobby_facet.value == "reading; swimming"


def test_merge_concrete_entities_without_facets_with_facets() -> None:
    """A facet-less entity merged with a faceted one keeps the facets."""
    plain = ConcreteEntity(name="Alice", type=["Person"])
    faceted = ConcreteEntity(
        name="Alice",
        type=["Employee"],
        facets=[Facet(name="department", value="Engineering")],
    )

    merged = merge_concrete_entities([plain, faceted])

    assert len(merged) == 1
    merged_facets = merged[0].facets
    assert merged_facets is not None
    assert len(merged_facets) == 1
    assert merged_facets[0].name == "department"