Skip to content
125 changes: 125 additions & 0 deletions src/mysql_to_sqlite3/mysql_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Miscellaneous MySQL utilities."""

import typing as t
from collections import defaultdict, deque

from mysql.connector import CharacterSet
from mysql.connector.abstracts import MySQLConnectionAbstract, MySQLCursorAbstract
from mysql.connector.charsets import MYSQL_CHARACTER_SETS


Expand Down Expand Up @@ -39,3 +41,126 @@
yield CharSet(index, charset, info[1])
except KeyError:
continue


def _as_identifier(value: t.Any) -> str:
    """Return a MySQL identifier as ``str``; ``None`` becomes the empty string.

    ``bytes``/``bytearray`` values are decoded as UTF-8 instead of being passed
    through ``str()``, which would produce the literal text ``"b'name'"``.
    """
    if value is None:
        return ""
    if isinstance(value, (bytes, bytearray)):
        return bytes(value).decode("utf-8", errors="replace")
    return str(value)


def _extract_row_fields(row: t.Any, names: t.Tuple[str, ...]) -> t.Optional[t.Tuple[str, ...]]:
    """Extract the columns ``names`` from ``row``, whatever row shape the cursor produced.

    Args:
        row: one item returned by ``cursor.fetchall()``.
        names: column labels, in SELECT order.

    Returns:
        The requested values normalised with ``_as_identifier``, or ``None`` when
        the row does not provide all of them.
    """
    values: t.Sequence[t.Any]
    if isinstance(row, (list, tuple)):
        # Positional rows (this also covers named-tuple rows).
        if len(row) < len(names):
            return None
        values = row[: len(names)]
    elif isinstance(row, dict):
        # Mapping rows keyed by column label.
        if any(name not in row for name in names):
            return None
        values = [row[name] for name in names]
    elif all(hasattr(row, name) for name in names):
        # Objects exposing the column labels as attributes.
        values = [getattr(row, name) for name in names]
    elif len(names) == 1:
        # A bare scalar is taken to be the single requested column.
        values = [row]
    else:
        return None
    return tuple(_as_identifier(value) for value in values)


def fetch_schema_metadata(cursor: MySQLCursorAbstract) -> t.Tuple[t.Set[str], t.List[t.Tuple[str, str]]]:
    """Fetch table names and foreign-key edges for the current schema.

    Both queries are scoped with ``SCHEMA()``, i.e. to the default database of
    the connection that owns ``cursor``.

    Args:
        cursor: an open cursor; sequence, mapping and attribute-style rows are
            all accepted.

    Returns:
        tables: names of all base tables (views are excluded) in the schema
        edges: list of (child, parent) pairs, one per row of
            ``KEY_COLUMN_USAGE`` that references another table. A multi-column
            FK therefore yields the same pair more than once, and ``parent`` is
            not guaranteed to be in ``tables`` because the query does not
            filter on ``REFERENCED_TABLE_SCHEMA``.

    Rows whose names cannot be extracted, or are empty, are skipped.
    """
    # 1. all ordinary tables
    cursor.execute(
        """
        SELECT TABLE_NAME
        FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = SCHEMA()
          AND TABLE_TYPE = 'BASE TABLE';
        """
    )
    tables: t.Set[str] = set()
    for row in cursor.fetchall():
        table_fields = _extract_row_fields(row, ("TABLE_NAME",))
        if table_fields and table_fields[0]:
            tables.add(table_fields[0])

    # 2. FK edges (child -> parent)
    cursor.execute(
        """
        SELECT TABLE_NAME AS child, REFERENCED_TABLE_NAME AS parent
        FROM information_schema.KEY_COLUMN_USAGE
        WHERE TABLE_SCHEMA = SCHEMA()
          AND REFERENCED_TABLE_NAME IS NOT NULL;
        """
    )
    edges: t.List[t.Tuple[str, str]] = []
    for row in cursor.fetchall():
        edge_fields = _extract_row_fields(row, ("child", "parent"))
        if edge_fields is None:
            # Skip rows we cannot interpret rather than guessing.
            continue
        child, parent = edge_fields
        if child and parent:
            edges.append((child, parent))

    return tables, edges


def topo_sort_tables(
    tables: t.Set[str], edges: t.List[t.Tuple[str, str]]
) -> t.Tuple[t.List[str], t.List[t.Tuple[str, str]]]:
    """Perform a topological sort on tables based on foreign key dependencies.

    Uses Kahn's algorithm: tables with no unresolved parents are emitted first,
    and each emitted table releases the children that were waiting on it.

    Edges that cannot constrain creation order are ignored:
      * edges whose child or parent is not in ``tables`` (e.g. a reference into
        another schema), which would otherwise raise ``KeyError`` or leave the
        child permanently blocked;
      * self-references, since a table never has to exist before itself.

    Args:
        tables: names of the tables to order.
        edges: (child, parent) pairs; duplicates are harmless.

    Returns:
        ordered: tables in FK-safe creation order (parents before children).
            Ties are broken alphabetically so the result is deterministic.
            Tables that are part of a cycle, or depend on one, are NOT
            included; callers that need every table must add them themselves.
        cyclic_edges: sorted list of the edges that could not be resolved
            (empty if the graph is a pure DAG).
    """
    # dependency graph: child → {parents}
    deps: t.Dict[str, t.Set[str]] = {tbl: set() for tbl in tables}
    # reverse edges: parent → {children}
    rev: t.Dict[str, t.Set[str]] = defaultdict(set)

    for child, parent in edges:
        if child == parent or child not in deps or parent not in deps:
            continue
        deps[child].add(parent)
        rev[parent].add(child)

    # Sorting removes the dependence on set iteration / string hash randomisation.
    queue: t.Deque[str] = deque(sorted(tbl for tbl, parents in deps.items() if not parents))
    ordered: t.List[str] = []

    while queue:
        table = queue.popleft()
        ordered.append(table)
        # "remove" table from graph; .get avoids growing the defaultdict on lookup
        for child in sorted(rev.get(table, ())):
            remaining = deps[child]
            remaining.discard(table)
            if not remaining:
                queue.append(child)

    # any table still having parents is in a cycle, or is blocked behind one
    cyclic_edges: t.List[t.Tuple[str, str]] = sorted(
        (child, parent) for child, parents in deps.items() for parent in parents
    )
    return ordered, cyclic_edges


def compute_creation_order(mysql_conn: MySQLConnectionAbstract) -> t.Tuple[t.List[str], t.List[t.Tuple[str, str]]]:
    """Work out a foreign-key-aware creation order for the connection's current schema.

    Opens a cursor on ``mysql_conn``, reads the schema metadata with
    ``fetch_schema_metadata`` and passes it to ``topo_sort_tables``.

    Returns:
        A tuple (ordered_tables, cyclic_edges) exactly as produced by
        ``topo_sort_tables``; cyclic_edges is empty when the schema is acyclic.
    """
    with mysql_conn.cursor() as cursor:
        schema_tables, fk_edges = fetch_schema_metadata(cursor)
    # The sort is pure computation, so it does not need the cursor to stay open.
    return topo_sort_tables(schema_tables, fk_edges)
47 changes: 42 additions & 5 deletions src/mysql_to_sqlite3/transporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from mysql.connector.types import RowItemType
from tqdm import tqdm, trange

from mysql_to_sqlite3.mysql_utils import CHARSET_INTRODUCERS
from mysql_to_sqlite3.mysql_utils import CHARSET_INTRODUCERS, compute_creation_order
from mysql_to_sqlite3.sqlite_utils import (
CollatingSequences,
Integer_Types,
Expand Down Expand Up @@ -684,14 +684,45 @@
)
tables = (row[0].decode() for row in self._mysql_cur.fetchall()) # type: ignore[union-attr]

# Convert tables iterable to a list for reuse
table_list: t.List[str] = []
for table_name in tables:
if isinstance(table_name, bytes):
table_name = table_name.decode()

Check warning on line 691 in src/mysql_to_sqlite3/transporter.py

View check run for this annotation

Codecov / codecov/patch

src/mysql_to_sqlite3/transporter.py#L691

Added line #L691 was not covered by tests
# Ensure table_name is a string
table_str = str(table_name) if table_name is not None else ""
table_list.append(table_str)

# Try to compute the table creation order to respect foreign key constraints
try:
if hasattr(self, "_mysql"):
# Compute the table creation order to respect foreign key constraints
ordered_tables: t.List[str]
cyclic_edges: t.List[t.Tuple[str, str]]
ordered_tables, cyclic_edges = compute_creation_order(self._mysql)

# Filter ordered_tables to only include tables we want to transfer
ordered_tables = [table for table in ordered_tables if table in table_list]

# Log information about cyclic dependencies
if cyclic_edges:
self._logger.warning(
"Circular foreign key dependencies detected: %s",
", ".join(f"{child} -> {parent}" for child, parent in cyclic_edges),
)
else:
# If _mysql attribute is not available (e.g., in tests), use the original table list
ordered_tables = table_list
except Exception as e: # pylint: disable=W0718
# If anything goes wrong, fall back to the original table list
self._logger.warning("Failed to compute table creation order: %s", str(e))
ordered_tables = table_list

try:
# turn off foreign key checking in SQLite while transferring data
self._sqlite_cur.execute("PRAGMA foreign_keys=OFF")

for table_name in tables:
if isinstance(table_name, bytes):
table_name = table_name.decode()

for table_name in ordered_tables:
self._logger.info(
"%s%sTransferring table %s",
"[WITHOUT DATA] " if self._without_data else "",
Expand Down Expand Up @@ -755,6 +786,12 @@
# re-enable foreign key checking once done transferring
self._sqlite_cur.execute("PRAGMA foreign_keys=ON")

# Check for any foreign key constraint violations
self._sqlite_cur.execute("PRAGMA foreign_key_check")
fk_violations: t.List[sqlite3.Row] = self._sqlite_cur.fetchall()
if fk_violations:
self._logger.warning("Foreign key constraint violations detected: %s", fk_violations)

if self._vacuum:
self._logger.info("Vacuuming created SQLite database file.\nThis might take a while.")
self._sqlite_cur.execute("VACUUM")
Expand Down
Loading