Commit 61d1b0a

committed
Add version discrimination for cosmotech-api processing
1 parent 5d5503d commit 61d1b0a

9 files changed: +234 -126 lines changed

cosmotech/coal/cosmotech_api/__init__.py

Lines changed: 9 additions & 6 deletions
@@ -16,12 +16,15 @@
     write_parameters,
 )

-# Re-export functions from the twin_data_layer module
-from cosmotech.coal.cosmotech_api.twin_data_layer import (
-    get_dataset_id_from_runner,
-    send_files_to_tdl,
-    load_files_from_tdl,
-)
+from cosmotech.coal.utils.semver import semver_of
+csm_version = semver_of('cosmotech_api')
+if csm_version.major < 5:
+    # Re-export functions from the twin_data_layer module
+    from cosmotech.coal.cosmotech_api.twin_data_layer import (
+        get_dataset_id_from_runner,
+        send_files_to_tdl,
+        load_files_from_tdl,
+    )

 # Re-export functions from the run_data module
 from cosmotech.coal.cosmotech_api.run_data import (
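
For context, the guard above relies on a semver_of helper from cosmotech.coal.utils.semver, which is not part of this diff. A minimal sketch of what such a helper could look like, built on importlib.metadata (an assumption; the real implementation may differ):

# Hypothetical sketch of semver_of; the actual
# cosmotech.coal.utils.semver module may differ.
from importlib.metadata import version
from typing import NamedTuple


class SemVer(NamedTuple):
    major: int
    minor: int
    patch: int


def semver_of(package_name: str) -> SemVer:
    """Parse the installed version of a package into (major, minor, patch)."""
    raw = version(package_name)  # e.g. "3.2.2" or "5.0.0rc1"
    numbers = []
    for part in raw.split(".")[:3]:
        digits = ""
        for char in part:
            if not char.isdigit():
                break  # tolerate suffixes such as "0rc1"
            digits += char
        numbers.append(int(digits or 0))
    while len(numbers) < 3:
        numbers.append(0)  # pad short versions such as "5.0"
    return SemVer(*numbers)

With a helper of this shape, semver_of('cosmotech_api').major < 5 keeps the pre-v5 twin_data_layer re-exports available only on older API clients, so the package still imports cleanly against v5+.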

cosmotech/coal/cosmotech_api/dataset/download/file.py

Lines changed: 124 additions & 104 deletions
@@ -12,16 +12,137 @@
 import tempfile
 import time
 from pathlib import Path
-from typing import Dict, List, Any, Optional, Union, Tuple
+from typing import Dict, Any, Optional, Union, Tuple

 from cosmotech_api import WorkspaceApi
 from openpyxl import load_workbook

+from cosmotech.coal.utils.decorator import timed
 from cosmotech.coal.utils.logger import LOGGER
 from cosmotech.orchestrator.utils.translate import T
 from cosmotech.coal.cosmotech_api.connection import get_api_client


+def process_xls(target_file) -> Dict[str, Any]:
+    content = {}
+
+    LOGGER.debug(T("coal.services.dataset.processing_excel").format(file_name=target_file))
+    wb = load_workbook(target_file, data_only=True)
+
+    for sheet_name in wb.sheetnames:
+        sheet = wb[sheet_name]
+        content[sheet_name] = list()
+        headers = next(sheet.iter_rows(max_row=1, values_only=True))
+
+        row_count = 0
+        for r in sheet.iter_rows(min_row=2, values_only=True):
+            row = {k: v for k, v in zip(headers, r)}
+            new_row = dict()
+
+            for key, value in row.items():
+                try:
+                    converted_value = json.load(io.StringIO(value))
+                except (json.decoder.JSONDecodeError, TypeError):
+                    converted_value = value
+
+                if converted_value is not None:
+                    new_row[key] = converted_value
+
+            if new_row:
+                content[sheet_name].append(new_row)
+            row_count += 1
+
+        LOGGER.debug(
+            T("coal.services.dataset.sheet_processed").format(sheet_name=sheet_name, rows=row_count)
+        )
+    return content
+
+
+def process_csv(target_file) -> Dict[str, Any]:
+    content = {}
+
+    LOGGER.debug(T("coal.services.dataset.processing_csv").format(file_name=target_file))
+    with open(target_file, "r") as file:
+        current_filename = os.path.basename(target_file)[: -len(".csv")]
+        content[current_filename] = list()
+
+        row_count = 0
+        for csv_row in csv.DictReader(file):
+            csv_row: dict
+            new_row = dict()
+
+            for key, value in csv_row.items():
+                try:
+                    # Try to convert any json row to dict object
+                    converted_value = json.load(io.StringIO(value))
+                except json.decoder.JSONDecodeError:
+                    converted_value = value
+
+                if converted_value == "":
+                    converted_value = None
+
+                if converted_value is not None:
+                    new_row[key] = converted_value
+
+            content[current_filename].append(new_row)
+            row_count += 1
+
+        LOGGER.debug(
+            T("coal.services.dataset.csv_processed").format(file_name=current_filename, rows=row_count)
+        )
+    return content
+
+
+def process_json(target_file) -> Dict[str, Any]:
+    content = {}
+    LOGGER.debug(T("coal.services.dataset.processing_json").format(file_name=target_file))
+    with open(target_file, "r") as _file:
+        current_filename = os.path.basename(target_file)
+        content[current_filename] = json.load(_file)
+
+    if isinstance(content[current_filename], dict):
+        item_count = len(content[current_filename])
+    elif isinstance(content[current_filename], list):
+        item_count = len(content[current_filename])
+    else:
+        item_count = 1
+
+    LOGGER.debug(
+        T("coal.services.dataset.json_processed").format(file_name=current_filename, items=item_count)
+    )
+    return content
+
+
+def process_txt(target_file) -> Dict[str, Any]:
+    content = {}
+    LOGGER.debug(T("coal.services.dataset.processing_text").format(file_name=target_file))
+    with open(target_file, "r") as _file:
+        current_filename = os.path.basename(target_file)
+        content[current_filename] = "\n".join(line for line in _file)
+
+    line_count = content[current_filename].count("\n") + 1
+    LOGGER.debug(
+        T("coal.services.dataset.text_processed").format(file_name=current_filename, lines=line_count)
+    )
+    return content
+
+
+def read_file(file_name, file):
+    @timed(f"process{file_name}", debug=True)
+    def timed_read_file(file_name, file):
+        content = {}
+        if ".xls" in file_name:
+            content.update(process_xls(file))
+        elif ".csv" in file_name:
+            content.update(process_csv(file))
+        elif ".json" in file_name:
+            content.update(process_json(file))
+        else:
+            content.update(process_txt(file))
+        return content
+    return timed_read_file(file_name, file)
+
+
 def download_file_dataset(
     organization_id: str,
     workspace_id: str,
@@ -105,109 +226,8 @@ def download_file_dataset(
             )
         )

-        if not read_files:
-            continue
-
-        # Process file based on type
-        process_start = time.time()
-
-        if ".xls" in _file_name:
-            LOGGER.debug(T("coal.services.dataset.processing_excel").format(file_name=target_file))
-            wb = load_workbook(target_file, data_only=True)
-
-            for sheet_name in wb.sheetnames:
-                sheet = wb[sheet_name]
-                content[sheet_name] = list()
-                headers = next(sheet.iter_rows(max_row=1, values_only=True))
-
-                def item(_row: tuple) -> dict:
-                    return {k: v for k, v in zip(headers, _row)}
-
-                row_count = 0
-                for r in sheet.iter_rows(min_row=2, values_only=True):
-                    row = item(r)
-                    new_row = dict()
-
-                    for key, value in row.items():
-                        try:
-                            converted_value = json.load(io.StringIO(value))
-                        except (json.decoder.JSONDecodeError, TypeError):
-                            converted_value = value
-
-                        if converted_value is not None:
-                            new_row[key] = converted_value
-
-                    if new_row:
-                        content[sheet_name].append(new_row)
-                    row_count += 1
-
-                LOGGER.debug(
-                    T("coal.services.dataset.sheet_processed").format(sheet_name=sheet_name, rows=row_count)
-                )
-
-        elif ".csv" in _file_name:
-            LOGGER.debug(T("coal.services.dataset.processing_csv").format(file_name=target_file))
-            with open(target_file, "r") as file:
-                current_filename = os.path.basename(target_file)[: -len(".csv")]
-                content[current_filename] = list()
-
-                row_count = 0
-                for csv_row in csv.DictReader(file):
-                    csv_row: dict
-                    new_row = dict()
-
-                    for key, value in csv_row.items():
-                        try:
-                            # Try to convert any json row to dict object
-                            converted_value = json.load(io.StringIO(value))
-                        except json.decoder.JSONDecodeError:
-                            converted_value = value
-
-                        if converted_value == "":
-                            converted_value = None
-
-                        if converted_value is not None:
-                            new_row[key] = converted_value
-
-                    content[current_filename].append(new_row)
-                    row_count += 1
-
-                LOGGER.debug(
-                    T("coal.services.dataset.csv_processed").format(file_name=current_filename, rows=row_count)
-                )
-
-        elif ".json" in _file_name:
-            LOGGER.debug(T("coal.services.dataset.processing_json").format(file_name=target_file))
-            with open(target_file, "r") as _file:
-                current_filename = os.path.basename(target_file)
-                content[current_filename] = json.load(_file)
-
-            if isinstance(content[current_filename], dict):
-                item_count = len(content[current_filename])
-            elif isinstance(content[current_filename], list):
-                item_count = len(content[current_filename])
-            else:
-                item_count = 1
-
-            LOGGER.debug(
-                T("coal.services.dataset.json_processed").format(file_name=current_filename, items=item_count)
-            )
-
-        else:
-            LOGGER.debug(T("coal.services.dataset.processing_text").format(file_name=target_file))
-            with open(target_file, "r") as _file:
-                current_filename = os.path.basename(target_file)
-                content[current_filename] = "\n".join(line for line in _file)
-
-            line_count = content[current_filename].count("\n") + 1
-            LOGGER.debug(
-                T("coal.services.dataset.text_processed").format(file_name=current_filename, lines=line_count)
-            )
-
-        process_time = time.time() - process_start
-        LOGGER.debug(
-            T("coal.common.timing.operation_completed").format(operation=f"process {_file_name}", time=process_time)
-        )
+        if read_files:
+            content.update(read_file(_file_name, target_file))

     elapsed_time = time.time() - start_time
     LOGGER.info(T("coal.common.timing.operation_completed").format(operation="File download", time=elapsed_time))
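
The inline per-file timing (process_start / process_time) is replaced by the @timed decorator imported from cosmotech.coal.utils.decorator, which is not shown in this diff. A minimal sketch of a decorator compatible with the @timed(f"process{file_name}", debug=True) usage above, assuming it simply logs the elapsed time (the message format here is a guess):

# Hypothetical sketch; the real cosmotech.coal.utils.decorator.timed may differ.
import functools
import time

from cosmotech.coal.utils.logger import LOGGER


def timed(operation: str, debug: bool = False):
    """Log how long the wrapped callable takes, at debug or info level."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            try:
                return func(*args, **kwargs)
            finally:
                elapsed = time.time() - start
                log = LOGGER.debug if debug else LOGGER.info
                log(f"{operation} completed in {elapsed:.3f}s")
        return wrapper
    return decorator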

cosmotech/coal/cosmotech_api/dataset/download/twingraph.py

Lines changed: 8 additions & 13 deletions
@@ -8,14 +8,9 @@
 import time
 import tempfile
 from pathlib import Path
-from typing import Dict, List, Any, Optional, Union, Tuple
+from typing import Dict, Any, Optional, Union, Tuple

-from cosmotech_api import (
-    DatasetApi,
-    DatasetTwinGraphQuery,
-    TwinGraphQuery,
-    TwingraphApi,
-)
+import cosmotech_api

 from cosmotech.coal.utils.logger import LOGGER
 from cosmotech.orchestrator.utils.translate import T
@@ -47,12 +42,12 @@ def download_twingraph_dataset(
     )

     with get_api_client()[0] as api_client:
-        dataset_api = DatasetApi(api_client)
+        dataset_api = cosmotech_api.DatasetApi(api_client)

         # Query nodes
         nodes_start = time.time()
         LOGGER.debug(T("coal.services.dataset.twingraph_querying_nodes").format(dataset_id=dataset_id))
-        nodes_query = DatasetTwinGraphQuery(query="MATCH(n) RETURN n")
+        nodes_query = cosmotech_api.DatasetTwinGraphQuery(query="MATCH(n) RETURN n")

         nodes = dataset_api.twingraph_query(
             organization_id=organization_id,
@@ -67,7 +62,7 @@
         # Query edges
         edges_start = time.time()
         LOGGER.debug(T("coal.services.dataset.twingraph_querying_edges").format(dataset_id=dataset_id))
-        edges_query = DatasetTwinGraphQuery(query="MATCH(n)-[r]->(m) RETURN n as src, r as rel, m as dest")
+        edges_query = cosmotech_api.DatasetTwinGraphQuery(query="MATCH(n)-[r]->(m) RETURN n as src, r as rel, m as dest")

         edges = dataset_api.twingraph_query(
             organization_id=organization_id,
@@ -129,12 +124,12 @@
     )

     with get_api_client()[0] as api_client:
-        api_instance = TwingraphApi(api_client)
+        api_instance = cosmotech_api.TwingraphApi(api_client)

         # Query nodes
         nodes_start = time.time()
         LOGGER.debug(T("coal.services.dataset.legacy_twingraph_querying_nodes").format(cache_name=cache_name))
-        _query_nodes = TwinGraphQuery(query="MATCH(n) RETURN n")
+        _query_nodes = cosmotech_api.TwinGraphQuery(query="MATCH(n) RETURN n")

         nodes = api_instance.query(
             organization_id=organization_id,
@@ -149,7 +144,7 @@
         # Query relationships
         rel_start = time.time()
         LOGGER.debug(T("coal.services.dataset.legacy_twingraph_querying_relations").format(cache_name=cache_name))
-        _query_rel = TwinGraphQuery(query="MATCH(n)-[r]->(m) RETURN n as src, r as rel, m as dest")
+        _query_rel = cosmotech_api.TwinGraphQuery(query="MATCH(n)-[r]->(m) RETURN n as src, r as rel, m as dest")

         rel = api_instance.query(
             organization_id=organization_id,
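
Switching from direct class imports to module-qualified access (cosmotech_api.TwingraphApi instead of from cosmotech_api import TwingraphApi) defers each attribute lookup to call time, so this module still imports on cosmotech-api major versions that no longer ship the legacy twin-graph classes. An illustrative guard built on that property (the hasattr check is hypothetical; the commit itself discriminates by version, as in __init__.py):

# Illustrative only: qualified access lets callers probe for legacy
# classes instead of failing at import time.
import cosmotech_api


def legacy_twingraph_available() -> bool:
    # True only on API client versions that still expose TwingraphApi.
    return hasattr(cosmotech_api, "TwingraphApi")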
