Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,19 +222,31 @@ def convert_table_structure(self, mysql_structure: TableStructure) -> TableStruc
clickhouse_structure.preprocess()
return clickhouse_structure

def convert_records(self, mysql_records, mysql_structure: TableStructure, clickhouse_structure: TableStructure):
def convert_records(
self, mysql_records, mysql_structure: TableStructure, clickhouse_structure: TableStructure,
only_primary: bool = False,
):
mysql_field_types = [field.field_type for field in mysql_structure.fields]
clickhouse_filed_types = [field.field_type for field in clickhouse_structure.fields]

clickhouse_records = []
for mysql_record in mysql_records:
clickhouse_record = self.convert_record(mysql_record, mysql_field_types, clickhouse_filed_types)
clickhouse_record = self.convert_record(
mysql_record, mysql_field_types, clickhouse_filed_types, mysql_structure, only_primary,
)
clickhouse_records.append(clickhouse_record)
return clickhouse_records

def convert_record(self, mysql_record, mysql_field_types, clickhouse_field_types):
def convert_record(
self, mysql_record, mysql_field_types, clickhouse_field_types, mysql_structure: TableStructure,
only_primary: bool,
):
clickhouse_record = []
for idx, mysql_field_value in enumerate(mysql_record):
if only_primary and idx not in mysql_structure.primary_key_ids:
clickhouse_record.append(mysql_field_value)
continue

clickhouse_field_value = mysql_field_value
mysql_field_type = mysql_field_types[idx]
clickhouse_field_type = clickhouse_field_types[idx]
Expand All @@ -256,6 +268,13 @@ def convert_record(self, mysql_record, mysql_field_types, clickhouse_field_types
if 'UInt64' in clickhouse_field_type and clickhouse_field_value < 0:
clickhouse_field_value = 18446744073709551616 + clickhouse_field_value

if 'String' in clickhouse_field_type and (
'text' in mysql_field_type or 'char' in mysql_field_type
):
if isinstance(clickhouse_field_value, bytes):
charset = mysql_structure.charset or 'utf-8'
clickhouse_field_value = clickhouse_field_value.decode(charset)

if 'point' in mysql_field_type:
clickhouse_field_value = parse_mysql_point(clickhouse_field_value)

Expand Down Expand Up @@ -513,6 +532,18 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None
inner_tokens = ''.join([str(t) for t in inner_tokens[1:-1]]).strip()
inner_tokens = split_high_level(inner_tokens, ',')

prev_token = ''
prev_prev_token = ''
for line in tokens[4:]:
curr_token = line.value
if prev_token == '=' and prev_prev_token.lower() == 'charset':
structure.charset = curr_token
prev_prev_token = prev_token
prev_token = curr_token

if structure.charset.startswith('utf8'):
structure.charset = 'utf-8'

for line in inner_tokens:
if line.lower().startswith('unique key'):
continue
Expand Down
6 changes: 5 additions & 1 deletion mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,12 @@ def handle_erase_event(self, event: LogEvent):
self.stats.erase_records_count += len(event.records)

table_structure_ch: TableStructure = self.state.tables_structure[event.table_name][1]
table_structure_mysql: TableStructure = self.state.tables_structure[event.table_name][0]

keys_to_remove = [self._get_record_id(table_structure_ch, record) for record in event.records]
records = self.converter.convert_records(
event.records, table_structure_mysql, table_structure_ch, only_primary=True,
)
keys_to_remove = [self._get_record_id(table_structure_ch, record) for record in records]

current_table_records_to_insert = self.records_to_insert[event.table_name]
current_table_records_to_delete = self.records_to_delete[event.table_name]
Expand Down
7 changes: 5 additions & 2 deletions mysql_ch_replicator/mysql_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ def drop_database(self, db_name):
def create_database(self, db_name):
self.cursor.execute(f'CREATE DATABASE {db_name}')

def execute(self, command, commit=False):
self.cursor.execute(command)
def execute(self, command, commit=False, args=None):
if args:
self.cursor.execute(command, args)
else:
self.cursor.execute(command)
if commit:
self.db.commit()

Expand Down
3 changes: 2 additions & 1 deletion mysql_ch_replicator/pymysqlreplication/row_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ def __read_string(self, size, column):
else:
# MYSQL 5.xx Version Goes Here
# We don't know encoding type So apply Default Utf-8
string = string.decode(errors=decode_errors)
#string = string.decode(errors=decode_errors)
pass # decode it later
return string

def __read_bit(self, column):
Expand Down
1 change: 1 addition & 0 deletions mysql_ch_replicator/table_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class TableStructure:
primary_keys: str = ''
primary_key_ids: int = 0
table_name: str = ''
charset: str = ''

def preprocess(self):
field_names = [f.name for f in self.fields]
Expand Down
13 changes: 12 additions & 1 deletion test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,10 @@ def test_runner():
KEY `IDX_age` (`age`),
FULLTEXT KEY `IDX_name` (`name`),
PRIMARY KEY (id)
);
) ENGINE=InnoDB AUTO_INCREMENT=2478808 DEFAULT CHARSET=latin1;
''')


mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Ivan', 42);", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Peter', 33);", commit=True)

Expand Down Expand Up @@ -367,6 +368,16 @@ def test_runner():

assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4)

mysql.execute(
command=f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES (%s, %s);",
args=(b'H\xe4llo'.decode('latin-1'), 1912),
commit=True,
)

assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 5)
assert_wait(lambda: ch.select(TEST_TABLE_NAME, "age=1912")[0]['name'] == 'Hällo')


mysql.create_database(TEST_DB_NAME_2)
assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases())

Expand Down