Skip to content

Commit 6365506

Browse files
authored
Fixed tables with alternative encoding (bakwc#44)
1 parent fcb402f commit 6365506

File tree

6 files changed

+59
-8
lines changed

6 files changed

+59
-8
lines changed

mysql_ch_replicator/converter.py

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,19 +222,31 @@ def convert_table_structure(self, mysql_structure: TableStructure) -> TableStruc
222222
clickhouse_structure.preprocess()
223223
return clickhouse_structure
224224

225-
def convert_records(self, mysql_records, mysql_structure: TableStructure, clickhouse_structure: TableStructure):
225+
def convert_records(
226+
self, mysql_records, mysql_structure: TableStructure, clickhouse_structure: TableStructure,
227+
only_primary: bool = False,
228+
):
226229
mysql_field_types = [field.field_type for field in mysql_structure.fields]
227230
clickhouse_filed_types = [field.field_type for field in clickhouse_structure.fields]
228231

229232
clickhouse_records = []
230233
for mysql_record in mysql_records:
231-
clickhouse_record = self.convert_record(mysql_record, mysql_field_types, clickhouse_filed_types)
234+
clickhouse_record = self.convert_record(
235+
mysql_record, mysql_field_types, clickhouse_filed_types, mysql_structure, only_primary,
236+
)
232237
clickhouse_records.append(clickhouse_record)
233238
return clickhouse_records
234239

235-
def convert_record(self, mysql_record, mysql_field_types, clickhouse_field_types):
240+
def convert_record(
241+
self, mysql_record, mysql_field_types, clickhouse_field_types, mysql_structure: TableStructure,
242+
only_primary: bool,
243+
):
236244
clickhouse_record = []
237245
for idx, mysql_field_value in enumerate(mysql_record):
246+
if only_primary and idx not in mysql_structure.primary_key_ids:
247+
clickhouse_record.append(mysql_field_value)
248+
continue
249+
238250
clickhouse_field_value = mysql_field_value
239251
mysql_field_type = mysql_field_types[idx]
240252
clickhouse_field_type = clickhouse_field_types[idx]
@@ -256,6 +268,13 @@ def convert_record(self, mysql_record, mysql_field_types, clickhouse_field_types
256268
if 'UInt64' in clickhouse_field_type and clickhouse_field_value < 0:
257269
clickhouse_field_value = 18446744073709551616 + clickhouse_field_value
258270

271+
if 'String' in clickhouse_field_type and (
272+
'text' in mysql_field_type or 'char' in mysql_field_type
273+
):
274+
if isinstance(clickhouse_field_value, bytes):
275+
charset = mysql_structure.charset or 'utf-8'
276+
clickhouse_field_value = clickhouse_field_value.decode(charset)
277+
259278
if 'point' in mysql_field_type:
260279
clickhouse_field_value = parse_mysql_point(clickhouse_field_value)
261280

@@ -513,6 +532,18 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None
513532
inner_tokens = ''.join([str(t) for t in inner_tokens[1:-1]]).strip()
514533
inner_tokens = split_high_level(inner_tokens, ',')
515534

535+
prev_token = ''
536+
prev_prev_token = ''
537+
for line in tokens[4:]:
538+
curr_token = line.value
539+
if prev_token == '=' and prev_prev_token.lower() == 'charset':
540+
structure.charset = curr_token
541+
prev_prev_token = prev_token
542+
prev_token = curr_token
543+
544+
if structure.charset.startswith('utf8'):
545+
structure.charset = 'utf-8'
546+
516547
for line in inner_tokens:
517548
if line.lower().startswith('unique key'):
518549
continue

mysql_ch_replicator/db_replicator.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,8 +459,12 @@ def handle_erase_event(self, event: LogEvent):
459459
self.stats.erase_records_count += len(event.records)
460460

461461
table_structure_ch: TableStructure = self.state.tables_structure[event.table_name][1]
462+
table_structure_mysql: TableStructure = self.state.tables_structure[event.table_name][0]
462463

463-
keys_to_remove = [self._get_record_id(table_structure_ch, record) for record in event.records]
464+
records = self.converter.convert_records(
465+
event.records, table_structure_mysql, table_structure_ch, only_primary=True,
466+
)
467+
keys_to_remove = [self._get_record_id(table_structure_ch, record) for record in records]
464468

465469
current_table_records_to_insert = self.records_to_insert[event.table_name]
466470
current_table_records_to_delete = self.records_to_delete[event.table_name]

mysql_ch_replicator/mysql_api.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,11 @@ def drop_database(self, db_name):
4747
def create_database(self, db_name):
4848
self.cursor.execute(f'CREATE DATABASE {db_name}')
4949

50-
def execute(self, command, commit=False):
51-
self.cursor.execute(command)
50+
def execute(self, command, commit=False, args=None):
51+
if args:
52+
self.cursor.execute(command, args)
53+
else:
54+
self.cursor.execute(command)
5255
if commit:
5356
self.db.commit()
5457

mysql_ch_replicator/pymysqlreplication/row_event.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,8 @@ def __read_string(self, size, column):
332332
else:
333333
# MYSQL 5.xx Version Goes Here
334334
# We don't know encoding type So apply Default Utf-8
335-
string = string.decode(errors=decode_errors)
335+
#string = string.decode(errors=decode_errors)
336+
pass # decode it later
336337
return string
337338

338339
def __read_bit(self, column):

mysql_ch_replicator/table_structure.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class TableStructure:
1212
primary_keys: str = ''
1313
primary_key_ids: int = 0
1414
table_name: str = ''
15+
charset: str = ''
1516

1617
def preprocess(self):
1718
field_names = [f.name for f in self.fields]

test_mysql_ch_replicator.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,10 @@ def test_runner():
318318
KEY `IDX_age` (`age`),
319319
FULLTEXT KEY `IDX_name` (`name`),
320320
PRIMARY KEY (id)
321-
);
321+
) ENGINE=InnoDB AUTO_INCREMENT=2478808 DEFAULT CHARSET=latin1;
322322
''')
323323

324+
324325
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Ivan', 42);", commit=True)
325326
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Peter', 33);", commit=True)
326327

@@ -367,6 +368,16 @@ def test_runner():
367368

368369
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4)
369370

371+
mysql.execute(
372+
command=f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES (%s, %s);",
373+
args=(b'H\xe4llo'.decode('latin-1'), 1912),
374+
commit=True,
375+
)
376+
377+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 5)
378+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, "age=1912")[0]['name'] == 'Hällo')
379+
380+
370381
mysql.create_database(TEST_DB_NAME_2)
371382
assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases())
372383

0 commit comments

Comments
 (0)