|
1 | | -from __future__ import annotations |
2 | 1 | import csv |
3 | | -import functools |
4 | | -import dataclasses |
5 | | -import typing |
6 | 2 |
|
7 | 3 | from trove.vocab import mediatypes |
8 | | -from trove.vocab import osfmap |
9 | | -from trove.vocab.namespaces import TROVE |
10 | | -from ._simple_trovesearch import SimpleTrovesearchRenderer |
11 | | -from ._rendering import StreamableRendering |
12 | 4 |
|
| 5 | +from .simple_csv import TrovesearchSimpleCsvRenderer |
13 | 6 |
|
14 | | -Jsonpath = typing.Iterable[str] # path of json keys |
15 | 7 |
|
16 | | -_MULTIVALUE_DELIMITER = ' ; ' # possible improvement: smarter in-value delimiting? |
17 | | -_VALUE_KEY_PREFERENCE = ('@value', '@id', 'name', 'prefLabel', 'label') |
18 | | - |
19 | | - |
20 | | -class TrovesearchSimpleTsvRenderer(SimpleTrovesearchRenderer): |
| 8 | +class TrovesearchSimpleTsvRenderer(TrovesearchSimpleCsvRenderer): |
21 | 9 | MEDIATYPE = mediatypes.TSV |
22 | | - INDEXCARD_DERIVER_IRI = TROVE['derive/osfmap_json'] |
23 | 10 | CSV_DIALECT: type[csv.Dialect] = csv.excel_tab |
24 | | - |
25 | | - def unicard_rendering(self, card_iri: str, osfmap_json: dict): |
26 | | - self.multicard_rendering(cards=[(card_iri, osfmap_json)]) |
27 | | - |
28 | | - def multicard_rendering(self, cards: typing.Iterable[tuple[str, dict]]): |
29 | | - _doc = TabularDoc(list(cards)) # TODO: static column header, actual stream |
30 | | - return StreamableRendering( |
31 | | - mediatype=self.MEDIATYPE, |
32 | | - content_stream=csv_stream(self.CSV_DIALECT, _doc.header(), _doc.rows()), |
33 | | - ) |
34 | | - |
35 | | - |
36 | | -def csv_stream(csv_dialect, header: list, rows: typing.Iterator[list]) -> typing.Iterator[str]: |
37 | | - _writer = csv.writer(_Echo(), dialect=csv_dialect) |
38 | | - yield _writer.writerow(header) |
39 | | - for _row in rows: |
40 | | - yield _writer.writerow(_row) |
41 | | - |
42 | | - |
43 | | -@dataclasses.dataclass |
44 | | -class TabularDoc: |
45 | | - cards: typing.Iterable[tuple[str, dict]] |
46 | | - |
47 | | - @functools.cached_property |
48 | | - def field_paths(self) -> tuple[Jsonpath, ...]: |
49 | | - # TODO: use jsonapi's "sparse fieldsets" to allow selecting |
50 | | - # https://jsonapi.org/format/#fetching-sparse-fieldsets |
51 | | - return tuple(( |
52 | | - ('@id',), |
53 | | - *self._nonempty_field_paths() |
54 | | - )) |
55 | | - |
56 | | - def header(self) -> list[str]: |
57 | | - return ['.'.join(_path) for _path in self.field_paths] |
58 | | - |
59 | | - def rows(self) -> typing.Iterator[list[str]]: |
60 | | - for _card_iri, _osfmap_json in self.cards: |
61 | | - yield self._row_values(_osfmap_json) |
62 | | - |
63 | | - def _nonempty_field_paths(self) -> typing.Iterator[Jsonpath]: |
64 | | - for _path in osfmap.DEFAULT_TABULAR_SEARCH_COLUMN_PATHS: |
65 | | - _jsonpath = _osfmap_jsonpath(_path) |
66 | | - _path_is_present = any( |
67 | | - _has_value(_card, _jsonpath) |
68 | | - for (_, _card) in self.cards |
69 | | - ) |
70 | | - if _path_is_present: |
71 | | - yield _jsonpath |
72 | | - |
73 | | - def _row_values(self, osfmap_json: dict) -> list[str]: |
74 | | - return [ |
75 | | - self._row_field_value(osfmap_json, _field_path) |
76 | | - for _field_path in self.field_paths |
77 | | - ] |
78 | | - |
79 | | - def _row_field_value(self, osfmap_json: dict, field_path: Jsonpath) -> str: |
80 | | - return _MULTIVALUE_DELIMITER.join( |
81 | | - _render_tabularly(_obj) |
82 | | - for _obj in _iter_values(osfmap_json, field_path) |
83 | | - ) |
84 | | - |
85 | | - |
86 | | -def _osfmap_jsonpath(iri_path: typing.Iterable[str]) -> Jsonpath: |
87 | | - _shorthand = osfmap.osfmap_shorthand() |
88 | | - return tuple( |
89 | | - _shorthand.compact_iri(_pathstep) |
90 | | - for _pathstep in iri_path |
91 | | - ) |
92 | | - |
93 | | - |
94 | | -def _has_value(osfmap_json: dict, path: Jsonpath) -> bool: |
95 | | - try: |
96 | | - next(_iter_values(osfmap_json, path)) |
97 | | - except StopIteration: |
98 | | - return False |
99 | | - else: |
100 | | - return True |
101 | | - |
102 | | - |
103 | | -def _iter_values(osfmap_json: dict, path: Jsonpath) -> typing.Iterator: |
104 | | - assert path |
105 | | - (_step, *_rest) = path |
106 | | - _val = osfmap_json.get(_step) |
107 | | - if _rest: |
108 | | - if isinstance(_val, dict): |
109 | | - yield from _iter_values(_val, _rest) |
110 | | - elif isinstance(_val, list): |
111 | | - for _val_obj in _val: |
112 | | - yield from _iter_values(_val_obj, _rest) |
113 | | - else: |
114 | | - if isinstance(_val, list): |
115 | | - yield from _val |
116 | | - elif _val is not None: |
117 | | - yield _val |
118 | | - |
119 | | - |
120 | | -def _render_tabularly(json_val): |
121 | | - if isinstance(json_val, (str, int, float)): |
122 | | - return json_val |
123 | | - if isinstance(json_val, dict): |
124 | | - for _key in _VALUE_KEY_PREFERENCE: |
125 | | - _val = json_val.get(_key) |
126 | | - if isinstance(_val, list): |
127 | | - return ( |
128 | | - _render_tabularly(_val[0]) |
129 | | - if _val |
130 | | - else None |
131 | | - ) |
132 | | - if _val is not None: |
133 | | - return _val |
134 | | - return None |
135 | | - |
136 | | - |
137 | | -class _Echo: |
138 | | - '''a write-only file-like object, to convince `csv.csvwriter.writerow` to return strings |
139 | | -
|
140 | | - from https://docs.djangoproject.com/en/5.1/howto/outputting-csv/#streaming-large-csv-files |
141 | | - ''' |
142 | | - def write(self, line: str): |
143 | | - return line |
0 commit comments