diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index b9b7273..e83d732 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -222,19 +222,31 @@ def convert_table_structure(self, mysql_structure: TableStructure) -> TableStruc clickhouse_structure.preprocess() return clickhouse_structure - def convert_records(self, mysql_records, mysql_structure: TableStructure, clickhouse_structure: TableStructure): + def convert_records( + self, mysql_records, mysql_structure: TableStructure, clickhouse_structure: TableStructure, + only_primary: bool = False, + ): mysql_field_types = [field.field_type for field in mysql_structure.fields] clickhouse_filed_types = [field.field_type for field in clickhouse_structure.fields] clickhouse_records = [] for mysql_record in mysql_records: - clickhouse_record = self.convert_record(mysql_record, mysql_field_types, clickhouse_filed_types) + clickhouse_record = self.convert_record( + mysql_record, mysql_field_types, clickhouse_filed_types, mysql_structure, only_primary, + ) clickhouse_records.append(clickhouse_record) return clickhouse_records - def convert_record(self, mysql_record, mysql_field_types, clickhouse_field_types): + def convert_record( + self, mysql_record, mysql_field_types, clickhouse_field_types, mysql_structure: TableStructure, + only_primary: bool, + ): clickhouse_record = [] for idx, mysql_field_value in enumerate(mysql_record): + if only_primary and idx not in mysql_structure.primary_key_ids: + clickhouse_record.append(mysql_field_value) + continue + clickhouse_field_value = mysql_field_value mysql_field_type = mysql_field_types[idx] clickhouse_field_type = clickhouse_field_types[idx] @@ -256,6 +268,13 @@ def convert_record(self, mysql_record, mysql_field_types, clickhouse_field_types if 'UInt64' in clickhouse_field_type and clickhouse_field_value < 0: clickhouse_field_value = 18446744073709551616 + clickhouse_field_value + if 'String' in clickhouse_field_type and ( + 'text' in mysql_field_type or 'char' in mysql_field_type + ): + if isinstance(clickhouse_field_value, bytes): + charset = mysql_structure.charset or 'utf-8' + clickhouse_field_value = clickhouse_field_value.decode(charset) + if 'point' in mysql_field_type: clickhouse_field_value = parse_mysql_point(clickhouse_field_value) @@ -513,6 +532,18 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None inner_tokens = ''.join([str(t) for t in inner_tokens[1:-1]]).strip() inner_tokens = split_high_level(inner_tokens, ',') + prev_token = '' + prev_prev_token = '' + for line in tokens[4:]: + curr_token = line.value + if prev_token == '=' and prev_prev_token.lower() == 'charset': + structure.charset = curr_token + prev_prev_token = prev_token + prev_token = curr_token + + if structure.charset.startswith('utf8'): + structure.charset = 'utf-8' + for line in inner_tokens: if line.lower().startswith('unique key'): continue diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py index c3963da..81d065d 100644 --- a/mysql_ch_replicator/db_replicator.py +++ b/mysql_ch_replicator/db_replicator.py @@ -459,8 +459,12 @@ def handle_erase_event(self, event: LogEvent): self.stats.erase_records_count += len(event.records) table_structure_ch: TableStructure = self.state.tables_structure[event.table_name][1] + table_structure_mysql: TableStructure = self.state.tables_structure[event.table_name][0] - keys_to_remove = [self._get_record_id(table_structure_ch, record) for record in event.records] + records = self.converter.convert_records( + event.records, table_structure_mysql, table_structure_ch, only_primary=True, + ) + keys_to_remove = [self._get_record_id(table_structure_ch, record) for record in records] current_table_records_to_insert = self.records_to_insert[event.table_name] current_table_records_to_delete = self.records_to_delete[event.table_name] diff --git a/mysql_ch_replicator/mysql_api.py b/mysql_ch_replicator/mysql_api.py index 2af5dbf..255a3b7 100644 --- a/mysql_ch_replicator/mysql_api.py +++ b/mysql_ch_replicator/mysql_api.py @@ -47,8 +47,11 @@ def drop_database(self, db_name): def create_database(self, db_name): self.cursor.execute(f'CREATE DATABASE {db_name}') - def execute(self, command, commit=False): - self.cursor.execute(command) + def execute(self, command, commit=False, args=None): + if args: + self.cursor.execute(command, args) + else: + self.cursor.execute(command) if commit: self.db.commit() diff --git a/mysql_ch_replicator/pymysqlreplication/row_event.py b/mysql_ch_replicator/pymysqlreplication/row_event.py index 11429f7..a4dc452 100644 --- a/mysql_ch_replicator/pymysqlreplication/row_event.py +++ b/mysql_ch_replicator/pymysqlreplication/row_event.py @@ -332,7 +332,8 @@ def __read_string(self, size, column): else: # MYSQL 5.xx Version Goes Here # We don't know encoding type So apply Default Utf-8 - string = string.decode(errors=decode_errors) + #string = string.decode(errors=decode_errors) + pass # decode it later return string def __read_bit(self, column): diff --git a/mysql_ch_replicator/table_structure.py b/mysql_ch_replicator/table_structure.py index 027710e..d309cd9 100644 --- a/mysql_ch_replicator/table_structure.py +++ b/mysql_ch_replicator/table_structure.py @@ -12,6 +12,7 @@ class TableStructure: primary_keys: str = '' primary_key_ids: int = 0 table_name: str = '' + charset: str = '' def preprocess(self): field_names = [f.name for f in self.fields] diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 589709a..a99bdbf 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -318,9 +318,10 @@ def test_runner(): KEY `IDX_age` (`age`), FULLTEXT KEY `IDX_name` (`name`), PRIMARY KEY (id) -); +) ENGINE=InnoDB AUTO_INCREMENT=2478808 DEFAULT CHARSET=latin1; ''') + mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Ivan', 42);", commit=True) mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Peter', 33);", commit=True) @@ -367,6 +368,16 @@ def test_runner(): assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4) + mysql.execute( + command=f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES (%s, %s);", + args=(b'H\xe4llo'.decode('latin-1'), 1912), + commit=True, + ) + + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 5) + assert_wait(lambda: ch.select(TEST_TABLE_NAME, "age=1912")[0]['name'] == 'Hällo') + + mysql.create_database(TEST_DB_NAME_2) assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases())