Skip to content

Commit 8aaa68c

Browse files
Add migration utils (#209)
1 parent 6f57fd6 commit 8aaa68c

File tree

7 files changed

+307
-6
lines changed

7 files changed

+307
-6
lines changed

.github/workflows/docker_publish.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ jobs:
3737
prefix: "v"
3838

3939
- run: echo "MAGE is at version ${{ steps.get-latest-tag.outputs.tag }}"
40-
40+
4141
- name: Get commit tag
4242
id: get-commit-tag
4343
run: |
@@ -58,7 +58,6 @@ jobs:
5858
echo "::set-output name=LATEST::fix"
5959
fi
6060
- run: echo "Additional tag for production image - ${{ steps.get-prod-tags.outputs.LATEST }}"
61-
6261
- name: Log in to Docker Hub
6362
uses: docker/login-action@v1
6463
with:

.github/workflows/test.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ on: [pull_request, workflow_dispatch]
99

1010
jobs:
1111
build:
12-
runs-on: ubuntu-latest
12+
runs-on: ubuntu-latest
1313
env:
1414
MEMGRAPH_VERSION: 2.8.0
1515
strategy:
@@ -26,7 +26,7 @@ jobs:
2626

2727
- name: Set up QEMU
2828
uses: docker/setup-qemu-action@v2
29-
29+
3030
- name: Set up Docker Buildx
3131
id: buildx
3232
uses: docker/setup-buildx-action@v2
@@ -68,7 +68,7 @@ jobs:
6868
uses: actions/setup-python@v2
6969
with:
7070
python-version: ${{ env.PY_VERSION }}
71-
71+
7272
- name: Install Python test dependencies
7373
run: |
7474
python -m pip install --upgrade pip

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ RUN apt-get update && apt-get install -y \
2626
python3-dev `mage-memgraph` \
2727
clang `mage-memgraph` \
2828
git `mage-memgraph` \
29+
unixodbc `mage-memgraph` \
2930
libboost-all-dev `mage-memgraph` \
3031
--no-install-recommends \
3132
# Download and install Memgraph

Dockerfile.release

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ RUN apt-get update && apt-get install -y \
2424
python3-dev `mage-memgraph` \
2525
clang `mage-memgraph` \
2626
git `mage-memgraph` \
27+
unixodbc-dev `mage-memgraph` \
2728
--no-install-recommends \
2829
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
2930

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,8 @@ To learn more about development with MAGE and Docker, visit the
197197
- python3-pip
198198
- python3-setuptools
199199
- python3-dev
200-
- clang
200+
- clang
201+
- unixodbc
201202

202203
Since Memgraph needs to load MAGE's modules, there is the `setup` script to help you. With it, you can build the modules so that Memgraph
203204
can load them on start up.

python/migrate.py

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
import json
2+
import mgp
3+
import mysql.connector as mysql_connector
4+
import oracledb
5+
import pyodbc
6+
import threading
7+
8+
from typing import Any, Dict
9+
10+
11+
class Constants:
    """Names and sizes shared by the per-thread migration state dictionaries."""

    # How many rows each batch of the stream fetches from the source DB.
    BATCH_SIZE = 1000

    # Keys of the per-thread state dictionaries.
    CONNECTION = "connection"
    CURSOR = "cursor"
    COLUMN_NAMES = "column_names"

    # Position of the column name inside a DB-API `cursor.description` entry.
    I_COLUMN_NAME = 0
17+
18+
19+
##### MYSQL
20+
21+
# Per-thread MySQL connection state, keyed by the OS thread id so parallel
# batched executions do not share a cursor.
mysql_dict = {}


def init_migrate_mysql(
    table_or_sql: str,
    config: mgp.Map,
    config_path: str = "",
    params: mgp.Nullable[mgp.Any] = None,
):
    """Open a MySQL connection/cursor for the calling thread and execute the query.

    :param table_or_sql: Table name or an SQL query
    :param config: Connection configuration parameters (as in mysql.connector.connect)
    :param config_path: Path to a JSON file whose key/value pairs overwrite `config`
    :param params: Optionally, queries may be parameterized; `params` provides the values
    """
    global mysql_dict

    if params:
        _check_params_type(params)
    if len(config_path) > 0:
        config = _combine_config(config=config, config_path=config_path)

    if _query_is_table(table_or_sql):
        # NOTE(security): the table name is interpolated directly into the SQL
        # text — callers must not pass untrusted input here.
        table_or_sql = f"SELECT * FROM {table_or_sql};"

    # BUG FIX: the original keyed these dicts on `threading.get_native_id`
    # (the function object) instead of calling it, so every thread shared a
    # single entry. Key on the actual integer thread id.
    thread_id = threading.get_native_id()
    if thread_id not in mysql_dict:
        mysql_dict[thread_id] = {}

    if Constants.CURSOR not in mysql_dict[thread_id]:
        mysql_dict[thread_id][Constants.CURSOR] = None

    if mysql_dict[thread_id][Constants.CURSOR] is None:
        connection = mysql_connector.connect(**config)
        cursor = connection.cursor()
        cursor.execute(table_or_sql, params=params)

        mysql_dict[thread_id][Constants.CONNECTION] = connection
        mysql_dict[thread_id][Constants.CURSOR] = cursor
        mysql_dict[thread_id][Constants.COLUMN_NAMES] = [
            column[Constants.I_COLUMN_NAME] for column in cursor.description
        ]


def mysql(
    table_or_sql: str,
    config: mgp.Map,
    config_path: str = "",
    params: mgp.Nullable[mgp.Any] = None,
) -> mgp.Record(row=mgp.Map):
    """
    With migrate.mysql you can access MySQL and execute queries. The result table is converted into a stream,
    and returned rows can be used to create or merge graph structures. Config must be at least an empty map.
    If config_path is passed, every key,value pair from the JSON file will overwrite any values in config.

    :param table_or_sql: Table name or an SQL query
    :param config: Connection configuration parameters (as in mysql.connector.connect)
    :param config_path: Path to the JSON file containing configuration parameters (as in mysql.connector.connect)
    :param params: Optionally, queries may be parameterized. In that case, `params` provides parameter values
    :return: The result table as a stream of rows
    """
    global mysql_dict
    # BUG FIX: look up by the thread id (call get_native_id), matching init.
    thread_id = threading.get_native_id()
    cursor = mysql_dict[thread_id][Constants.CURSOR]
    column_names = mysql_dict[thread_id][Constants.COLUMN_NAMES]

    rows = cursor.fetchmany(Constants.BATCH_SIZE)

    return [mgp.Record(row=_name_row_cells(row, column_names)) for row in rows]


def cleanup_migrate_mysql():
    """Release the calling thread's MySQL cursor and connection."""
    global mysql_dict
    thread_id = threading.get_native_id()
    mysql_dict[thread_id][Constants.CURSOR] = None
    # BUG FIX: the original called commit() AFTER close(), which raises on a
    # closed connection. Commit (a no-op for a read stream) before closing.
    mysql_dict[thread_id][Constants.CONNECTION].commit()
    mysql_dict[thread_id][Constants.CONNECTION].close()
    mysql_dict[thread_id][Constants.CONNECTION] = None
    mysql_dict[thread_id][Constants.COLUMN_NAMES] = None


mgp.add_batch_read_proc(mysql, init_migrate_mysql, cleanup_migrate_mysql)
94+
95+
### SQL SERVER
96+
97+
# Per-thread SQL Server (ODBC) connection state, keyed by the OS thread id.
sql_server_dict = {}


def init_migrate_sql_server(
    table_or_sql: str,
    config: mgp.Map,
    config_path: str = "",
    params: mgp.Nullable[mgp.Any] = None,
):
    """Open a SQL Server (ODBC) connection/cursor for the calling thread and execute the query.

    :param table_or_sql: Table name or an SQL query
    :param config: Connection configuration parameters (as in pyodbc.connect)
    :param config_path: Path to a JSON file whose key/value pairs overwrite `config`
    :param params: Optionally, a list/tuple of query parameter values
    """
    global sql_server_dict

    if params:
        # pyodbc only takes positional (sequence) parameters.
        _check_params_type(params, (list, tuple))
    else:
        params = []

    if len(config_path) > 0:
        config = _combine_config(config=config, config_path=config_path)

    if _query_is_table(table_or_sql):
        # NOTE(security): the table name is interpolated directly into the SQL
        # text — callers must not pass untrusted input here.
        table_or_sql = f"SELECT * FROM {table_or_sql};"

    # BUG FIX: key on threading.get_native_id() (the integer thread id), not
    # the function object, so each thread really gets its own entry.
    thread_id = threading.get_native_id()
    if thread_id not in sql_server_dict:
        sql_server_dict[thread_id] = {}

    if Constants.CURSOR not in sql_server_dict[thread_id]:
        sql_server_dict[thread_id][Constants.CURSOR] = None

    if sql_server_dict[thread_id][Constants.CURSOR] is None:
        connection = pyodbc.connect(**config)
        cursor = connection.cursor()
        cursor.execute(table_or_sql, *params)

        sql_server_dict[thread_id][Constants.CONNECTION] = connection
        sql_server_dict[thread_id][Constants.CURSOR] = cursor
        sql_server_dict[thread_id][Constants.COLUMN_NAMES] = [
            column[Constants.I_COLUMN_NAME] for column in cursor.description
        ]


def sql_server(
    table_or_sql: str,
    config: mgp.Map,
    config_path: str = "",
    params: mgp.Nullable[mgp.Any] = None,
) -> mgp.Record(row=mgp.Map):
    """
    With migrate.sql_server you can access SQL Server and execute queries. The result table is converted into a stream,
    and returned rows can be used to create or merge graph structures. Config must be at least an empty map.
    If config_path is passed, every key,value pair from the JSON file will overwrite any values in config.

    :param table_or_sql: Table name or an SQL query
    :param config: Connection configuration parameters (as in pyodbc.connect)
    :param config_path: Path to the JSON file containing configuration parameters (as in pyodbc.connect)
    :param params: Optionally, queries may be parameterized. In that case, `params` provides parameter values
    :return: The result table as a stream of rows
    """
    global sql_server_dict

    # BUG FIX: look up by the thread id (call get_native_id), matching init.
    thread_id = threading.get_native_id()
    cursor = sql_server_dict[thread_id][Constants.CURSOR]
    column_names = sql_server_dict[thread_id][Constants.COLUMN_NAMES]
    rows = cursor.fetchmany(Constants.BATCH_SIZE)

    return [mgp.Record(row=_name_row_cells(row, column_names)) for row in rows]


def cleanup_migrate_sql_server():
    """Release the calling thread's SQL Server cursor and connection."""
    global sql_server_dict
    thread_id = threading.get_native_id()
    sql_server_dict[thread_id][Constants.CURSOR] = None
    # BUG FIX: the original called commit() AFTER close(), which raises on a
    # closed connection. Commit (a no-op for a read stream) before closing.
    sql_server_dict[thread_id][Constants.CONNECTION].commit()
    sql_server_dict[thread_id][Constants.CONNECTION].close()
    sql_server_dict[thread_id][Constants.CONNECTION] = None
    sql_server_dict[thread_id][Constants.COLUMN_NAMES] = None


mgp.add_batch_read_proc(sql_server, init_migrate_sql_server, cleanup_migrate_sql_server)
173+
174+
### Oracle DB
175+
176+
# Per-thread Oracle DB connection state, keyed by the OS thread id.
oracle_db_dict = {}


def init_migrate_oracle_db(
    table_or_sql: str,
    config: mgp.Map,
    config_path: str = "",
    params: mgp.Nullable[mgp.Any] = None,
):
    """Open an Oracle DB connection/cursor for the calling thread and execute the query.

    :param table_or_sql: Table name or an SQL query
    :param config: Connection configuration parameters (as in oracledb.connect)
    :param config_path: Path to a JSON file whose key/value pairs overwrite `config`
    :param params: Optionally, a list/tuple (positional) or map (named) of parameter values
    """
    global oracle_db_dict

    if params:
        _check_params_type(params)

    if len(config_path) > 0:
        config = _combine_config(config=config, config_path=config_path)

    if _query_is_table(table_or_sql):
        # NOTE(security): the table name is interpolated directly into the SQL
        # text — callers must not pass untrusted input here.
        # Oracle rejects a trailing semicolon, hence no ";" here.
        table_or_sql = f"SELECT * FROM {table_or_sql}"

    if not config:
        config = {}

    # Always disable out-of-band breaks, to prevent query execution from
    # hanging. (The original if/else set the same value on both branches.)
    config["disable_oob"] = True

    # BUG FIX: key on threading.get_native_id() (the integer thread id), not
    # the function object, so each thread really gets its own entry.
    thread_id = threading.get_native_id()
    if thread_id not in oracle_db_dict:
        oracle_db_dict[thread_id] = {}

    if Constants.CURSOR not in oracle_db_dict[thread_id]:
        oracle_db_dict[thread_id][Constants.CURSOR] = None

    if oracle_db_dict[thread_id][Constants.CURSOR] is None:
        connection = oracledb.connect(**config)
        cursor = connection.cursor()

        # oracledb takes positional parameters as one sequence argument and
        # named parameters as keyword arguments.
        if not params:
            cursor.execute(table_or_sql)
        elif isinstance(params, (list, tuple)):
            cursor.execute(table_or_sql, params)
        else:
            cursor.execute(table_or_sql, **params)

        oracle_db_dict[thread_id][Constants.CONNECTION] = connection
        oracle_db_dict[thread_id][Constants.CURSOR] = cursor
        oracle_db_dict[thread_id][Constants.COLUMN_NAMES] = [
            column[Constants.I_COLUMN_NAME] for column in cursor.description
        ]


def oracle_db(
    table_or_sql: str,
    config: mgp.Map,
    config_path: str = "",
    params: mgp.Nullable[mgp.Any] = None,
) -> mgp.Record(row=mgp.Map):
    """
    With migrate.oracle_db you can access Oracle DB and execute queries. The result table is converted into a stream,
    and returned rows can be used to create or merge graph structures. Config must be at least an empty map.
    If config_path is passed, every key,value pair from the JSON file will overwrite any values in config.

    :param table_or_sql: Table name or an SQL query
    :param config: Connection configuration parameters (as in oracledb.connect)
    :param config_path: Path to the JSON file containing configuration parameters (as in oracledb.connect)
    :param params: Optionally, queries may be parameterized. In that case, `params` provides parameter values
    :return: The result table as a stream of rows
    """

    global oracle_db_dict
    # BUG FIX: look up by the thread id (call get_native_id), matching init.
    thread_id = threading.get_native_id()
    cursor = oracle_db_dict[thread_id][Constants.CURSOR]
    column_names = oracle_db_dict[thread_id][Constants.COLUMN_NAMES]
    rows = cursor.fetchmany(Constants.BATCH_SIZE)

    return [mgp.Record(row=_name_row_cells(row, column_names)) for row in rows]


def cleanup_migrate_oracle_db():
    """Release the calling thread's Oracle DB cursor and connection."""
    global oracle_db_dict
    thread_id = threading.get_native_id()
    oracle_db_dict[thread_id][Constants.CURSOR] = None
    # BUG FIX: the original called commit() AFTER close(), which raises on a
    # closed connection. Commit (a no-op for a read stream) before closing.
    oracle_db_dict[thread_id][Constants.CONNECTION].commit()
    oracle_db_dict[thread_id][Constants.CONNECTION].close()
    oracle_db_dict[thread_id][Constants.CONNECTION] = None
    oracle_db_dict[thread_id][Constants.COLUMN_NAMES] = None


mgp.add_batch_read_proc(oracle_db, init_migrate_oracle_db, cleanup_migrate_oracle_db)
265+
266+
267+
def _query_is_table(table_or_sql: str) -> bool:
268+
return len(table_or_sql.split()) == 1
269+
270+
271+
def _load_config(path: str) -> Dict[str, Any]:
272+
try:
273+
with open(path, mode="r") as config:
274+
return json.load(config)
275+
except Exception:
276+
raise OSError("Could not open/read file.")
277+
278+
279+
def _combine_config(config: mgp.Map, config_path: str) -> Dict[str, Any]:
    """Merge key/value pairs from the JSON file at `config_path` into `config`.

    Values loaded from the file overwrite values already present in `config`;
    the mapping is updated in place and also returned.

    :param config: Base connection configuration (mutated in place).
    :param config_path: Path to a JSON file with overriding configuration.
    :raises ValueError: If `config_path` is empty.
    """
    # BUG FIX: validate with a raise instead of `assert` — asserts are
    # stripped when Python runs with -O, silently skipping the check.
    if not config_path:
        raise ValueError("Path must not be empty")
    config_items = _load_config(path=config_path)

    for key, value in config_items.items():
        config[key] = value
    return config
286+
287+
288+
def _name_row_cells(row_cells, column_names) -> Dict[str, Any]:
289+
return dict(map(lambda column, value: (column, value), column_names, row_cells))
290+
291+
292+
def _check_params_type(params: Any, types=(dict, list, tuple)) -> None:
293+
if not isinstance(params, types):
294+
raise TypeError(
295+
"Database query parameter values must be passed in a container of type List[Any] (or Map, if migrating from MySQL or Oracle DB)"
296+
)

python/requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,6 @@ torchmetrics==0.9.3
99
igraph==0.10.2
1010
scikit-learn==0.24.2
1111
gqlalchemy==1.4.1
12+
mysql-connector-python==8.0.32
13+
oracledb==1.2.2
14+
pyodbc==4.0.35

0 commit comments

Comments
 (0)