Skip to content

Commit f595623

Browse files
committed
Fixed tables with alternative encoding
1 parent 1e8af89 commit f595623

File tree

5 files changed

+43
-6
lines changed

5 files changed

+43
-6
lines changed

mysql_ch_replicator/converter.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,13 @@ def convert_records(self, mysql_records, mysql_structure: TableStructure, clickh
228228

229229
clickhouse_records = []
230230
for mysql_record in mysql_records:
231-
clickhouse_record = self.convert_record(mysql_record, mysql_field_types, clickhouse_filed_types)
231+
clickhouse_record = self.convert_record(
232+
mysql_record, mysql_field_types, clickhouse_filed_types, mysql_structure,
233+
)
232234
clickhouse_records.append(clickhouse_record)
233235
return clickhouse_records
234236

235-
def convert_record(self, mysql_record, mysql_field_types, clickhouse_field_types):
237+
def convert_record(self, mysql_record, mysql_field_types, clickhouse_field_types, mysql_structure: TableStructure):
236238
clickhouse_record = []
237239
for idx, mysql_field_value in enumerate(mysql_record):
238240
clickhouse_field_value = mysql_field_value
@@ -256,6 +258,13 @@ def convert_record(self, mysql_record, mysql_field_types, clickhouse_field_types
256258
if 'UInt64' in clickhouse_field_type and clickhouse_field_value < 0:
257259
clickhouse_field_value = 18446744073709551616 + clickhouse_field_value
258260

261+
if 'String' in clickhouse_field_type and (
262+
'text' in mysql_field_type or 'char' in mysql_field_type
263+
):
264+
if isinstance(clickhouse_field_value, bytes):
265+
charset = mysql_structure.charset or 'utf-8'
266+
clickhouse_field_value = clickhouse_field_value.decode(charset)
267+
259268
if 'point' in mysql_field_type:
260269
clickhouse_field_value = parse_mysql_point(clickhouse_field_value)
261270

@@ -513,6 +522,18 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None
513522
inner_tokens = ''.join([str(t) for t in inner_tokens[1:-1]]).strip()
514523
inner_tokens = split_high_level(inner_tokens, ',')
515524

525+
prev_token = ''
526+
prev_prev_token = ''
527+
for line in tokens[4:]:
528+
curr_token = line.value
529+
if prev_token == '=' and prev_prev_token.lower() == 'charset':
530+
structure.charset = curr_token
531+
prev_prev_token = prev_token
532+
prev_token = curr_token
533+
534+
if structure.charset.startswith('utf8'):
535+
structure.charset = 'utf-8'
536+
516537
for line in inner_tokens:
517538
if line.lower().startswith('unique key'):
518539
continue

mysql_ch_replicator/mysql_api.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,11 @@ def drop_database(self, db_name):
4747
def create_database(self, db_name):
4848
self.cursor.execute(f'CREATE DATABASE {db_name}')
4949

50-
def execute(self, command, commit=False):
51-
self.cursor.execute(command)
50+
def execute(self, command, commit=False, args=None):
51+
if args:
52+
self.cursor.execute(command, args)
53+
else:
54+
self.cursor.execute(command)
5255
if commit:
5356
self.db.commit()
5457

mysql_ch_replicator/pymysqlreplication/row_event.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,8 @@ def __read_string(self, size, column):
332332
else:
333333
# MYSQL 5.xx Version Goes Here
334334
# We don't know the encoding type, so apply the default UTF-8
335-
string = string.decode(errors=decode_errors)
335+
#string = string.decode(errors=decode_errors)
336+
pass  # keep the raw bytes; convert_record decodes them later using the table's charset
336337
return string
337338

338339
def __read_bit(self, column):

mysql_ch_replicator/table_structure.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class TableStructure:
1212
primary_keys: str = ''
1313
primary_key_ids: int = 0
1414
table_name: str = ''
15+
charset: str = ''
1516

1617
def preprocess(self):
1718
field_names = [f.name for f in self.fields]

test_mysql_ch_replicator.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,10 @@ def test_runner():
318318
KEY `IDX_age` (`age`),
319319
FULLTEXT KEY `IDX_name` (`name`),
320320
PRIMARY KEY (id)
321-
);
321+
) ENGINE=InnoDB AUTO_INCREMENT=2478808 DEFAULT CHARSET=latin1;
322322
''')
323323

324+
324325
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Ivan', 42);", commit=True)
325326
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Peter', 33);", commit=True)
326327

@@ -367,6 +368,16 @@ def test_runner():
367368

368369
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4)
369370

371+
mysql.execute(
372+
command=f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES (%s, %s);",
373+
args=(b'H\xe4llo'.decode('latin-1'), 1912),
374+
commit=True,
375+
)
376+
377+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 5)
378+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, "age=1912")[0]['name'] == 'Hällo')
379+
380+
370381
mysql.create_database(TEST_DB_NAME_2)
371382
assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases())
372383

0 commit comments

Comments
 (0)