Skip to content

Commit 260d63c

Browse files
tomolopolisTom Searle
andauthored
medact(feat): cdb utils for merging and navigation using pt2ch relations (#176)
* medact(feat): cdb utils for merging and navigation using pt2ch relations fix mypy errors last mypy fix fix lint error * fix test * PR comment changes --------- Co-authored-by: Tom Searle <[email protected]>
1 parent b48555a commit 260d63c

File tree

2 files changed

+825
-0
lines changed

2 files changed

+825
-0
lines changed
Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
from collections import defaultdict
2+
import logging
3+
from medcat.cdb.concepts import NameInfo
4+
import numpy as np
5+
6+
from copy import deepcopy
7+
from typing import Any, Iterable
8+
from medcat.cdb import CDB
9+
10+
# Module-level logger named after this module, kept separate from the
# package-level logger.
logger = logging.getLogger(__name__)
11+
12+
13+
def _first_name_status(cui: str, names: Iterable[str], name2info: Any,
                       default: str = 'A') -> str:
    """Return the status of ``cui`` under the first name that records one.

    Args:
        cui (str): The concept identifier to look up.
        names (Iterable[str]): Candidate names, checked in iteration order.
        name2info (Any): Mapping of name to its info entry; each entry is
            read through its 'per_cui_status' key.
        default (str): Value returned when no name has a status for ``cui``.

    Returns:
        str: The first status found, otherwise ``default``.
    """
    for name in names:
        name_info = name2info.get(name)
        if name_info is not None and cui in name_info['per_cui_status']:
            return name_info['per_cui_status'][cui]
    return default


def _blend_context_vectors(dest: dict, vectors1: dict, vectors2: dict,
                           weights: list) -> None:
    """Write the weighted sum of two context-vector dicts into ``dest``.

    A context type present on only one side is blended with a zero vector
    of the *same shape* as the vector that is present, so the blend works
    for any embedding size.

    Args:
        dest (dict): Mapping updated in place (context type -> vector).
        vectors1 (dict): Vectors scaled by ``weights[0]``.
        vectors2 (dict): Vectors scaled by ``weights[1]``.
        weights (list): Two scalar weights.
    """
    contexts: set = set(vectors1)
    contexts.update(vectors2)
    for context_type in contexts:
        vec1 = vectors1.get(context_type)
        vec2 = vectors2.get(context_type)
        # Every context type comes from at least one side, so at most one
        # of the two is missing here.
        if vec1 is None:
            vec1 = np.zeros(np.shape(vec2))
        if vec2 is None:
            vec2 = np.zeros(np.shape(vec1))
        dest[context_type] = weights[0] * vec1 + weights[1] * vec2


def _merge_shared_concept(cui_info1: Any, cui_info2: Any,
                          cui_info_merged: Any,
                          overwrite_training: int) -> None:
    """Merge training data of a concept that exists in both CDBs.

    Args:
        cui_info1 (Any): The concept's info in cdb1 (read only).
        cui_info2 (Any): The concept's info in cdb2 (read only).
        cui_info_merged (Any): The concept's info in the merged CDB; it is
            updated in place.
        overwrite_training (int): 0 - merge, 1 - prefer cdb1, 2 - prefer cdb2.
    """
    count1 = cui_info1['count_train']
    count2 = cui_info2['count_train']

    # Merge count_train; cdb1's count is left alone when it has priority
    # and actually has training.
    if (count1 > 0 or count2 > 0) and not (
            overwrite_training == 1 and count1 > 0):
        if overwrite_training == 2 and count2 > 0:
            cui_info_merged['count_train'] = count2
        else:
            cui_info_merged['count_train'] = count1 + count2

    # Merge context vectors
    vectors1 = cui_info1['context_vectors']
    vectors2 = cui_info2['context_vectors']
    if vectors1 is None:
        # cdb1 has nothing to prioritise or blend with, so take whatever
        # cdb2 has instead of silently dropping it.
        if vectors2 is not None:
            cui_info_merged['context_vectors'] = deepcopy(vectors2)
    elif overwrite_training == 2:
        # Only overwrite when cdb2 actually has vectors; otherwise the copy
        # of cdb1's vectors is kept rather than being blended with zeros.
        if vectors2 is not None:
            cui_info_merged['context_vectors'] = deepcopy(vectors2)
    elif overwrite_training != 1:
        if cui_info_merged['context_vectors'] is None:
            cui_info_merged['context_vectors'] = {}
        norm = cui_info_merged['count_train']
        weights: list[float]
        if norm > 0:
            weights = [np.divide(count1, norm), np.divide(count2, norm)]
        else:
            weights = [0.5, 0.5]  # equal weights if no training
        _blend_context_vectors(cui_info_merged['context_vectors'],
                               vectors1 or {}, vectors2 or {}, weights)

    # Merge tags: cdb2's tags are appended even when cdb1 had none.
    src_tags = cui_info2.get('tags')
    if src_tags:
        if cui_info_merged.get('tags') is None:
            cui_info_merged['tags'] = []
        cui_info_merged['tags'].extend(src_tags)

    # Merge type_ids (already handled by _add_concept, but ensure union)
    cui_info_merged['type_ids'].update(cui_info2['type_ids'])


def merge_cdb(cdb1: CDB,
              cdb2: CDB,
              overwrite_training: int = 0,
              full_build: bool = False) -> CDB:
    """Merge two CDBs into a new, single CDB.

    The merged CDB starts as a deep copy of cdb1's config and contents;
    every concept of cdb2 is then added through ``_add_concept`` and its
    training data is merged in. The input CDBs are only read and
    deep-copied. `addl_info` can not be perfectly merged, and will
    prioritise cdb1 (see `full_build`).

    Args:
        cdb1 (CDB):
            The first medcat cdb to merge. In cases where merging isn't
            suitable (such as cui2preferred_name), this cdb's values will be
            prioritised over cdb2.
        cdb2 (CDB):
            The second medcat cdb to merge.
        overwrite_training (int):
            Choose to prioritise a CDB's context vectors values over merging
            gracefully. 0 - no prio, 1 - CDB1, 2 - CDB2. A CDB is only given
            priority for a concept it actually has training data for.
        full_build (bool):
            Add additional information from "addl_info" dicts
            "cui2ontologies" and "cui2description".

    Returns:
        CDB: The merged CDB.
    """
    config = deepcopy(cdb1.config)
    cdb = CDB(config)

    # Copy CDB 1 - as all settings from CDB 1 will be carried over
    cdb.cui2info = deepcopy(cdb1.cui2info)
    cdb.name2info = deepcopy(cdb1.name2info)
    cdb.type_id2info = deepcopy(cdb1.type_id2info)
    cdb.token_counts = deepcopy(cdb1.token_counts)
    cdb._subnames = deepcopy(cdb1._subnames)
    if full_build:
        cdb.addl_info = deepcopy(cdb1.addl_info)

    # Merge concepts from cdb2 into the merged CDB
    for cui, cui_info2 in cdb2.cui2info.items():
        name_status = _first_name_status(
            cui, cui_info2['names'], cdb2.name2info)

        # Prepare names dict for _add_concept
        names = {}
        for name in cui_info2['names']:
            # Create a simple NameDescriptor-like structure
            name_info_entry: NameInfo | None = cdb2.name2info.get(name)
            names[name] = type('NameDescriptor', (), {
                'snames': cui_info2['subnames'],
                # Guard for unknown structure in name2info and avoid
                # mismatched defaults
                'is_upper': (bool(name_info_entry.get('is_upper', False))
                             if isinstance(name_info_entry, dict) else False),
                'tokens': set(),  # We don't have token info in the new structure
                'raw_name': name
            })()

        # Get ontologies and description for full_build
        ontologies: set[str] = set()
        description = cui_info2.get('description') or ''
        to_build = full_build and (
            cui_info2.get('original_names') is not None or
            cui_info2.get('description') is not None
        )
        if to_build:
            other: Iterable[str] = cui_info2.get('in_other_ontology') or []
            ontologies.update(other)

        cdb._add_concept(
            cui=cui, names=names, ontologies=ontologies,
            name_status=name_status, type_ids=cui_info2['type_ids'],
            description=description, full_build=to_build
        )

        cui_info_merged = cdb.cui2info[cui]
        if cui in cdb1.cui2info:
            # Concept exists in both CDBs: merge the training data
            _merge_shared_concept(cdb1.cui2info[cui], cui_info2,
                                  cui_info_merged, overwrite_training)
        else:
            # Concept only exists in cdb2: copy its training data over
            cui_info_merged['count_train'] = cui_info2['count_train']
            cui_info_merged['context_vectors'] = deepcopy(
                cui_info2['context_vectors'])
            cui_info_merged['average_confidence'] = (
                cui_info2['average_confidence'])
            if cui_info2.get('tags'):
                cui_info_merged['tags'] = deepcopy(cui_info2['tags'])

    if overwrite_training != 1:
        # Merge name training counts
        # NOTE(review): with overwrite_training == 2 the count of a name
        # present in both CDBs stays at cdb1's value, unlike token counts
        # below which are overwritten - confirm this is intended.
        for name, name_info2 in cdb2.name2info.items():
            if name in cdb1.name2info and overwrite_training == 0:
                # Sum training counts for names that exist in both CDBs
                cdb.name2info[name]['count_train'] = (
                    cdb1.name2info[name]['count_train'] +
                    name_info2['count_train']
                )
            elif name not in cdb.name2info:
                # Copy name info from cdb2 if the merged CDB lacks it
                cdb.name2info[name] = deepcopy(name_info2)

        # Merge token counts
        for token, count in cdb2.token_counts.items():
            if token in cdb.token_counts and overwrite_training == 0:
                cdb.token_counts[token] += count
            else:
                cdb.token_counts[token] = count

    return cdb
207+
208+
209+
def _dedupe_preserve_order(items: list[str]) -> list[str]:
    """Return ``items`` without repeats, keeping each first occurrence.

    Args:
        items (list[str]): The values to de-duplicate.

    Returns:
        list[str]: A new list holding every distinct value once, in the
            order it first appeared in ``items``.
    """
    # dict keys are unique and remember insertion order.
    return list(dict.fromkeys(items))
217+
218+
219+
def get_all_ch(parent_cui: str, cdb) -> list:
    """Get a parent CUI together with all of its descendants.

    The hierarchy is read from ``cdb.addl_info['pt2ch']`` (treated as empty
    when missing) and walked depth-first in pre-order, following the
    children in the order the map yields them. Each CUI is reported once,
    at its first occurrence. The walk is iterative and tracks visited CUIs,
    so it terminates on cyclic relations and does not hit the recursion
    limit on deep hierarchies.

    Args:
        parent_cui (str): The parent CUI
        cdb (CDB): The CDB object

    Returns:
        list: ``parent_cui`` followed by all of its descendants
    """
    pt2ch = cdb.addl_info.get('pt2ch', {})
    visited = set()
    ordered = []
    pending = [parent_cui]
    while pending:
        cui = pending.pop()
        if cui in visited:
            continue
        visited.add(cui)
        ordered.append(cui)
        # Push children reversed so the first child is expanded first;
        # list() because the children may be stored as a set.
        pending.extend(reversed(list(pt2ch.get(cui, []))))
    return ordered
234+
235+
236+
def ch2pt_from_pt2ch(cdb: CDB, pt2ch_key: str = 'pt2ch'):
    """Invert the parent-to-children map of a CDB into child-to-parents.

    Args:
        cdb (CDB): The CDB object whose ``addl_info`` holds the map.
        pt2ch_key (str, optional): The ``addl_info`` key under which the
            parent-to-children map is stored. Defaults to 'pt2ch'.

    Returns:
        dict: A ``defaultdict(list)`` mapping every child to the list of
            its parents, in the order the parents are encountered.

    Raises:
        KeyError: If ``pt2ch_key`` is not present in ``cdb.addl_info``.
    """
    parent_to_children = cdb.addl_info[pt2ch_key]
    child_to_parents = defaultdict(list)
    for parent in parent_to_children:
        for child in parent_to_children[parent]:
            child_to_parents[child].append(parent)
    return child_to_parents
252+
253+
254+
def snomed_ct_concept_path(
    cui: str, cdb: CDB, parent_node='138875005'
) -> dict[str, Any]:
    """Get the concept path for a given CUI up to a parent node.

    Parents are read from ``cdb.addl_info['ch2pt']`` and names from
    ``cdb.get_name``. The first parent of every concept is followed to
    build a nested node tree ('cui', 'pretty_name' and, where present,
    'children'); each further parent is recorded as a
    ``{'parent': ..., 'child': ...}`` link instead.

    Args:
        cui (str): The CUI of the concept to get the path for
        cdb (CDB): The CDB object
        parent_node (str, optional): The top level parent node.
            Defaults to '138875005' the root SNOMED CT code.

    Returns:
        dict: 'node_path' holds the node built for ``parent_node`` and
            'links' the extra parent links. When a KeyError occurs (for
            instance ``parent_node`` was never reached) the error is logged
            and ``{'node_path': {}, 'links': []}`` is returned.
    """
    def _climb(current, visited, below=None):
        ancestors = list(cdb.addl_info.get('ch2pt', {}).get(current, []))
        collected = []

        if current in visited:
            # Seen before: only attach the new child, do not walk up again.
            if below:
                visited[current].setdefault('children', []).append(below)
            return collected

        # Get preferred name from the new CDB structure
        node = {'cui': current, 'pretty_name': cdb.get_name(current)}
        if below:
            node['children'] = [below]
        visited[current] = node

        if not ancestors:
            return collected

        # The first parent continues the node tree ...
        collected += _climb(ancestors[0], visited, below=node)
        # ... every other parent is only reported as a link.
        for extra_parent in ancestors[1:]:
            extra_links = _climb(extra_parent, visited)
            collected += [{'parent': extra_parent, 'child': current}] + extra_links
        return collected

    try:
        nodes_by_cui: dict[str, dict[str, Any]] = {}
        found_links = _climb(cui, nodes_by_cui)
        return {
            'node_path': nodes_by_cui[parent_node],
            'links': found_links
        }
    except KeyError:
        logger.error(f'Cannot find path concept path for CUI: {cui}',
                     exc_info=True)
        return {'node_path': {}, 'links': []}

0 commit comments

Comments
 (0)