Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 120 additions & 64 deletions agent/database_physical_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import re
import shutil
import subprocess
from contextlib import suppress

from agent.base import AgentException
from agent.database import CustomPeeweeDB
Expand Down Expand Up @@ -76,6 +77,7 @@ def create_restore_job(self):
self.hold_write_lock_on_myisam_tables()
self.perform_myisam_file_operations()
self.unlock_all_tables()
self._close_db_connections()
self.perform_post_restoration_validation_and_fixes()

@step("Validate Backup Files")
Expand Down Expand Up @@ -281,25 +283,29 @@ def hold_write_lock_on_myisam_tables(self):
def perform_myisam_file_operations(self):
self._perform_file_operations(engine="myisam")

@step("Unlock All Tables")
def unlock_all_tables(self):
self._get_target_db().execute_sql("UNLOCK TABLES;")
self._get_target_db_for_myisam().execute_sql("UNLOCK TABLES;")

@step("Validate And Fix Tables")
def perform_post_restoration_validation_and_fixes(self):
innodb_tables_with_fts = self.get_innodb_tables_with_fts_index()
innodb_tables_with_fts = self._get_innodb_tables_with_fts_index()
"""
FLUSH TABLES ... FOR EXPORT does not support FULLTEXT indexes.
https://dev.mysql.com/doc/refman/8.4/en/innodb-table-import.html#:~:text=in%20the%20operation.-,Limitations,-The%20Transportable%20Tablespaces

We can either drop + add index.
Or, run `OPTIMIZE TABLE` on the table to rebuild the index.
https://mariadb.com/kb/en/optimize-table/#updating-an-innodb-fulltext-index
Need to drop all fulltext indexes of InnoDB tables.
Then, optimize table to fix existing corruptions and rebuild table (if needed).
Then, recreate the fulltext indexes.
"""

for table in innodb_tables_with_fts:
"""
No need to waste time on checking whether index is corrupted or not
Because, physical restoration will not work for FULLTEXT index.
"""
if not self.repair_table(table, "innodb"):
raise Exception(f"Failed to repair table {table}")
self.recreate_fts_indexes(table)

"""
MyISAM table corruption can generally happen due to mismatch of no of records in MYD file.
Expand All @@ -312,14 +318,9 @@ def perform_post_restoration_validation_and_fixes(self):
https://dev.mysql.com/doc/refman/8.4/en/myisam-repair.html
"""
for table in self.myisam_tables:
if self.is_table_corrupted(table) and not self.repair_table(table, "myisam"):
if self.is_table_corrupted(table) and not self.repair_myisam_table(table):
raise Exception(f"Failed to repair table {table}")

@step("Unlock All Tables")
def unlock_all_tables(self):
self._get_target_db().execute_sql("UNLOCK TABLES;")
self._get_target_db_for_myisam().execute_sql("UNLOCK TABLES;")

def _warmup_files(self, file_paths: list[str]):
"""
Once the snapshot is converted to disk and attached to the instance,
Expand Down Expand Up @@ -354,43 +355,6 @@ def _perform_file_operations(self, engine: str):
os.path.join(self.target_db_directory, file),
)

def _get_target_db(self) -> CustomPeeweeDB:
if self._target_db_instance is not None:
if not self._target_db_instance.is_connection_usable():
raise DatabaseConnectionClosedWithDatabase()
return self._target_db_instance

self._target_db_instance = CustomPeeweeDB(
self.target_db,
user=self.target_db_user,
password=self.target_db_password,
host=self.target_db_host,
port=self.target_db_port,
)
self._target_db_instance.connect()
# Set session wait timeout to 4 hours [EXPERIMENTAL]
self._target_db_instance.execute_sql("SET SESSION wait_timeout = 14400;")
return self._target_db_instance

def _get_target_db_for_myisam(self) -> CustomPeeweeDB:
if self._target_db_instance_for_myisam is not None:
if not self._target_db_instance_for_myisam.is_connection_usable():
raise DatabaseConnectionClosedWithDatabase()
return self._target_db_instance_for_myisam

self._target_db_instance_for_myisam = CustomPeeweeDB(
self.target_db,
user=self.target_db_user,
password=self.target_db_password,
host=self.target_db_host,
port=self.target_db_port,
autocommit=False,
)
self._target_db_instance_for_myisam.connect()
# Set session wait timeout to 4 hours [EXPERIMENTAL]
self._target_db_instance_for_myisam.execute_sql("SET SESSION wait_timeout = 14400;")
return self._target_db_instance_for_myisam

def is_table_need_to_be_restored(self, table_name: str) -> bool:
if not self.restore_specific_tables:
return True
Expand Down Expand Up @@ -427,7 +391,10 @@ def get_drop_table_statement(self, table_name) -> str:
return f"DROP TABLE IF EXISTS `{table_name}`;"

def is_table_corrupted(self, table_name: str) -> bool:
result = run_sql_query(self._get_target_db(), f"CHECK TABLE `{table_name}` QUICK;")
result = run_sql_query(
self._get_target_db(raise_error_on_connection_closed=False),
f"CHECK TABLE `{table_name}` QUICK;",
)
"""
+-----------------------------------+-------+----------+------------------------------------------------------+
| Table | Op | Msg_type | Msg_text |
Expand All @@ -452,18 +419,16 @@ def is_table_corrupted(self, table_name: str) -> bool:
break
return isError

