-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathvalidation.py
More file actions
308 lines (268 loc) · 14.4 KB
/
validation.py
File metadata and controls
308 lines (268 loc) · 14.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
import re
import logging
import unicodedata
import urllib.request
import urllib.error
from pathlib import Path, PurePath
from typing import Optional
from openMINDS_validation.utils import VocabManager, Versions, load_json, get_latest_version_commit, version_key, \
find_openminds_class, clone_central, expand_jsonld, fetch_remote_schema_extends
# Module-wide logging setup: only WARNING and above are emitted, and every
# record is prefixed with timestamp, level and the emitting file/line.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s',
    level=logging.WARNING,
)
class SchemaTemplateValidator(object):
    """
    Runs consistency checks on a single schema template file.

    Every check reports its findings through ``logging.error`` instead of
    raising, so all checks can run on the same schema in one pass.
    """

    def __init__(self, absolute_path, repository=None, branch=None):
        """
        Loads the schema and the version table.

        :param absolute_path: path of the schema file, handed to ``load_json``.
        :param repository: repository value compared against the ``repository``
            entry of each module listed in the version table.
        :param branch: branch value compared against the ``branch`` entry of
            each module listed in the version table.
        """
        self.absolute_path = absolute_path
        self.schema = load_json(absolute_path)
        self.repository = repository
        self.branch = branch
        # Updated by check_extends() while it scans the version table and read
        # afterwards by check_required() when resolving a remote _extends.
        self.openMINDS_build_version = None
        self.version_file = Versions("./versions.json").versions

    def check_attype(self):
        """
        Validates the format of the _type in the schema definition:
        - First character of _type is in uppercase.
        """
        if '_type' not in self.schema:
            return

        type_schema_name = re.split("[:/]", self.schema['_type'])[-1]
        # Slicing (instead of indexing) keeps an empty name, e.g. a _type that
        # is empty or ends with "/" or ":", from raising IndexError.
        if not type_schema_name[:1].isupper():
            logging.error(f'First character of _type "{type_schema_name}" should be uppercase.')

    def check_extends(self):
        """
        Validates the existence of schema for the _extends property.

        A value starting with "/" is looked up in another repository through
        the GitHub contents API; any other value is looked up under ./schemas/.
        """
        if '_extends' not in self.schema:
            return

        extends_value = self.schema['_extends']
        if extends_value.startswith("/"):
            self._check_remote_extends(extends_value)
        else:
            self._check_local_extends(extends_value)

    def _check_remote_extends(self, extends_value):
        """
        Checks that an _extends target located in another repository exists.
        """
        # Needs optimization: finding the corresponding openMINDS version and module
        extends_parts = extends_value.split('/')
        module_name_extends = extends_parts[1]
        schema_path = '/'.join(extends_parts[2:])

        extends_url = None
        for version_number in sorted(self.version_file, key=version_key):
            modules = self.version_file[version_number]["modules"]
            self.openMINDS_build_version = version_number
            matched_extends_submodule = any(
                submodule.get("repository") == self.repository and
                submodule.get("branch") == self.branch
                for submodule in modules.values()
            )
            # By default, '_extends' is compared against 'latest'
            if not matched_extends_submodule and version_number != 'latest':
                continue

            module = modules.get(module_name_extends) or modules.get(module_name_extends.upper())
            if module is None:
                # The selected version does not list the module at all, so
                # there is nothing to build a URL from.
                break
            commit = module['commit'] if matched_extends_submodule else get_latest_version_commit(module)
            extends_url = (
                f"https://api.github.com/repos/openMetadataInitiative/"
                f"{Path(module['repository']).stem}/contents/{schema_path}?ref={commit}"
            )
            break

        if extends_url is None:
            # No version/module could be resolved: report it like a missing
            # schema rather than handing None to urlopen().
            logging.error(f'Schema not found for the property _extends "{extends_value}".')
            return

        try:
            # Only the HTTP status matters; the context manager closes the response.
            with urllib.request.urlopen(extends_url):
                pass
        except urllib.error.HTTPError:
            logging.error(f'Schema not found for the property _extends "{extends_value}".')

    def _check_local_extends(self, extends_value):
        """
        Checks that an _extends target located in the same repository exists.
        """
        # Checks for openMINDS_actions/schemas/_extends
        path_extends_schema = f"./schemas/{extends_value}"
        try:
            # Opened only to probe for existence; closed immediately.
            with open(path_extends_schema, 'r'):
                pass
        except FileNotFoundError:
            logging.error(f'Schema not found for the property _extends at "{extends_value}".')

    def _is_inherited_property(self, extends_value, property_name):
        """
        Tells whether property_name is defined somewhere along the _extends chain.
        """
        visited = set()
        # The visited set stops the walk if schemas ever extend each other in a cycle.
        while extends_value is not None and extends_value not in visited:
            visited.add(extends_value)
            if extends_value.startswith("/"):
                # Remote case
                inherited_schema = fetch_remote_schema_extends(
                    extends_value, self.version_file, self.openMINDS_build_version)
            else:
                # Local case
                inherited_schema = load_json('./schemas/' + extends_value)

            # An inherited schema may itself define no properties of its own.
            if property_name in inherited_schema.get('properties', {}):
                return True
            extends_value = inherited_schema.get('_extends')
        return False

    def check_required(self):
        """
        Validates required properties against the properties defined in the schema definition.

        A required property that the schema does not define itself is searched
        for in the schemas reachable through _extends before being reported.
        """
        if 'required' not in self.schema:
            return

        # A schema may list required properties while inheriting all of its
        # property definitions, in which case it has no 'properties' key.
        own_properties = self.schema.get('properties', {})
        for required_property in self.schema['required']:
            if required_property in own_properties:
                continue
            if '_extends' in self.schema and self._is_inherited_property(
                    self.schema['_extends'], required_property):
                continue
            logging.error(f'Missing required property "{required_property}" in the schema definition.')

    def validate(self):
        """
        Runs all the tests defined in SchemaTemplateValidator.
        """
        self.check_attype()
        self.check_extends()
        self.check_required()
