-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathAgentSyncer.py
More file actions
78 lines (63 loc) · 2.94 KB
/
AgentSyncer.py
File metadata and controls
78 lines (63 loc) · 2.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from EMInfraImporter import EMInfraImporter
from EventProcessors.NieuwAssetProcessor import NieuwAssetProcessor
from Neo4JConnector import Neo4JConnector
class AgentSyncer:
    """Copy agents from the EM-Infra importer into ``Agent`` nodes in Neo4j.

    Agents are fetched through
    ``EMInfraImporter.import_all_agents_from_webservice()``, flattened and
    key-normalised, and then written inside a single transaction: agents whose
    ``assetIdUri`` already exists on an ``Agent`` node overwrite that node's
    properties, all others are created as new nodes.
    """

    def __init__(self, neo4J_connector: 'Neo4JConnector', emInfraImporter: 'EMInfraImporter'):
        """Store the collaborators; no transaction is opened yet.

        :param neo4J_connector: connector used to start and commit the transaction.
        :param emInfraImporter: importer that supplies the agent dicts.
        """
        self.neo4J_connector = neo4J_connector
        self.emInfraImporter = emInfraImporter
        # Transaction handle used by update_agents(); set by sync_agents().
        self.tx_context = None

    def sync_agents(self):
        """Import all agents and write them to Neo4j in one transaction."""
        self.tx_context = self.neo4J_connector.start_transaction()
        # NOTE(review): if update_all_agents() raises, the transaction is neither
        # committed nor rolled back here. The connector's rollback API is not
        # visible from this file - confirm how a failed run should be cleaned up.
        self.update_all_agents()
        self.neo4J_connector.commit_transaction(self.tx_context)

    def update_all_agents(self):
        """Fetch every agent from the importer and create/update its node."""
        agents = self.get_all_agents()
        self.update_agents(agent_dicts=agents)

    def get_all_agents(self) -> list:
        """Return whatever the importer's ``import_all_agents_from_webservice()`` yields."""
        return self.emInfraImporter.import_all_agents_from_webservice()

    def update_agents(self, agent_dicts: list, chunk_size: int = 20):
        """Create or update one ``Agent`` node per entry of ``agent_dicts``.

        Requires ``self.tx_context`` to hold an open transaction (see
        ``sync_agents``).

        :param agent_dicts: raw agent dicts; an empty (or ``None``) value is a no-op.
        :param chunk_size: maximum number of agents sent per write query.
        :raises ValueError: if ``chunk_size`` is smaller than 1.
        """
        if not agent_dicts:
            return
        # A negative step would make the write loops below silently do nothing
        # (and zero makes range() fail late), so reject bad sizes up front.
        if chunk_size < 1:
            raise ValueError(f'chunk_size must be at least 1, got {chunk_size}')

        flattened_dicts = self.clean_agent_dicts(agent_dicts)
        list_id_uris = [agent['assetIdUri'] for agent in flattened_dicts]

        existing_nodes = self.tx_context.run(
            "MATCH (a:Agent) WHERE a.assetIdUri IN $params RETURN a",
            params=list_id_uris).data()
        # A set keeps the membership test below O(1) per agent.
        existing_id_uris = {record['a']['assetIdUri'] for record in existing_nodes}

        # Split into agents that need a new node and agents whose node exists.
        dicts_to_create = []
        dicts_to_update = []
        for agent in flattened_dicts:
            if agent['assetIdUri'] in existing_id_uris:
                dicts_to_update.append(agent)
            else:
                dicts_to_create.append(agent)

        # create agents
        for chunk in self._chunked(dicts_to_create, chunk_size):
            self.tx_context.run("UNWIND $params AS map CREATE (a:Agent) SET a = map", params=chunk)

        # update agents; "SET a = map" replaces all properties of the matched node
        for chunk in self._chunked(dicts_to_update, chunk_size):
            self.tx_context.run(
                "UNWIND $params AS map MATCH (a:Agent) WHERE a.assetIdUri = map.assetIdUri SET a = map",
                params=chunk)

    @staticmethod
    def clean_agent_dicts(agent_dicts):
        """Flatten every agent dict and normalise its keys.

        Each dict is flattened with ``NieuwAssetProcessor().flatten_dict`` and
        then passed through ``_rename_keys``.

        :param agent_dicts: iterable of raw agent dicts.
        :return: list of flat dicts, one per input dict, in the same order.
        """
        return [
            AgentSyncer._rename_keys(NieuwAssetProcessor().flatten_dict(input_dict=agent_dict))
            for agent_dict in agent_dicts
        ]

    @staticmethod
    def _rename_keys(flat_dict):
        """Return a copy of one flattened dict with normalised keys.

        * ``@id`` becomes ``assetIdUri``; additionally ``uuid`` is set to the
          first 36 characters of the last ``/``-separated segment of that value.
        * A key containing ``:`` is reduced to the part after the last ``:``.
        * Every other key (including ``@type``) is kept unchanged.
        """
        renamed = {}
        for key, value in flat_dict.items():
            if key == '@id':
                renamed['assetIdUri'] = value
                renamed['uuid'] = value.split('/')[-1][:36]
            elif ':' in key:
                renamed[key.split(':')[-1]] = value
            else:
                renamed[key] = value
        return renamed

    @staticmethod
    def _chunked(items, chunk_size):
        """Yield consecutive slices of ``items`` holding at most ``chunk_size`` entries."""
        for start in range(0, len(items), chunk_size):
            yield items[start:start + chunk_size]