-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathNeo4JConnector.py
More file actions
86 lines (70 loc) · 3.51 KB
/
Neo4JConnector.py
File metadata and controls
86 lines (70 loc) · 3.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import logging
import time
from neo4j import GraphDatabase, Transaction
class Neo4JConnector:
    """Convenience wrapper around a Neo4j driver bound to one database.

    The connector keeps a single ``Params`` node with paging/cursor state,
    creates ``Asset`` nodes and relationships between them, and offers a
    retrying ad-hoc ``query`` helper.
    """

    def __init__(self, uri, user, password, database: str = 'neo4j'):
        """Open a driver for *uri* using basic auth and remember *database*."""
        self.driver = GraphDatabase.driver(uri=uri, auth=(user, password))
        self.db = database

    def get_page_by_get_or_create_params(self):
        """Return the ``Params`` node, creating it with defaults when absent.

        When no ``Params`` node exists yet, the default uniqueness
        constraints are installed first and a fresh node is created.
        """
        with self.driver.session(database=self.db) as session:
            params = session.run("MATCH (p:Params) RETURN p").single()
            if params is None:
                self.set_default_constraints_and_indices(session)
                params = session.run("CREATE (p:Params {page:-1, event_id:'', pagesize:100, "
                                     "freshstart:True, otltype:-1, cursor:''}) RETURN p").single()
            return params[0]

    @staticmethod
    def save_props_to_params(tx: "Transaction", params: dict):
        """Merge the key/value pairs of *params* into the ``Params`` node."""
        tx.run("MATCH (p:Params) SET p += $params", params=params)

    @staticmethod
    def update_params(tx: "Transaction", page_num: int, event_id: int):
        """Store *page_num* and *event_id* on the ``Params`` node.

        Values are sent as query parameters rather than spliced into the
        Cypher text, so quotes in *event_id* cannot break or alter the query.
        ``event_id`` is stored as a string, as it always has been.
        """
        tx.run("MATCH (p:Params) SET p.page = $page_num, p.event_id = $event_id",
               page_num=page_num, event_id=str(event_id))

    def close(self):
        """Close the underlying driver."""
        self.driver.close()

    def perform_create_asset(self, params: dict, ns: str, assettype: str):
        """Create one ``Asset`` node in its own transaction and commit it.

        :param params: properties for the new node
        :param ns: extra label (namespace) added to the node
        :param assettype: extra label (asset type) added to the node
        """
        with self.driver.session(database=self.db) as session:
            tx = session.begin_transaction()
            try:
                self._create_asset_by_dict(tx, params, ns, assettype)
                tx.commit()
            finally:
                # Closing after a commit is harmless; closing without one
                # releases the transaction when the CREATE failed.
                tx.close()

    def perform_create_relatie(self, bron_uuid='', doel_uuid='', relatie_type='', params=None):
        """Create a relationship between two assets in a write transaction.

        :param bron_uuid: uuid of the source ``Asset``
        :param doel_uuid: uuid of the target ``Asset``
        :param relatie_type: relationship type to create
        :param params: properties for the relationship (``None`` means none)
        """
        with self.driver.session(database=self.db) as session:
            session.write_transaction(self._create_relatie_by_dict, bron_uuid=bron_uuid, doel_uuid=doel_uuid,
                                      relatie_type=relatie_type, params=params)

    @staticmethod
    def _create_asset_by_dict(tx, params: dict, ns: str, assettype: str):
        """Run the ``CREATE`` for an ``Asset`` node labelled *ns* and *assettype*."""
        # Labels cannot be passed as Cypher parameters, so ns/assettype are
        # interpolated; callers must only pass trusted label names.
        return tx.run(f"CREATE (a:Asset:{ns}:{assettype} $params) ", params=params)

    @staticmethod
    def _create_relatie_by_dict(tx, bron_uuid='', doel_uuid='', relatie_type='', params=None):
        """Run the ``CREATE`` for a relationship between two matched assets.

        Both uuids and the property map travel as query parameters. Only the
        relationship type is interpolated, because Cypher does not allow a
        parameter in that position; callers must pass a trusted type name.
        """
        properties = {} if params is None else params
        query = "MATCH (a:Asset), (b:Asset) " \
                "WHERE a.uuid = $bron_uuid " \
                "AND b.uuid = $doel_uuid " \
                f"CREATE (a)-[r:{relatie_type} " \
                "$params]->(b) " \
                "RETURN type(r), r.name"
        return tx.run(query, bron_uuid=bron_uuid, doel_uuid=doel_uuid, params=properties)

    def start_transaction(self) -> "Transaction":
        """Begin and return a transaction on a newly opened session.

        NOTE(review): the session opened here is not tracked, so nothing in
        this class closes it explicitly once the transaction is finished.
        """
        return self.driver.session(database=self.db).begin_transaction()

    @staticmethod
    def commit_transaction(tx_context: "Transaction"):
        """Commit *tx_context* and close it."""
        tx_context.commit()
        tx_context.close()

    @staticmethod
    def set_default_constraints_and_indices(session):
        """Ensure uniqueness constraints on ``uuid`` for Asset and Agent nodes.

        :param session: any object with a ``run`` method that executes Cypher
        """
        session.run("CREATE CONSTRAINT Asset_uuid IF NOT EXISTS FOR (n:Asset) REQUIRE n.uuid IS UNIQUE")
        session.run("CREATE CONSTRAINT Agent_uuid IF NOT EXISTS FOR (n:Agent) REQUIRE n.uuid IS UNIQUE")

    def query(self, query, max_retries=None, retry_delay=30):
        """Run *query* and return all records as a list, retrying on failure.

        :param query: Cypher text to execute
        :param max_retries: number of retries after the first failure;
            ``None`` (the default) keeps retrying indefinitely
        :param retry_delay: seconds to sleep between attempts
        :raises Exception: the last error, once *max_retries* is exhausted
        """
        assert self.driver is not None, "Driver not initialized!"
        failures = 0
        while True:
            try:
                with self.driver.session(database=self.db) as session:
                    return list(session.run(query))
            except Exception as e:
                logging.error("Query failed: %s", e)
                logging.error('Are settings and/or connection to the Neo4J database okay?')
                failures += 1
                if max_retries is not None and failures > max_retries:
                    raise
                logging.info('Retrying in %s seconds.', retry_delay)
                time.sleep(retry_delay)