-
Notifications
You must be signed in to change notification settings - Fork 29
Detect missing database and recreate & Fix enum conversion uppercase to lowercase etc. #121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
a6e476c
40fba71
aad9364
883f426
727c19f
017e48e
53e4018
2fbec2e
fcf167a
202c8c0
f458ffd
e9e9a2d
d83f2de
a7d4ea7
8ac2900
b3223cd
037bdb0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -182,10 +182,9 @@ def run(self): | |
| # ensure target database still exists | ||
| if self.target_database not in self.clickhouse_api.get_databases(): | ||
| logger.warning(f'database {self.target_database} missing in CH') | ||
| if self.initial_only: | ||
|
Owner
We cannot remove this check. We should probably check whether both `target_database` and `target_database_tmp` are missing, and only in that case is it safe to restart replication from scratch.
Contributor
Author
Okay, I modified it. What do you think now? My only concern is that on boot, if the database is not there, it would just continue as if it were 🤷🏿. I'm also available for a call if needed. I'm on Discord at @jaredmdobson.
Owner
Looks good, thank you!
||
| logger.warning('will run replication from scratch') | ||
| self.state.remove() | ||
| self.state = self.create_state() | ||
| logger.warning('will run replication from scratch') | ||
| self.state.remove() | ||
| self.state = self.create_state() | ||
|
|
||
| if self.state.status == Status.RUNNING_REALTIME_REPLICATION: | ||
| self.run_realtime_replication() | ||
|
|
@@ -227,6 +226,10 @@ def create_initial_structure_table(self, table_name): | |
| ) | ||
| self.validate_mysql_structure(mysql_structure) | ||
| clickhouse_structure = self.converter.convert_table_structure(mysql_structure) | ||
|
|
||
| # Always set if_not_exists to True to prevent errors when tables already exist | ||
| clickhouse_structure.if_not_exists = True | ||
|
|
||
| self.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure) | ||
| indexes = self.config.get_indexes(self.database, table_name) | ||
| self.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| from .parser import parse_mysql_enum, is_enum_type | ||
| from .converter import EnumConverter | ||
| from .utils import find_enum_definition_end, extract_field_components | ||
| from .ddl_parser import ( | ||
| find_enum_or_set_definition_end, | ||
| parse_enum_or_set_field, | ||
| extract_enum_or_set_values, | ||
| strip_value | ||
| ) | ||
|
|
||
| __all__ = [ | ||
| 'parse_mysql_enum', | ||
| 'is_enum_type', | ||
| 'EnumConverter', | ||
| 'find_enum_definition_end', | ||
| 'extract_field_components', | ||
| 'find_enum_or_set_definition_end', | ||
| 'parse_enum_or_set_field', | ||
| 'extract_enum_or_set_values', | ||
| 'strip_value' | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| from typing import List, Union, Optional, Any | ||
| from logging import getLogger | ||
|
|
||
| # Create a single module-level logger | ||
| logger = getLogger(__name__) | ||
|
|
||
class EnumConverter:
    """Class to handle conversion of enum values between MySQL and ClickHouse."""

    @staticmethod
    def convert_mysql_to_clickhouse_enum(
        value: Any,
        enum_values: List[str],
        field_name: str = "unknown"
    ) -> Optional[Union[str, int]]:
        """
        Convert a MySQL enum value to the appropriate ClickHouse representation.

        Args:
            value: The MySQL enum value (can be int, str, None).
            enum_values: List of possible enum string values, in definition
                order (integer inputs are treated as 1-based indexes into it).
            field_name: Name of the field (only used in log messages).

        Returns:
            ``None`` for ``None``; ``0`` for index ``0``; the lowercased enum
            string for a valid 1-based index or for a string that matches an
            enum value (case-insensitively). Any input that cannot be
            validated (out-of-range index, unknown string, unexpected type)
            is logged as an error and returned unchanged.
        """
        # NULL stays NULL.
        if value is None:
            return None

        # Integer values are index-based.
        if isinstance(value, int):
            if value == 0:
                # Return 0 as-is - let ClickHouse handle it according to the
                # field's nullability.
                logger.debug(
                    "ENUM CONVERSION: Found enum index 0 for field '%s'. Keeping as 0.",
                    field_name,
                )
                return 0

            if 1 <= value <= len(enum_values):
                # Lowercase to match the lowercase enum convention.
                return enum_values[value - 1].lower()

            # Out-of-range index: report it and pass the value through.
            logger.error(
                "ENUM CONVERSION: Invalid enum index %s for field '%s' "
                "with values %s",
                value, field_name, enum_values,
            )
            return value

        # String values: accept an exact match first, then a
        # case-insensitive one; both yield the lowercased value.
        if isinstance(value, str):
            lowered = value.lower()
            if value in enum_values or any(
                lowered == candidate.lower() for candidate in enum_values
            ):
                return lowered

            # Value not found in enum values: report it and pass it through.
            logger.error(
                "ENUM CONVERSION: Invalid enum value '%s' not in %s "
                "for field '%s'",
                value, enum_values, field_name,
            )
            return value

        # Any other unexpected type is reported and returned unchanged.
        logger.error(
            "ENUM CONVERSION: Unexpected type %s for enum field '%s'",
            type(value), field_name,
        )
        return value
Uh oh!
There was an error while loading. Please reload this page.