-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdataset_helper_object.py
More file actions
266 lines (231 loc) · 14.1 KB
/
dataset_helper_object.py
File metadata and controls
266 lines (231 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
import os
import re
from array import array
import requests
import logging
from flask import Flask
from neo4j.exceptions import Neo4jError
from hubmap_commons.hubmap_const import HubmapConst
from hubmap_sdk import EntitySdk, SearchSdk, sdk_helper
from pandas.core.array_algos.take import take_nd
# Suppress InsecureRequestWarning warning when requesting status on https with ssl cert verify disabled
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
# Set logging format and level (default is warning)
# All the API logging is forwarded to the uWSGI server and gets written into the log file `uwsgi-ingest-api.log`
# Log rotation is handled via logrotate on the host system with a configuration file
# Do NOT handle log file and rotation via the Python logging to avoid issues with multi-worker processes
logging.basicConfig(format='[%(asctime)s] %(levelname)s in %(module)s: %(message)s', level=logging.DEBUG,
datefmt='%Y-%m-%d %H:%M:%S')
logger = logging.getLogger(__name__)
# In Python, "privacy" depends on "consenting adults'" levels of agreement, we can't force it.
# A single leading underscore means you're not supposed to access it "from the outside"
_entity_api_url = None
_search_api_url = None
_ontology_api_url = None
_globus_public_endpoint_filepath = _globus_consortium_endpoint_filepath = _globus_protected_endpoint_filepath = None
def load_flask_instance_config():
# Specify the absolute path of the instance folder and use the config file relative to the instance path
app = Flask(__name__,
instance_path=os.path.join(os.path.abspath(os.path.dirname(__file__)), 'instance'),
instance_relative_config=True)
app.config.from_pyfile('app.cfg')
return app.config
class DatasetHelper:
def __init__(self):
# Specify as module-scope variables
global _entity_api_url
global _search_api_url
global _ontology_api_url
global _globus_public_endpoint_filepath, _globus_consortium_endpoint_filepath, _globus_protected_endpoint_filepath
if _entity_api_url is None:
config = load_flask_instance_config()
_entity_api_url = config['ENTITY_WEBSERVICE_URL']
_search_api_url = config['SEARCH_WEBSERVICE_URL']
_ontology_api_url = config['UBKG_WEBSERVICE_URL']
_globus_public_endpoint_filepath = config['GLOBUS_PUBLIC_ENDPOINT_FILEPATH']
_globus_consortium_endpoint_filepath = config['GLOBUS_CONSORTIUM_ENDPOINT_FILEPATH']
_globus_protected_endpoint_filepath = config['GLOBUS_PROTECTED_ENDPOINT_FILEPATH']
def get_organ_types_dict(self) -> object:
organ_types_url = _ontology_api_url + 'organs/by-code?application_context=HUBMAP'
organ_resource_file = requests.get(organ_types_url).json()
return organ_resource_file
# This is the business logic for `/datasets/<uuid>/verifytitleinfo` endpoint that is used by
# the ingest-validation-tests package to validate the data needed to produce a title
# Note: the `title` is generated by entity-api using a on_read_trigger
# here is one uuid that actually pass validation requirements: ead5cc01250b4f9ea73dd91503c313a5
def verify_dataset_title_info(self, dataset_uuid: str, user_token: str) -> array:
entity_api = EntitySdk(token=user_token, service_url=_entity_api_url)
search_api = SearchSdk(token=user_token, service_url=_search_api_url)
data_found = {'age': False, 'race': False, 'sex': False}
rslt = []
try:
entity = entity_api.get_entity_by_id(dataset_uuid)
except Exception:
rslt.append(f'Unable to get the target dataset with uuid: {dataset_uuid}')
return rslt
dataset = vars(entity)
if not 'dataset_type' in dataset:
rslt.append('The dataset did not contain a ''dataset_type'' key')
# TO-DO: the blow logic can be simplified by parsing the `title` field returned by entity-api - Zhou
try:
entity = entity_api.get_ancestors(dataset['uuid'])
except Exception:
rslt.append(f"Unable to get the ancestors of dataset with uuid: {dataset_uuid}")
for ancestor in entity:
ancestor_dict = vars(ancestor)
if 'entity_type' in ancestor_dict:
if ancestor_dict['entity_type'] == 'Sample':
if 'sample_category' in ancestor_dict and ancestor_dict['sample_category'].lower() == 'organ':
if 'organ' in ancestor_dict:
organ_code = ancestor_dict['organ']
organ_types_dict = self.get_organ_types_dict()
if organ_code not in organ_types_dict:
rslt.append(f"Organ code '{organ_code}' not found in organ types file")
else:
rslt.append('Organ key not found in sample_category organ')
elif ancestor_dict['entity_type'] == 'Donor':
try:
for data in ancestor_dict['metadata']['organ_donor_data']:
if data['grouping_concept_preferred_term'].lower() == 'age':
data_found['age'] = True
if data['grouping_concept_preferred_term'].lower() == 'race':
data_found['race'] = True
if data['grouping_concept_preferred_term'].lower() == 'sex':
data_found['sex'] = True
except KeyError:
pass
for k, v in data_found.items():
if not v:
rslt.append(f'Donor metadata.organ_donor_data grouping_concept_preferred_term {k} not found')
return rslt
# neo4j_driver - The driver instance for neo4j
# json_payload - A list of ids (HM_ID or UUID)
# user_data_access_level - Data access level information for the user, notably including
# Globus Group membership information.
#
# Returns a Dict of Dicts where each of the dicts inside is keyed by its original id given
# in the json_payload and contains information about the accessibility of that directory
# including its globus url.
def get_entity_accessibility(self, neo4j_driver, json_payload, user_data_access_level) -> dict:
supported_entity_type_list = ['Dataset', 'Upload']
accessibility_dicts = {}
query = (
"MATCH (e:Entity) "
"WHERE e.uuid IN $ids OR e.hubmap_id IN $ids "
"RETURN COLLECT({"
"uuid: e.uuid, "
"hubmap_id: e.hubmap_id, "
"entity_type: e.entity_type, "
"status: e.status, "
"group_name: e.group_name, "
"group_uuid: e.group_uuid, "
"contains_human_genetic_sequences: e.contains_human_genetic_sequences, "
"data_access_level: e.data_access_level"
"}) AS entities"
)
with neo4j_driver.session() as session:
result = session.run(query, ids=json_payload)
record = result.single()
entities = record["entities"] if record else []
requested_ids = set(json_payload)
matched_ids = set()
for e in entities:
if e.get("uuid"):
matched_ids.add(e["uuid"])
if e.get("hubmap_id"):
matched_ids.add(e["hubmap_id"])
if e.get("uuid") in requested_ids:
e['original_id'] = e.get("uuid")
else:
e['original_id'] = e.get("hubmap_id")
invalid_ids = list(requested_ids - matched_ids)
for invalid in invalid_ids:
accessibility_dicts[invalid] = {'valid_id': False}
for entity_dict in entities:
if entity_dict['entity_type'] not in supported_entity_type_list:
accessibility_dicts[entity_dict['original_id']] = {'valid_id': False}
# Make sure all expected elements for the business requirements are in the returned entity.
# Need to determine entity "visibility" using the same rules found in the
missing_entity_elements = []
if 'entity_type' not in entity_dict:
missing_entity_elements.append('entity_type')
if 'uuid' not in entity_dict:
missing_entity_elements.append('uuid')
if 'hubmap_id' not in entity_dict:
missing_entity_elements.append('hubmap_id')
if 'status' not in entity_dict:
missing_entity_elements.append('status')
if 'group_name' not in entity_dict:
missing_entity_elements.append('group_name')
if 'group_uuid' not in entity_dict:
missing_entity_elements.append('group_uuid')
if 'contains_human_genetic_sequences' not in entity_dict and \
entity_dict['entity_type'] == 'Dataset':
missing_entity_elements.append('contains_human_genetic_sequences')
if 'data_access_level' not in entity_dict and \
entity_dict['entity_type'] == 'Dataset':
missing_entity_elements.append('data_access_level')
if missing_entity_elements:
logger.error(f"Unexpected format for '{entity_dict['original_id']}'"
f" , missing {str(missing_entity_elements)}"
f" from entity={str(entity_dict)}.")
raise Exception(f"Data error determining accessibility of '{entity_dict['origina_id']}'")
if entity_dict['entity_type'] == 'Dataset':
user_access_allowed = (entity_dict['data_access_level'] == HubmapConst.ACCESS_LEVEL_PUBLIC)
if not user_access_allowed:
user_access_allowed = (entity_dict['data_access_level'] == HubmapConst.ACCESS_LEVEL_CONSORTIUM) and \
user_data_access_level['data_access_level'] in [
HubmapConst.ACCESS_LEVEL_CONSORTIUM \
, HubmapConst.ACCESS_LEVEL_PROTECTED]
if not user_access_allowed:
user_access_allowed = (entity_dict['data_access_level'] == HubmapConst.ACCESS_LEVEL_PROTECTED) and \
(user_data_access_level['data_access_level'] in [
HubmapConst.ACCESS_LEVEL_PROTECTED] \
or entity_dict['group_uuid'] in user_data_access_level['group_membership_ids'])
if (entity_dict['data_access_level'] == HubmapConst.ACCESS_LEVEL_PROTECTED) and not user_access_allowed and entity_dict.get('status').lower() == 'published':
abs_path = os.path.join(_globus_public_endpoint_filepath
, entity_dict['uuid'])
elif (entity_dict['data_access_level'] == HubmapConst.ACCESS_LEVEL_PROTECTED):
abs_path = os.path.join(_globus_protected_endpoint_filepath
, entity_dict['group_name']
, entity_dict['uuid'])
elif entity_dict['data_access_level'] == HubmapConst.ACCESS_LEVEL_CONSORTIUM:
abs_path = os.path.join(_globus_consortium_endpoint_filepath
, entity_dict['group_name']
, entity_dict['uuid'])
elif entity_dict['data_access_level'] == HubmapConst.ACCESS_LEVEL_PUBLIC:
abs_path = os.path.join(_globus_public_endpoint_filepath
, entity_dict['uuid'])
else:
raise Exception(f"Unexpected error for {entity_dict['original_id']} of type"
f" {entity_dict['entity_type']} with data access level"
f" {entity_dict['data_access_level']}.")
entity_accessibility_dict = {'valid_id': True, 'access_allowed': user_access_allowed}
if entity_dict.get('status').lower() == 'published':
entity_accessibility_dict['access_allowed'] = True
if user_access_allowed or entity_dict.get('status').lower() == 'published':
entity_accessibility_dict['hubmap_id'] = entity_dict['hubmap_id']
entity_accessibility_dict['uuid'] = entity_dict['uuid']
entity_accessibility_dict['entity_type'] = entity_dict['entity_type']
entity_accessibility_dict['file_system_path'] = abs_path
accessibility_dicts[entity_dict['original_id']] = entity_accessibility_dict
elif entity_dict['entity_type'] == 'Upload':
user_access_allowed = (user_data_access_level['data_access_level'] in [
HubmapConst.ACCESS_LEVEL_PROTECTED]
or entity_dict['group_uuid'] in user_data_access_level['group_membership_ids'])
abs_path = os.path.join(_globus_protected_endpoint_filepath
, entity_dict['group_name']
, entity_dict['uuid'])
entity_accessibility_dict = { 'valid_id': True
, 'access_allowed': user_access_allowed}
if user_access_allowed:
entity_accessibility_dict['hubmap_id'] = entity_dict['hubmap_id']
entity_accessibility_dict['uuid'] = entity_dict['uuid']
entity_accessibility_dict['entity_type'] = entity_dict['entity_type']
entity_accessibility_dict['file_system_path'] = abs_path
accessibility_dicts[entity_dict['original_id']] = entity_accessibility_dict
else:
raise Exception(f"Unexpected error for {entity_dict['original_id']} of type"
f" {entity_dict['entity_type']}.")
return accessibility_dicts