Skip to content

Commit f13a11a

Browse files
authored
Support for compound primary key (bakwc#34)
1 parent 3299161 commit f13a11a

File tree

6 files changed

+96
-66
lines changed

6 files changed

+96
-66
lines changed

mysql_ch_replicator/clickhouse_api.py

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,15 @@
1616
(
1717
{fields},
1818
`_version` UInt64,
19-
INDEX _version _version TYPE minmax GRANULARITY 1,
20-
INDEX idx_id {primary_key} TYPE bloom_filter GRANULARITY 1
19+
{indexes}
2120
)
2221
ENGINE = ReplacingMergeTree(_version)
2322
{partition_by}ORDER BY {primary_key}
2423
SETTINGS index_granularity = 8192
2524
'''
2625

2726
DELETE_QUERY = '''
28-
DELETE FROM {db_name}.{table_name} WHERE {field_name} IN ({field_values})
27+
DELETE FROM {db_name}.{table_name} WHERE ({field_name}) IN ({field_values})
2928
'''
3029

3130

@@ -63,8 +62,6 @@ def get_databases(self):
6362
return database_list
6463

6564
def execute_command(self, query):
66-
#print(' === executing ch query', query)
67-
6865
for attempt in range(ClickhouseApi.MAX_RETRIES):
6966
try:
7067
self.client.command(query)
@@ -76,7 +73,6 @@ def execute_command(self, query):
7673
time.sleep(ClickhouseApi.RETRY_INTERVAL)
7774

7875
def recreate_database(self):
79-
#print(' === creating database', self.database)
8076
self.execute_command(f'DROP DATABASE IF EXISTS {self.database}')
8177
self.execute_command(f'CREATE DATABASE {self.database}')
8278

@@ -87,31 +83,39 @@ def set_last_used_version(self, table_name, last_used_version):
8783
self.tables_last_record_version[table_name] = last_used_version
8884

8985
def create_table(self, structure: TableStructure):
90-
if not structure.primary_key:
86+
if not structure.primary_keys:
9187
raise Exception(f'missing primary key for {structure.table_name}')
9288

93-
primary_key_type = ''
94-
for field in structure.fields:
95-
if field.name == structure.primary_key:
96-
primary_key_type = field.field_type
97-
if not primary_key_type:
98-
raise Exception(f'failed to get type of primary key {structure.table_name} {structure.primary_key}')
99-
10089
fields = [
10190
f' `{field.name}` {field.field_type}' for field in structure.fields
10291
]
10392
fields = ',\n'.join(fields)
10493
partition_by = ''
10594

106-
if 'int' in primary_key_type.lower():
107-
partition_by = f'PARTITION BY intDiv({structure.primary_key}, 4294967)\n'
95+
if len(structure.primary_keys) == 1:
96+
if 'int' in structure.fields[structure.primary_key_ids[0]].field_type.lower():
97+
partition_by = f'PARTITION BY intDiv({structure.primary_keys[0]}, 4294967)\n'
98+
99+
indexes = [
100+
'INDEX _version _version TYPE minmax GRANULARITY 1',
101+
]
102+
if len(structure.primary_keys) == 1:
103+
indexes.append(
104+
f'INDEX idx_id {structure.primary_keys[0]} TYPE bloom_filter GRANULARITY 1',
105+
)
106+
107+
indexes = ',\n'.join(indexes)
108+
primary_key = ','.join(structure.primary_keys)
109+
if len(structure.primary_keys) > 1:
110+
primary_key = f'({primary_key})'
108111

109112
query = CREATE_TABLE_QUERY.format(**{
110113
'db_name': self.database,
111114
'table_name': structure.table_name,
112115
'fields': fields,
113-
'primary_key': structure.primary_key,
116+
'primary_key': primary_key,
114117
'partition_by': partition_by,
118+
'indexes': indexes,
115119
})
116120
self.execute_command(query)
117121

@@ -161,6 +165,7 @@ def insert(self, table_name, records, table_structure: TableStructure = None):
161165
self.set_last_used_version(table_name, current_version)
162166

163167
def erase(self, table_name, field_name, field_values):
168+
field_name = ','.join(field_name)
164169
field_values = ', '.join(list(map(str, field_values)))
165170
query = DELETE_QUERY.format(**{
166171
'db_name': self.database,

mysql_ch_replicator/converter.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import json
33
import sqlparse
44
import re
5-
from pyparsing import Word, alphas, alphanums
5+
from pyparsing import Suppress, CaselessKeyword, Word, alphas, alphanums, delimitedList
66

77
from .table_structure import TableStructure, TableField
88

@@ -218,7 +218,7 @@ def convert_table_structure(self, mysql_structure: TableStructure) -> TableStruc
218218
name=field.name,
219219
field_type=clickhouse_field_type,
220220
))
221-
clickhouse_structure.primary_key = mysql_structure.primary_key
221+
clickhouse_structure.primary_keys = mysql_structure.primary_keys
222222
clickhouse_structure.preprocess()
223223
return clickhouse_structure
224224

@@ -521,9 +521,22 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None
521521
if line.lower().startswith('constraint'):
522522
continue
523523
if line.lower().startswith('primary key'):
524-
pattern = 'PRIMARY KEY (' + Word(alphanums + '_`') + ')'
524+
# Define identifier to match column names, handling backticks and unquoted names
525+
identifier = (Suppress('`') + Word(alphas + alphanums + '_') + Suppress('`')) | Word(
526+
alphas + alphanums + '_')
527+
528+
# Build the parsing pattern
529+
pattern = CaselessKeyword('PRIMARY') + CaselessKeyword('KEY') + Suppress('(') + delimitedList(
530+
identifier)('column_names') + Suppress(')')
531+
532+
# Parse the line
525533
result = pattern.parseString(line)
526-
structure.primary_key = strip_sql_name(result[1])
534+
535+
# Extract and process the primary key column names
536+
primary_keys = [strip_sql_name(name) for name in result['column_names']]
537+
538+
structure.primary_keys = primary_keys
539+
527540
continue
528541

529542
#print(" === processing line", line)
@@ -543,16 +556,16 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None
543556
#print(' ---- params:', field_parameters)
544557

545558

546-
if not structure.primary_key:
559+
if not structure.primary_keys:
547560
for field in structure.fields:
548561
if 'primary key' in field.parameters.lower():
549-
structure.primary_key = field.name
562+
structure.primary_keys.append(field.name)
550563

551-
if not structure.primary_key:
564+
if not structure.primary_keys:
552565
if structure.has_field('id'):
553-
structure.primary_key = 'id'
566+
structure.primary_keys = ['id']
554567

555-
if not structure.primary_key:
568+
if not structure.primary_keys:
556569
raise Exception(f'No primary key for table {structure.table_name}, {create_statement}')
557570

558571
structure.preprocess()

mysql_ch_replicator/db_replicator.py

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -148,15 +148,16 @@ def validate_database_settings(self):
148148
)
149149

150150
def validate_mysql_structure(self, mysql_structure: TableStructure):
151-
primary_field: TableField = mysql_structure.fields[mysql_structure.primary_key_idx]
152-
if 'not null' not in primary_field.parameters.lower():
153-
logger.warning('primary key validation failed')
154-
logger.warning(
155-
f'\n\n\n !!! WARNING - PRIMARY KEY NULLABLE (field "{primary_field.name}", table "{mysql_structure.table_name}") !!!\n\n'
156-
'There could be errors replicating nullable primary key\n'
157-
'Please ensure all tables has NOT NULL parameter for primary key\n'
158-
'Or mark tables as skipped, see "exclude_tables" option\n\n\n'
159-
)
151+
for key_idx in mysql_structure.primary_key_ids:
152+
primary_field: TableField = mysql_structure.fields[key_idx]
153+
if 'not null' not in primary_field.parameters.lower():
154+
logger.warning('primary key validation failed')
155+
logger.warning(
156+
f'\n\n\n !!! WARNING - PRIMARY KEY NULLABLE (field "{primary_field.name}", table "{mysql_structure.table_name}") !!!\n\n'
157+
'There could be errors replicating nullable primary key\n'
158+
'Please ensure all tables has NOT NULL parameter for primary key\n'
159+
'Or mark tables as skipped, see "exclude_tables" option\n\n\n'
160+
)
160161

161162
def run(self):
162163
try:
@@ -276,29 +277,33 @@ def perform_initial_replication_table(self, table_name):
276277
logger.debug(f'mysql table structure: {mysql_table_structure}')
277278
logger.debug(f'clickhouse table structure: {clickhouse_table_structure}')
278279

279-
field_names = [field.name for field in clickhouse_table_structure.fields]
280280
field_types = [field.field_type for field in clickhouse_table_structure.fields]
281281

282-
primary_key = clickhouse_table_structure.primary_key
283-
primary_key_index = field_names.index(primary_key)
284-
primary_key_type = field_types[primary_key_index]
282+
primary_keys = clickhouse_table_structure.primary_keys
283+
primary_key_ids = clickhouse_table_structure.primary_key_ids
284+
primary_key_types = [field_types[key_idx] for key_idx in primary_key_ids]
285285

286-
logger.debug(f'primary key name: {primary_key}, type: {primary_key_type}')
286+
#logger.debug(f'primary key name: {primary_key}, type: {primary_key_type}')
287287

288288
stats_number_of_records = 0
289289
last_stats_dump_time = time.time()
290290

291291
while True:
292292

293-
query_start_value = max_primary_key
294-
if 'int' not in primary_key_type.lower() and query_start_value is not None:
295-
query_start_value = f"'{query_start_value}'"
293+
query_start_values = max_primary_key
294+
if query_start_values is not None:
295+
for i in range(len(query_start_values)):
296+
key_type = primary_key_types[i]
297+
value = query_start_values[i]
298+
if 'int' not in key_type.lower():
299+
value = f"'{value}'"
300+
query_start_values[i] = value
296301

297302
records = self.mysql_api.get_records(
298303
table_name=table_name,
299-
order_by=primary_key,
304+
order_by=primary_keys,
300305
limit=DbReplicator.INITIAL_REPLICATION_BATCH_SIZE,
301-
start_value=query_start_value,
306+
start_value=query_start_values,
302307
)
303308
logger.debug(f'extracted {len(records)} records from mysql')
304309

@@ -311,7 +316,7 @@ def perform_initial_replication_table(self, table_name):
311316
break
312317
self.clickhouse_api.insert(table_name, records, table_structure=clickhouse_table_structure)
313318
for record in records:
314-
record_primary_key = record[primary_key_index]
319+
record_primary_key = [record[key_idx] for key_idx in primary_key_ids]
315320
if max_primary_key is None:
316321
max_primary_key = record_primary_key
317322
else:
@@ -404,6 +409,16 @@ def save_state_if_required(self, force=False):
404409
self.state.tables_last_record_version = self.clickhouse_api.tables_last_record_version
405410
self.state.save()
406411

412+
def _get_record_id(self, ch_table_structure, record: list):
413+
result = []
414+
for idx in ch_table_structure.primary_key_ids:
415+
field_type = ch_table_structure.fields[idx].field_type
416+
if field_type == 'String':
417+
result.append(f"'{record[idx]}'")
418+
else:
419+
result.append(record[idx])
420+
return ','.join(map(str, result))
421+
407422
def handle_insert_event(self, event: LogEvent):
408423
if self.config.debug_log_level:
409424
logger.debug(
@@ -418,12 +433,10 @@ def handle_insert_event(self, event: LogEvent):
418433
clickhouse_table_structure = self.state.tables_structure[event.table_name][1]
419434
records = self.converter.convert_records(event.records, mysql_table_structure, clickhouse_table_structure)
420435

421-
primary_key_ids = mysql_table_structure.primary_key_idx
422-
423436
current_table_records_to_insert = self.records_to_insert[event.table_name]
424437
current_table_records_to_delete = self.records_to_delete[event.table_name]
425438
for record in records:
426-
record_id = record[primary_key_ids]
439+
record_id = self._get_record_id(clickhouse_table_structure, record)
427440
current_table_records_to_insert[record_id] = record
428441
current_table_records_to_delete.discard(record_id)
429442

@@ -437,16 +450,9 @@ def handle_erase_event(self, event: LogEvent):
437450
self.stats.erase_events_count += 1
438451
self.stats.erase_records_count += len(event.records)
439452

440-
table_structure: TableStructure = self.state.tables_structure[event.table_name][0]
441453
table_structure_ch: TableStructure = self.state.tables_structure[event.table_name][1]
442454

443-
primary_key_name_idx = table_structure.primary_key_idx
444-
field_type_ch = table_structure_ch.fields[primary_key_name_idx].field_type
445-
446-
if field_type_ch == 'String':
447-
keys_to_remove = [f"'{record[primary_key_name_idx]}'" for record in event.records]
448-
else:
449-
keys_to_remove = [record[primary_key_name_idx] for record in event.records]
455+
keys_to_remove = [self._get_record_id(table_structure_ch, record) for record in event.records]
450456

451457
current_table_records_to_insert = self.records_to_insert[event.table_name]
452458
current_table_records_to_delete = self.records_to_delete[event.table_name]
@@ -546,12 +552,12 @@ def upload_records(self):
546552
if not keys_to_remove:
547553
continue
548554
table_structure: TableStructure = self.state.tables_structure[table_name][0]
549-
primary_key_name = table_structure.primary_key
555+
primary_key_names = table_structure.primary_keys
550556
if self.config.debug_log_level:
551-
logger.debug(f'erasing from {table_name}, primary key: {primary_key_name}, values: {keys_to_remove}')
557+
logger.debug(f'erasing from {table_name}, primary key: {primary_key_names}, values: {keys_to_remove}')
552558
self.clickhouse_api.erase(
553559
table_name=table_name,
554-
field_name=primary_key_name,
560+
field_name=primary_key_names,
555561
field_values=keys_to_remove,
556562
)
557563

mysql_ch_replicator/mysql_api.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ def create_database(self, db_name):
4848
self.cursor.execute(f'CREATE DATABASE {db_name}')
4949

5050
def execute(self, command, commit=False):
51-
#print(f'Executing: <{command}>')
5251
self.cursor.execute(command)
5352
if commit:
5453
self.db.commit()
@@ -88,9 +87,11 @@ def get_table_create_statement(self, table_name) -> str:
8887

8988
def get_records(self, table_name, order_by, limit, start_value=None):
9089
self.reconnect_if_required()
90+
order_by = ','.join(order_by)
9191
where = ''
9292
if start_value is not None:
93-
where = f'WHERE {order_by} > {start_value} '
93+
start_value = ','.join(map(str, start_value))
94+
where = f'WHERE ({order_by}) > ({start_value}) '
9495
query = f'SELECT * FROM {table_name} {where}ORDER BY {order_by} LIMIT {limit}'
9596
self.cursor.execute(query)
9697
res = self.cursor.fetchall()

mysql_ch_replicator/table_structure.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ class TableField:
99
@dataclass
1010
class TableStructure:
1111
fields: list = field(default_factory=list)
12-
primary_key: str = ''
13-
primary_key_idx: int = 0
12+
primary_keys: str = ''
13+
primary_key_ids: int = 0
1414
table_name: str = ''
1515

1616
def preprocess(self):
1717
field_names = [f.name for f in self.fields]
18-
self.primary_key_idx = field_names.index(self.primary_key)
18+
self.primary_key_ids = [
19+
field_names.index(key) for key in self.primary_keys
20+
]
1921

2022
def add_field_after(self, new_field: TableField, after: str):
2123

test_mysql_ch_replicator.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ def test_e2e_multistatement():
227227
id int NOT NULL AUTO_INCREMENT,
228228
name varchar(255),
229229
age int,
230-
PRIMARY KEY (id)
230+
PRIMARY KEY (id, `name`)
231231
);
232232
''')
233233

@@ -259,6 +259,9 @@ def test_e2e_multistatement():
259259
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Mary'")[0].get('last_name') is None)
260260
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Mary'")[0].get('city') is None)
261261

262+
mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE name='Ivan';", commit=True)
263+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
264+
262265
mysql.execute(
263266
f"CREATE TABLE {TEST_TABLE_NAME_2} "
264267
f"(id int NOT NULL AUTO_INCREMENT, name varchar(255), age int, "

0 commit comments

Comments
 (0)