Skip to content

Commit a6a0161

Browse files
committed
started implementing multi-primary-key
1 parent: ac40e0c; commit: a6a0161

File tree

5 files changed: +48 / -27 lines changed

5 files changed: +48 / -27 lines changed

mysql_ch_replicator/clickhouse_api.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -86,7 +86,7 @@ def set_last_used_version(self, table_name, last_used_version):
8686
self.tables_last_record_version[table_name] = last_used_version
8787

8888
def create_table(self, structure: TableStructure):
89-
if not structure.primary_key:
89+
if not structure.primary_keys:
9090
raise Exception(f'missing primary key for {structure.table_name}')
9191

9292
primary_key_type = ''

mysql_ch_replicator/converter.py

Lines changed: 26 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
import json
33
import sqlparse
44
import re
5-
from pyparsing import Word, alphas, alphanums
5+
from pyparsing import Suppress, CaselessKeyword, Word, alphas, alphanums, delimitedList
66

77
from .table_structure import TableStructure, TableField
88

@@ -216,7 +216,7 @@ def convert_table_structure(self, mysql_structure: TableStructure) -> TableStruc
216216
name=field.name,
217217
field_type=clickhouse_field_type,
218218
))
219-
clickhouse_structure.primary_key = mysql_structure.primary_key
219+
clickhouse_structure.primary_keys = mysql_structure.primary_keys
220220
clickhouse_structure.preprocess()
221221
return clickhouse_structure
222222

@@ -519,9 +519,26 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None
519519
if line.lower().startswith('constraint'):
520520
continue
521521
if line.lower().startswith('primary key'):
522-
pattern = 'PRIMARY KEY (' + Word(alphanums + '_`') + ')'
522+
# pattern = 'PRIMARY KEY (' + Word(alphanums + '_`') + ')'
523+
# result = pattern.parseString(line)
524+
# structure.primary_key = strip_sql_name(result[1])
525+
526+
# Define identifier to match column names, handling backticks and unquoted names
527+
identifier = (Suppress('`') + Word(alphas + alphanums + '_') + Suppress('`')) | Word(
528+
alphas + alphanums + '_')
529+
530+
# Build the parsing pattern
531+
pattern = CaselessKeyword('PRIMARY') + CaselessKeyword('KEY') + Suppress('(') + delimitedList(
532+
identifier)('column_names') + Suppress(')')
533+
534+
# Parse the line
523535
result = pattern.parseString(line)
524-
structure.primary_key = strip_sql_name(result[1])
536+
537+
# Extract and process the primary key column names
538+
primary_keys = [strip_sql_name(name) for name in result['column_names']]
539+
540+
structure.primary_keys = primary_keys
541+
525542
continue
526543

527544
#print(" === processing line", line)
@@ -541,16 +558,16 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None
541558
#print(' ---- params:', field_parameters)
542559

543560

544-
if not structure.primary_key:
561+
if not structure.primary_keys:
545562
for field in structure.fields:
546563
if 'primary key' in field.parameters.lower():
547-
structure.primary_key = field.name
564+
structure.primary_keys.append(field.name)
548565

549-
if not structure.primary_key:
566+
if not structure.primary_keys:
550567
if structure.has_field('id'):
551-
structure.primary_key = 'id'
568+
structure.primary_keys = ['id']
552569

553-
if not structure.primary_key:
570+
if not structure.primary_keys:
554571
raise Exception(f'No primary key for table {structure.table_name}, {create_statement}')
555572

556573
structure.preprocess()

mysql_ch_replicator/db_replicator.py

Lines changed: 15 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -148,15 +148,16 @@ def validate_database_settings(self):
148148
)
149149

150150
def validate_mysql_structure(self, mysql_structure: TableStructure):
151-
primary_field: TableField = mysql_structure.fields[mysql_structure.primary_key_idx]
152-
if 'not null' not in primary_field.parameters.lower():
153-
logger.warning('primary key validation failed')
154-
logger.warning(
155-
f'\n\n\n !!! WARNING - PRIMARY KEY NULLABLE (field "{primary_field.name}", table "{mysql_structure.table_name}") !!!\n\n'
156-
'There could be errors replicating nullable primary key\n'
157-
'Please ensure all tables has NOT NULL parameter for primary key\n'
158-
'Or mark tables as skipped, see "exclude_tables" option\n\n\n'
159-
)
151+
pass
152+
# primary_field: TableField = mysql_structure.fields[mysql_structure.primary_key_idx]
153+
# if 'not null' not in primary_field.parameters.lower():
154+
# logger.warning('primary key validation failed')
155+
# logger.warning(
156+
# f'\n\n\n !!! WARNING - PRIMARY KEY NULLABLE (field "{primary_field.name}", table "{mysql_structure.table_name}") !!!\n\n'
157+
# 'There could be errors replicating nullable primary key\n'
158+
# 'Please ensure all tables has NOT NULL parameter for primary key\n'
159+
# 'Or mark tables as skipped, see "exclude_tables" option\n\n\n'
160+
# )
160161

161162
def run(self):
162163
try:
@@ -279,11 +280,12 @@ def perform_initial_replication_table(self, table_name):
279280
field_names = [field.name for field in clickhouse_table_structure.fields]
280281
field_types = [field.field_type for field in clickhouse_table_structure.fields]
281282

282-
primary_key = clickhouse_table_structure.primary_key
283-
primary_key_index = field_names.index(primary_key)
284-
primary_key_type = field_types[primary_key_index]
283+
primary_key = clickhouse_table_structure.primary_keys
284+
primary_key_ids = clickhouse_table_structure.primary_key_ids
285+
286+
#primary_key_type = field_types[primary_key_index]
285287

286-
logger.debug(f'primary key name: {primary_key}, type: {primary_key_type}')
288+
#logger.debug(f'primary key name: {primary_key}, type: {primary_key_type}')
287289

288290
stats_number_of_records = 0
289291
last_stats_dump_time = time.time()

mysql_ch_replicator/table_structure.py

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -9,13 +9,15 @@ class TableField:
99
@dataclass
1010
class TableStructure:
1111
fields: list = field(default_factory=list)
12-
primary_key: str = ''
13-
primary_key_idx: int = 0
12+
primary_keys: str = ''
13+
primary_key_ids: int = 0
1414
table_name: str = ''
1515

1616
def preprocess(self):
1717
field_names = [f.name for f in self.fields]
18-
self.primary_key_idx = field_names.index(self.primary_key)
18+
self.primary_key_ids = [
19+
field_names.index(key) for key in self.primary_keys
20+
]
1921

2022
def add_field_after(self, new_field: TableField, after: str):
2123

test_mysql_ch_replicator.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -226,7 +226,7 @@ def test_e2e_multistatement():
226226
id int NOT NULL AUTO_INCREMENT,
227227
name varchar(255),
228228
age int,
229-
PRIMARY KEY (id)
229+
PRIMARY KEY (id, `name`)
230230
);
231231
''')
232232

0 commit comments

Comments
 (0)