|
1 | 1 | import json |
| 2 | +from copy import deepcopy |
2 | 3 | from typing import Dict, List, Any, Callable, Optional |
3 | 4 | from functools import partial |
4 | 5 | from jupyter_ydoc.ybasedoc import YBaseDoc |
@@ -61,7 +62,6 @@ def get(self) -> str: |
61 | 62 | dataset = json.loads(self._ydataset.to_json()) |
62 | 63 | links = json.loads(self._ylinks.to_json()) |
63 | 64 | tabs = json.loads(self._ytabs.to_json()) |
64 | | - |
65 | 65 | contents.setdefault("__main__", {}) |
66 | 66 |
|
67 | 67 | tab_names = sorted(list(tabs.keys())) |
@@ -91,8 +91,7 @@ def get(self) -> str: |
91 | 91 | for data_name in data_names: |
92 | 92 | contents[data_name] = dataset[data_name] |
93 | 93 |
|
94 | | - for link_name in link_names: |
95 | | - contents[link_name] = links[link_name] |
| 94 | + self.add_links_to_contents(links, contents) |
96 | 95 |
|
97 | 96 | return json.dumps(contents, indent=2, sort_keys=True) |
98 | 97 |
|
@@ -127,47 +126,7 @@ def set(self, value: str) -> None: |
127 | 126 | for attribute in contents.get(data_name, {}).get("primary_owner", []): |
128 | 127 | attributes[attribute] = contents.get(attribute, {}) |
129 | 128 |
|
130 | | - links: Dict[str, Dict] = {} |
131 | | - for link_name in link_names: |
132 | | - link: Dict = contents.get(link_name, {}) |
133 | | - uniform_link = {"_type": link.pop("_type")} |
134 | | - if uniform_link["_type"] == COMPONENT_LINK_TYPE: |
135 | | - uniform_link["data1"] = next( |
136 | | - ( |
137 | | - k |
138 | | - for k, v in dataset.items() |
139 | | - if link["frm"][0] in v["primary_owner"] |
140 | | - ), |
141 | | - None, |
142 | | - ) |
143 | | - uniform_link["data2"] = next( |
144 | | - ( |
145 | | - k |
146 | | - for k, v in dataset.items() |
147 | | - if link["to"][0] in v["primary_owner"] |
148 | | - ), |
149 | | - None, |
150 | | - ) |
151 | | - uniform_link["cids1"] = link.pop("frm") |
152 | | - uniform_link["cids2"] = link.pop("to") |
153 | | - for i in [1, 2]: |
154 | | - uniform_link[f"cids{i}_labels"] = [ |
155 | | - attributes[attribute]["label"] |
156 | | - for attribute in uniform_link[f"cids{i}"] |
157 | | - ] |
158 | | - else: |
159 | | - for i in [1, 2]: |
160 | | - listName = link.pop(f"cids{i}") |
161 | | - uniform_link[f"cids{i}"] = contents.get(listName, {}).get( |
162 | | - "contents" |
163 | | - ) |
164 | | - uniform_link[f"cids{i}_labels"] = [ |
165 | | - attributes[attribute]["label"] |
166 | | - for attribute in uniform_link[f"cids{i}"] |
167 | | - ] |
168 | | - |
169 | | - uniform_link.update(link) |
170 | | - links[link_name] = uniform_link |
| 129 | + links = self.extract_links_from_file(link_names, contents, dataset, attributes) |
171 | 130 |
|
172 | 131 | with self._ydoc.begin_transaction() as t: |
173 | 132 | self._ycontents.update(t, contents.items()) |
@@ -216,3 +175,88 @@ def remove_tab_viewer(self, tab_name: str, viewer_id: str) -> None: |
216 | 175 | if tab is not None: |
217 | 176 | with self._ydoc.begin_transaction() as t: |
218 | 177 | tab.pop(t, viewer_id) |
| 178 | + |
| 179 | + def extract_links_from_file( |
| 180 | + self, |
| 181 | + link_names: List[str], |
| 182 | + contents: Dict, |
| 183 | + dataset: Dict[str, Dict], |
| 184 | + attributes: Dict[str, Dict], |
| 185 | + ) -> Dict[str, Dict]: |
| 186 | + links: Dict[str, Dict] = {} |
| 187 | + for link_name in link_names: |
| 188 | + link: Dict = deepcopy(contents.get(link_name, {})) |
| 189 | + uniform_link = {"_type": link.pop("_type")} |
| 190 | + if uniform_link["_type"] == COMPONENT_LINK_TYPE: |
| 191 | + uniform_link["data1"] = next( |
| 192 | + ( |
| 193 | + k |
| 194 | + for k, v in dataset.items() |
| 195 | + if link["frm"][0] in v["primary_owner"] |
| 196 | + ), |
| 197 | + None, |
| 198 | + ) |
| 199 | + uniform_link["data2"] = next( |
| 200 | + ( |
| 201 | + k |
| 202 | + for k, v in dataset.items() |
| 203 | + if link["to"][0] in v["primary_owner"] |
| 204 | + ), |
| 205 | + None, |
| 206 | + ) |
| 207 | + uniform_link["cids1"] = link.pop("frm") |
| 208 | + uniform_link["cids2"] = link.pop("to") |
| 209 | + for i in [1, 2]: |
| 210 | + uniform_link[f"cids{i}_labels"] = [ |
| 211 | + attributes[attribute]["label"] |
| 212 | + for attribute in uniform_link[f"cids{i}"] |
| 213 | + ] |
| 214 | + else: |
| 215 | + for i in [1, 2]: |
| 216 | + listName = link.pop(f"cids{i}") |
| 217 | + uniform_link[f"cids{i}"] = contents.get(listName, {}).get( |
| 218 | + "contents" |
| 219 | + ) |
| 220 | + uniform_link[f"cids{i}_labels"] = [ |
| 221 | + attributes[attribute]["label"] |
| 222 | + for attribute in uniform_link[f"cids{i}"] |
| 223 | + ] |
| 224 | + |
| 225 | + uniform_link.update(link) |
| 226 | + links[link_name] = uniform_link |
| 227 | + return links |
| 228 | + |
| 229 | + def add_links_to_contents(self, links: Dict[str, Dict], contents: Dict): |
| 230 | + # Delete former links and attributes lists. |
| 231 | + for link_names in contents.get(self._data_collection_name, {}).get("links", []): |
| 232 | + link = contents.pop(link_names, {}) |
| 233 | + |
| 234 | + # Delete the list objects containing the attributes of advanced links. |
| 235 | + if link.get("_type", "") != COMPONENT_LINK_TYPE: |
| 236 | + contents.pop(link.get("cids1", None), None) |
| 237 | + contents.pop(link.get("cids2", None), None) |
| 238 | + contents[self._data_collection_name]["links"] = [] |
| 239 | + |
| 240 | + # Create the new links and attribute lists if necessary. |
| 241 | + lists_count = -1 |
| 242 | + for link_name, link in links.items(): |
| 243 | + if link["_type"] == COMPONENT_LINK_TYPE: |
| 244 | + link["frm"] = link.pop("cids1", []) |
| 245 | + link["to"] = link.pop("cids2", []) |
| 246 | + for i in [1, 2]: |
| 247 | + link.pop(f"cids{i}_labels", None) |
| 248 | + link.pop(f"data{i}", None) |
| 249 | + else: |
| 250 | + for i in [1, 2]: |
| 251 | + list_name = f"list{'' if lists_count < 0 else f'_{lists_count}'}" |
| 252 | + lists_count += 1 |
| 253 | + link.pop(f"cids{i}_labels", None) |
| 254 | + link.pop(f"data{i}", None) |
| 255 | + attr_list = { |
| 256 | + "_type": "builtins.list", |
| 257 | + "contents": link.pop(f"cids{i}", []), |
| 258 | + } |
| 259 | + contents[list_name] = attr_list |
| 260 | + link[f"cids{i}"] = list_name |
| 261 | + contents[link_name] = link |
| 262 | + contents[self._data_collection_name]["links"].append(link_name) |
0 commit comments