class InstanceValidator(object):
    """
    Runs consistency checks on a single JSON-LD instance file.

    Findings are reported through ``logging.error`` / ``logging.warning``
    instead of being raised, so that all checks can run on the same instance.
    """

    def __init__(self, absolute_path):
        """
        Loads the instance together with the version and vocabulary tables.

        :param absolute_path: path of the instance file. Its second path
            component is used as the version; the third one (or the fourth
            when the third is 'terminologies') is used as the subfolder.
        """
        self.absolute_path = absolute_path
        self._tuple_path = PurePath(absolute_path).parts
        self.version = self._tuple_path[1]
        self.subfolder = self._tuple_path[2] if self._tuple_path[2] != 'terminologies' else self._tuple_path[3]
        self.file_name = Path(absolute_path).stem
        self.namespaces = Versions("./versions.json").versions[self.version]['namespaces']
        self.vocab = VocabManager("./types.json", "./properties.json")
        self.instance = load_json(absolute_path)
        # A missing @id is reported later by check_minimal_jsonld_structure();
        # it must not abort construction with a KeyError.
        self._id_instance_name = (self.instance.get('@id') or '').split('/')[-1]
        # Both are filled in by check_minimal_jsonld_structure().
        self._type_schema_name = None
        self._id_schema_name = None

    @staticmethod
    def _generate_expected_atid_name(name: str) -> Optional[str]:
        """
        Derives the expected last segment of an @id from a name or abbreviation.

        Brackets, commas and quotes are removed, "." becomes "Dot ", "/"
        becomes "_" and "&" becomes "And"; the text is then reduced to ASCII
        and its whitespace-separated words are joined without separator.

        :param name: the name or abbreviation of the instance.
        :return: the expected identifier, or None when no word is left.
        """
        sanitized = (
            name
            # The first two arguments must have equal length; the third one
            # lists the characters to delete.
            .translate(str.maketrans("", "", "(),'\""))
            .replace(".", "Dot ")
            .replace("/", "_")
            .replace("&", "And")
        )
        normalized = unicodedata.normalize("NFKD", sanitized)
        normalized = normalized.encode("ascii", "ignore").decode("ascii")

        words = normalized.split()
        if not words:
            return None

        # The first word keeps its casing when it is fully uppercase or when
        # every alphabetic word (except "and"/"by") starts with an uppercase
        # letter; otherwise it is lowercased.
        keep_first_as_is = words[0].isupper() or all(
            w[:1].isupper() for w in words if w[0].isalpha() and w.lower() not in {"and", "by"}
        )
        first = words[0] if keep_first_as_is else words[0].lower()
        return first + ''.join(w if w.isupper() else w.title() for w in words[1:])

    def _nested_instance(self, value, function, instance_type):
        """
        Applies function(item, instance_type) to value if it is a dict, or to
        every dict found inside value if it is a (possibly nested) list.
        """
        if isinstance(value, dict):
            function(value, instance_type)
        elif isinstance(value, list):
            for item in value:
                self._nested_instance(item, function, instance_type)

    def check_atid_convention(self):
        """
        Validates against:
        - White space in @id and embedded @id.
        - Differences between file name and @id.
        - @id naming convention, using abbreviation if present, otherwise name.
        """
        def _check_instance_id_convention(instance):
            # Only dicts can carry an @id; scalars found in lists are skipped.
            if not isinstance(instance, dict) or '@id' not in instance:
                return
            if ' ' in instance['@id']:
                logging.error(f'White space detected for @id: "{instance["@id"]}".')
            if instance['@id'].count('/') != 5:
                logging.error(f'Unexpected number of "/" for @id: "{instance["@id"]}".')

        # Differences between file name and @id
        if self._id_instance_name != self.file_name:
            logging.error(f'Mismatch between @id entity "{self._id_instance_name}" and file name "{self.file_name}".')

        _check_instance_id_convention(self.instance)

        # TODO add convention for the Accessibility library
        # @id naming convention
        name = self.instance.get('abbreviation') or self.instance.get('name')
        if not name:
            logging.warning('Property abbreviation/name is missing.')
        else:
            expected = self._generate_expected_atid_name(name)
            if expected is None:
                logging.warning('Property name is empty.')
            elif self._id_instance_name != expected:
                logging.warning(f'Unexpected @id entity: "{self._id_instance_name}" '
                                f'(full @id: {self.instance.get("@id")}), (expected: {expected}).')

        # White space in @id and embedded @id
        for value in self.instance.values():
            if isinstance(value, dict):
                _check_instance_id_convention(value)
            elif isinstance(value, list):
                for instance_element in value:
                    _check_instance_id_convention(instance_element)

    def check_missmatch_id_type(self):
        """
        Validates against:
        - @type not found in vocab for the given version.
        - namespace of @type.
        - mismatch of @type and @id.

        Relies on the names extracted by check_minimal_jsonld_structure().
        """
        vocab_types = self.vocab.vocab_types
        type_is_known = self._type_schema_name in vocab_types
        if not type_is_known or self.version not in vocab_types[self._type_schema_name]['isPartOfVersion']:
            logging.error(f'@type "{self._type_schema_name}" not found for "{self.version}" version.')

        # The namespace can only be verified for a type the vocabulary knows.
        if type_is_known:
            for _type_namespace_version in vocab_types[self._type_schema_name]['hasNamespace']:
                if self.version in _type_namespace_version['inVersions']:
                    expected_type = _type_namespace_version['namespace'] + self._type_schema_name
                    if expected_type != self.instance['@type']:
                        logging.error(f'Unexpected namespace for @type: "{self.instance["@type"]}".')
                    break

        # Slices (instead of indexing) keep an empty or unset name from raising.
        id_schema_name = self._id_schema_name or ''
        if id_schema_name in ['licenses', 'contentTypes']:
            # self._type_schema_name is not using plural
            expected_type_name = id_schema_name[:1].upper() + id_schema_name[1:-1]
        else:
            expected_type_name = id_schema_name[:1].upper() + id_schema_name[1:]
        if expected_type_name != self._type_schema_name:
            logging.error(f'Mismatch between @id schema name "{self._id_schema_name}" and @type schema name "{self._type_schema_name}".')

    def check_property_existence(self, instance=None, instance_type=None):
        """
        Validates instance properties against the vocabulary for the given version and type.

        :param instance: dict to check; defaults to the loaded instance.
        :param instance_type: type used when the dict carries no @type of its
            own (e.g. the type of the enclosing instance).
        """
        instance = instance if instance is not None else self.instance
        # The dict's own @type takes precedence, so that the type being tested
        # and the type named in the error message are always the same.
        instance_type = instance.get('@type') or instance_type

        # Skip validation if no type is defined
        if not instance_type:
            return

        for property_name in instance:
            if property_name in ('@context', '@id', '@type'):
                continue
            if property_name not in self.vocab.vocab_properties:
                logging.error(f'Unknown property "{property_name}".')
                continue
            # A version absent from "usedIn" means the property is not
            # available in that version for any type.
            used_in = self.vocab.vocab_properties[property_name]["usedIn"].get(self.version, ())
            if instance_type not in used_in:
                logging.error(f'Property "{property_name}" not available for type "{instance_type}" in version "{self.version}".')
                continue
            self._nested_instance(instance[property_name], self.check_property_existence, instance_type)

    def check_property_constraint(self, instance=None, instance_type=None, openminds_class=None):
        """
        Validates the presence and values of required and optional properties in the instance.

        :param instance: dict to check; defaults to the loaded instance.
        :param instance_type: accepted for the recursive calls; always
            recomputed from the @type of the dict.
        :param openminds_class: unused; always looked up from the version and
            the type name.
        """
        instance = instance if instance is not None else self.instance

        # Skip validation if no @type
        if '@type' not in instance:
            return
        instance_type = instance.get('@type').split('/')[-1]

        openminds_class = find_openminds_class(self.version, instance_type)
        # A class without 'properties' or 'required' simply has none of them.
        openminds_class_properties = openminds_class["properties"].keys() if 'properties' in openminds_class else []
        required_properties = openminds_class["required"] if 'required' in openminds_class else []
        # Sorted so that the messages come out in a stable order.
        optional_properties = sorted(set(openminds_class_properties) - set(required_properties))

        if "@context" in instance:
            instance = expand_jsonld(instance)

        for required_property in required_properties:
            if required_property not in instance:
                logging.error(f'Missing required property "{required_property}".')
                continue
            if instance[required_property] in (None, '', ' '):
                logging.error(f'Required property "{required_property}" is not defined.')
            self._nested_instance(instance[required_property], self.check_property_constraint, instance_type)

        for optional_property in optional_properties:
            if optional_property not in instance:
                logging.error(f'Missing optional property "{optional_property}".')
                continue
            if instance[optional_property] in ('', ' '):
                logging.warning(f'Unexpected value "{instance[optional_property]}" for "{optional_property}".')
            self._nested_instance(instance[optional_property], self.check_property_constraint, instance_type)

    def check_minimal_jsonld_structure(self):
        """
        Check if @id and @type are present in the instance.

        On success the schema names found in @type and @id are stored for the
        later checks.

        :return: True when both keys are present, False otherwise.
        """
        # TODO Check @id in lists or instances when needed
        # Schemas will need to be used to ensure that the constraints are correctly applied
        if not all(key in self.instance for key in ('@id', '@type')):
            logging.error("Instance must contain both @id and @type.")
            # Nothing can be extracted from keys that are not there.
            return False

        self._type_schema_name = self.instance['@type'].split('/')[-1]
        id_parts = self.instance['@id'].split('/')
        self._id_schema_name = id_parts[-2] if len(id_parts) > 1 else ''
        return True

    def validate(self):
        """
        Run all the tests defined in InstanceValidator.

        The remaining checks all rely on @id and @type, so they are skipped
        when the minimal structure check fails.
        """
        clone_central()

        if not self.check_minimal_jsonld_structure():
            return
        self.check_atid_convention()
        self.check_missmatch_id_type()
        self.check_property_existence()
        self.check_property_constraint()