-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathEMInfraImporter.py
More file actions
123 lines (98 loc) · 5.79 KB
/
EMInfraImporter.py
File metadata and controls
123 lines (98 loc) · 5.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import base64
import json
from AbstractRequester import AbstractRequester
class EMInfraImporter:
def __init__(self, requester: AbstractRequester):
self.requester = requester
self.requester.first_part_url += 'eminfra/'
self.cursor = ''
def get_events_from_page(self, page_num: int, page_size: int = 1):
url = f"feedproxy/feed/assets/{page_num}/{page_size}"
response = self.requester.get(url)
decoded_string = response.content.decode()
return json.loads(decoded_string)
def get_objects_from_oslo_search_endpoint(self, url_part: str, filter_string: str = '{}', size: int = 100,
only_next_page: bool = False) -> [dict]:
url = f"core/api/otl/{url_part}/search"
body_fixed_part = '{"size": ' + f'{size}'
if filter_string != '{}':
body_fixed_part += f', "filters": {filter_string}'
json_list = []
while True:
body = body_fixed_part
if self.cursor != '':
body += f', "fromCursor": "{self.cursor}"'
body += '}'
json_data = json.loads(body)
response = self.requester.post(url=url, json=json_data)
decoded_string = response.content.decode("utf-8")
dict_obj = json.loads(decoded_string)
keys = response.headers.keys()
json_list.extend(dict_obj['@graph'])
if 'em-paging-next-cursor' in keys:
self.cursor = response.headers['em-paging-next-cursor']
else:
self.cursor = ''
if only_next_page:
return json_list
if self.cursor == '':
return json_list
def get_assets_from_webservice_by_naam(self, naam: str) -> [dict]:
filter_string = '{ "naam": ' + f'"{naam}"' + ' }'
return self.get_objects_from_oslo_search_endpoint(url_part='assets', filter_string=filter_string)
def import_all_assets_from_webservice(self) -> [dict]:
return self.get_objects_from_oslo_search_endpoint(url_part='assets')
def import_assets_from_webservice_page_by_page(self, page_size: int) -> [dict]:
return self.get_objects_from_oslo_search_endpoint(url_part='assets', size=page_size, only_next_page=True)
def import_assets_from_webservice_by_uuids(self, asset_uuids: [str]) -> [dict]:
return self._extracted_from_import_agents_from_webservice_by_uuids_2(asset_uuids, 'assets')
def import_all_agents_from_webservice(self) -> [dict]:
return self.get_objects_from_oslo_search_endpoint(url_part='agents')
def import_agents_from_webservice_page_by_page(self, page_size: int) -> [dict]:
return self.get_objects_from_oslo_search_endpoint(url_part='agents', size=page_size, only_next_page=True)
def import_agents_from_webservice_by_uuids(self, agent_uuids: [str]) -> [dict]:
return self._extracted_from_import_agents_from_webservice_by_uuids_2(
agent_uuids, 'agents'
)
# TODO Rename this here and in `import_assets_from_webservice_by_uuids` and `import_agents_from_webservice_by_uuids`
def _extracted_from_import_agents_from_webservice_by_uuids_2(self, arg0, url_part):
asset_list_string = '", "'.join(arg0)
filter_string = '{ "uuid": ' + f'["{asset_list_string}"]' + ' }'
return self.get_objects_from_oslo_search_endpoint(
url_part=url_part, filter_string=filter_string
)
def import_all_assetrelaties_from_webservice(self) -> [dict]:
return self.get_distinct_set_from_list_of_relations(
self.get_objects_from_oslo_search_endpoint(url_part='assetrelaties'))
def import_assetrelaties_from_webservice_page_by_page(self, page_size: int) -> [dict]:
return self.get_distinct_set_from_list_of_relations(
self.get_objects_from_oslo_search_endpoint(url_part='assetrelaties', size=page_size, only_next_page=True))
def import_assetrelaties_from_webservice_by_assetuuids(self, asset_uuids: [str]) -> [dict]:
asset_list_string = '", "'.join(asset_uuids)
filter_string = '{ "asset": ' + f'["{asset_list_string}"]' + ' }'
return self.get_distinct_set_from_list_of_relations(
self.get_objects_from_oslo_search_endpoint(url_part='assetrelaties', filter_string=filter_string))
def import_all_betrokkenerelaties_from_webservice(self) -> [dict]:
return self.get_distinct_set_from_list_of_relations(
self.get_objects_from_oslo_search_endpoint(url_part='betrokkenerelaties'))
def import_betrokkenerelaties_from_webservice_page_by_page(self, page_size: int) -> [dict]:
return self.get_distinct_set_from_list_of_relations(
self.get_objects_from_oslo_search_endpoint(url_part='betrokkenerelaties', size=page_size, only_next_page=True))
def import_betrokkenerelaties_from_webservice_by_assetuuids(self, asset_uuids: [str]) -> [dict]:
asset_list_string = '", "'.join(asset_uuids)
filter_string = '{ "bronAsset": ' + f'["{asset_list_string}"]' + ' }'
return self.get_distinct_set_from_list_of_relations(
self.get_objects_from_oslo_search_endpoint(url_part='betrokkenerelaties', filter_string=filter_string))
@staticmethod
def get_asset_id_from_uuid_and_typeURI(uuid, typeURI):
shortUri = typeURI.split('/ns/')[1]
if 'lgc' in typeURI:
shortUri = f'lgc:{shortUri}'
shortUri_encoded = base64.b64encode(shortUri.encode('utf-8'))
shortUri_encoded = shortUri_encoded.decode("utf-8")
while shortUri_encoded[-1] == '=':
shortUri_encoded = shortUri_encoded[:-1]
return f'{uuid}-{shortUri_encoded}'
@staticmethod
def get_distinct_set_from_list_of_relations(relation_list: [dict]) -> [dict]:
return list({x["@id"]: x for x in relation_list}.values())