def repair_table(self, table_name: str, engine: str) -> bool:
if engine == "innodb":
result = run_sql_query(self._get_target_db(), f"OPTIMIZE TABLE `{table_name}`;")
elif engine == "myisam":
result = run_sql_query(self._get_target_db(), f"REPAIR TABLE `{table_name}` USE_FRM;")
else:
raise Exception(f"Engine {engine} is not supported")
def repair_myisam_table(self, table_name: str) -> bool:
result = run_sql_query(
self._get_target_db(raise_error_on_connection_closed=False),
f"REPAIR TABLE `{table_name}` USE_FRM;",
)
"""
+---------------------------------------------------+--------+----------+----------+
| Table | Op | Msg_type | Msg_text |
+---------------------------------------------------+--------+----------+----------+
| _8edd549f4b072174.tabInsights Query Execution Log | repair | status | OK |
| _8edd549f4b072174.tabInsights Query Execution Log | repair | status | OK |
+---------------------------------------------------+--------+----------+----------+

Msg Type can be status, error, info, note, or warning
Expand All @@ -476,9 +441,27 @@ def repair_table(self, table_name: str, engine: str) -> bool:

return not isErrorOccurred

def get_innodb_tables_with_fts_index(self):
def recreate_fts_indexes(self, table: str):
fts_indexes = self._get_fts_indexes_of_table(table)
for index_name, _ in fts_indexes.items():
run_sql_query(
self._get_target_db(raise_error_on_connection_closed=False),
f"ALTER TABLE `{table}` DROP INDEX IF EXISTS `{index_name}`;",
)
# Optimize table to fix existing corruptions
run_sql_query(
self._get_target_db(raise_error_on_connection_closed=False), f"OPTIMIZE TABLE `{table}`;"
)
# Recreate the indexes
for index_name, columns in fts_indexes.items():
run_sql_query(
self._get_target_db(raise_error_on_connection_closed=False),
f"ALTER TABLE `{table}` ADD FULLTEXT INDEX `{index_name}` ({columns});",
)

def _get_innodb_tables_with_fts_index(self):
rows = run_sql_query(
self._get_target_db(),
self._get_target_db(raise_error_on_connection_closed=False),
f"""
SELECT
DISTINCT(t.TABLE_NAME)
Expand All @@ -491,16 +474,89 @@ def get_innodb_tables_with_fts_index(self):
WHERE
s.INDEX_TYPE = 'FULLTEXT'
AND t.TABLE_SCHEMA = '{self.target_db}'
AND t.ENGINE = 'InnoDB'
AND t.ENGINE = 'InnoDB';
""",
)
return [row[0] for row in rows]

def __del__(self):
def _get_fts_indexes_of_table(self, table: str) -> dict[str, str]:
rows = run_sql_query(
self._get_target_db(raise_error_on_connection_closed=False),
f"""
SELECT
INDEX_NAME, group_concat(column_name ORDER BY seq_in_index) AS columns
FROM
information_schema.statistics
WHERE
TABLE_SCHEMA = '{self.target_db}'
AND TABLE_NAME = '{table}'
AND INDEX_TYPE = 'FULLTEXT'
GROUP BY
INDEX_NAME;
""",
)
return {row[0]: row[1] for row in rows}

def _get_target_db(self, raise_error_on_connection_closed: bool = True) -> CustomPeeweeDB:
if self._target_db_instance is not None and not is_db_connection_usable(self._target_db_instance):
if raise_error_on_connection_closed:
raise DatabaseConnectionClosedWithDatabase()
self._target_db_instance = None

if self._target_db_instance is not None:
return self._target_db_instance

self._target_db_instance = CustomPeeweeDB(
self.target_db,
user=self.target_db_user,
password=self.target_db_password,
host=self.target_db_host,
port=self.target_db_port,
)
self._target_db_instance.connect()
# Set session wait timeout to 4 hours [EXPERIMENTAL]
self._target_db_instance.execute_sql("SET SESSION wait_timeout = 14400;")
return self._target_db_instance

def _get_target_db_for_myisam(self) -> CustomPeeweeDB:
if self._target_db_instance_for_myisam is not None:
if not is_db_connection_usable(self._target_db_instance_for_myisam):
raise DatabaseConnectionClosedWithDatabase()
return self._target_db_instance_for_myisam

self._target_db_instance_for_myisam = CustomPeeweeDB(
self.target_db,
user=self.target_db_user,
password=self.target_db_password,
host=self.target_db_host,
port=self.target_db_port,
autocommit=False,
)
self._target_db_instance_for_myisam.connect()
# Set session wait timeout to 4 hours [EXPERIMENTAL]
self._target_db_instance_for_myisam.execute_sql("SET SESSION wait_timeout = 14400;")
return self._target_db_instance_for_myisam

def _close_db_connections(self):
if self._target_db_instance is not None:
self._target_db_instance.close()
with suppress(Exception):
self._target_db_instance.close()
if self._target_db_instance_for_myisam is not None:
self._target_db_instance_for_myisam.close()
with suppress(Exception):
self._target_db_instance_for_myisam.close()

def __del__(self):
self._close_db_connections()


def is_db_connection_usable(db: CustomPeeweeDB) -> bool:
try:
if not db.is_connection_usable():
return False
db.execute_sql("SELECT 1;")
return True
except Exception:
return False


def run_sql_query(db: CustomPeeweeDB, query: str) -> list[str]:
Expand Down