
Commit 9af781b

Fixed tests
1 parent 17c7318 commit 9af781b

File tree

3 files changed: +52 additions, -42 deletions


mysql_ch_replicator/clickhouse_api.py

Lines changed: 21 additions & 16 deletions
@@ -16,16 +16,15 @@
 (
 {fields},
     `_version` UInt64,
-    INDEX _version _version TYPE minmax GRANULARITY 1,
-    INDEX idx_id {primary_key} TYPE bloom_filter GRANULARITY 1
+    {indexes}
 )
 ENGINE = ReplacingMergeTree(_version)
 {partition_by}ORDER BY {primary_key}
 SETTINGS index_granularity = 8192
 '''
 
 DELETE_QUERY = '''
-DELETE FROM {db_name}.{table_name} WHERE {field_name} IN ({field_values})
+DELETE FROM {db_name}.{table_name} WHERE ({field_name}) IN ({field_values})
 '''
 
 
@@ -63,8 +62,6 @@ def get_databases(self):
         return database_list
 
     def execute_command(self, query):
-        #print(' === executing ch query', query)
-
         for attempt in range(ClickhouseApi.MAX_RETRIES):
             try:
                 self.client.command(query)
@@ -76,7 +73,6 @@ def execute_command(self, query):
                 time.sleep(ClickhouseApi.RETRY_INTERVAL)
 
     def recreate_database(self):
-        #print(' === creating database', self.database)
         self.execute_command(f'DROP DATABASE IF EXISTS {self.database}')
         self.execute_command(f'CREATE DATABASE {self.database}')
 
@@ -90,28 +86,36 @@ def create_table(self, structure: TableStructure):
         if not structure.primary_keys:
             raise Exception(f'missing primary key for {structure.table_name}')
 
-        primary_key_type = ''
-        for field in structure.fields:
-            if field.name == structure.primary_key:
-                primary_key_type = field.field_type
-        if not primary_key_type:
-            raise Exception(f'failed to get type of primary key {structure.table_name} {structure.primary_key}')
-
         fields = [
            f' `{field.name}` {field.field_type}' for field in structure.fields
         ]
         fields = ',\n'.join(fields)
         partition_by = ''
 
-        if 'int' in primary_key_type.lower():
-            partition_by = f'PARTITION BY intDiv({structure.primary_key}, 4294967)\n'
+        if len(structure.primary_keys) == 1:
+            if 'int' in structure.fields[structure.primary_key_ids[0]].field_type.lower():
+                partition_by = f'PARTITION BY intDiv({structure.primary_keys[0]}, 4294967)\n'
+
+        indexes = [
+            'INDEX _version _version TYPE minmax GRANULARITY 1',
+        ]
+        if len(structure.primary_keys) == 1:
+            indexes.append(
+                f'INDEX idx_id {structure.primary_keys[0]} TYPE bloom_filter GRANULARITY 1',
+            )
+
+        indexes = ',\n'.join(indexes)
+        primary_key = ','.join(structure.primary_keys)
+        if len(structure.primary_keys) > 1:
+            primary_key = f'({primary_key})'
 
         query = CREATE_TABLE_QUERY.format(**{
             'db_name': self.database,
             'table_name': structure.table_name,
             'fields': fields,
-            'primary_key': structure.primary_key,
+            'primary_key': primary_key,
             'partition_by': partition_by,
+            'indexes': indexes,
         })
         self.execute_command(query)
 
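The practical effect of the create_table change above: the bloom_filter index and the intDiv partition are only emitted for a single-column key, while a multi-column key becomes a parenthesized tuple for ORDER BY. A minimal standalone sketch of those branches, with invented column names and types that are not taken from this commit:

# Sketch of the new single-key vs. composite-key branches in create_table().
# 'user_id' and 'created_at' are hypothetical example columns.
primary_keys = ['user_id', 'created_at']
key_types = {'user_id': 'Int32', 'created_at': 'DateTime'}

partition_by = ''
if len(primary_keys) == 1 and 'int' in key_types[primary_keys[0]].lower():
    partition_by = f'PARTITION BY intDiv({primary_keys[0]}, 4294967)\n'

indexes = ['INDEX _version _version TYPE minmax GRANULARITY 1']
if len(primary_keys) == 1:
    indexes.append(f'INDEX idx_id {primary_keys[0]} TYPE bloom_filter GRANULARITY 1')

primary_key = ','.join(primary_keys)
if len(primary_keys) > 1:
    primary_key = f'({primary_key})'

print(primary_key)   # -> (user_id,created_at), used for ORDER BY
print(partition_by)  # -> '' (no partition for composite keys)
print(indexes)       # -> only the _version minmax index, no bloom_filter

For a single integer key the previous behaviour is preserved: the bloom_filter index and the intDiv partition are still generated.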

@@ -161,6 +165,7 @@ def insert(self, table_name, records, table_structure: TableStructure = None):
         self.set_last_used_version(table_name, current_version)
 
     def erase(self, table_name, field_name, field_values):
+        field_name = ','.join(field_name)
         field_values = ', '.join(list(map(str, field_values)))
         query = DELETE_QUERY.format(**{
             'db_name': self.database,
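The erase() change in the last hunk pairs with the new DELETE_QUERY template: the key columns are joined into a single comma-separated list and the template parenthesizes {field_name}. Roughly what a composite-key delete renders to, with a hypothetical database, table and key value (the key string itself is pre-formatted by the _get_record_id helper added in db_replicator.py below):

# Sketch of the DELETE statement the updated erase() formats for a composite key.
# 'mydb', 'user_events' and the key value are invented for illustration.
DELETE_QUERY = '''
DELETE FROM {db_name}.{table_name} WHERE ({field_name}) IN ({field_values})
'''

field_name = ','.join(['user_id', 'created_at'])             # "user_id,created_at"
field_values = ', '.join(map(str, ["42,'2024-01-01 00:00:00'"]))

print(DELETE_QUERY.format(
    db_name='mydb',
    table_name='user_events',
    field_name=field_name,
    field_values=field_values,
))
# -> DELETE FROM mydb.user_events WHERE (user_id,created_at) IN (42,'2024-01-01 00:00:00')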

mysql_ch_replicator/db_replicator.py

Lines changed: 28 additions & 24 deletions
@@ -277,13 +277,11 @@ def perform_initial_replication_table(self, table_name):
         logger.debug(f'mysql table structure: {mysql_table_structure}')
         logger.debug(f'clickhouse table structure: {clickhouse_table_structure}')
 
-        field_names = [field.name for field in clickhouse_table_structure.fields]
         field_types = [field.field_type for field in clickhouse_table_structure.fields]
 
-        primary_key = clickhouse_table_structure.primary_keys
+        primary_keys = clickhouse_table_structure.primary_keys
         primary_key_ids = clickhouse_table_structure.primary_key_ids
-
-        #primary_key_type = field_types[primary_key_index]
+        primary_key_types = [field_types[key_idx] for key_idx in primary_key_ids]
 
         #logger.debug(f'primary key name: {primary_key}, type: {primary_key_type}')
 
@@ -292,15 +290,20 @@ def perform_initial_replication_table(self, table_name):
 
         while True:
 
-            query_start_value = max_primary_key
-            if 'int' not in primary_key_type.lower() and query_start_value is not None:
-                query_start_value = f"'{query_start_value}'"
+            query_start_values = max_primary_key
+            if query_start_values is not None:
+                for i in range(len(query_start_values)):
+                    key_type = primary_key_types[i]
+                    value = query_start_values[i]
+                    if 'int' not in key_type.lower():
+                        value = f"'{value}'"
+                    query_start_values[i] = value
 
             records = self.mysql_api.get_records(
                 table_name=table_name,
-                order_by=primary_key,
+                order_by=primary_keys,
                 limit=DbReplicator.INITIAL_REPLICATION_BATCH_SIZE,
-                start_value=query_start_value,
+                start_value=query_start_values,
             )
             logger.debug(f'extracted {len(records)} records from mysql')
 
@@ -313,7 +316,7 @@ def perform_initial_replication_table(self, table_name):
                 break
             self.clickhouse_api.insert(table_name, records, table_structure=clickhouse_table_structure)
             for record in records:
-                record_primary_key = record[primary_key_index]
+                record_primary_key = [record[key_idx] for key_idx in primary_key_ids]
                 if max_primary_key is None:
                     max_primary_key = record_primary_key
                 else:
@@ -406,6 +409,16 @@ def save_state_if_required(self, force=False):
         self.state.tables_last_record_version = self.clickhouse_api.tables_last_record_version
         self.state.save()
 
+    def _get_record_id(self, ch_table_structure, record: list):
+        result = []
+        for idx in ch_table_structure.primary_key_ids:
+            field_type = ch_table_structure.fields[idx].field_type
+            if field_type == 'String':
+                result.append(f"'{record[idx]}'")
+            else:
+                result.append(record[idx])
+        return ','.join(map(str, result))
+
     def handle_insert_event(self, event: LogEvent):
         if self.config.debug_log_level:
             logger.debug(
@@ -420,12 +433,10 @@ def handle_insert_event(self, event: LogEvent):
         clickhouse_table_structure = self.state.tables_structure[event.table_name][1]
         records = self.converter.convert_records(event.records, mysql_table_structure, clickhouse_table_structure)
 
-        primary_key_ids = mysql_table_structure.primary_key_idx
-
         current_table_records_to_insert = self.records_to_insert[event.table_name]
         current_table_records_to_delete = self.records_to_delete[event.table_name]
         for record in records:
-            record_id = record[primary_key_ids]
+            record_id = self._get_record_id(clickhouse_table_structure, record)
             current_table_records_to_insert[record_id] = record
             current_table_records_to_delete.discard(record_id)
 
@@ -439,16 +450,9 @@ def handle_erase_event(self, event: LogEvent):
         self.stats.erase_events_count += 1
         self.stats.erase_records_count += len(event.records)
 
-        table_structure: TableStructure = self.state.tables_structure[event.table_name][0]
         table_structure_ch: TableStructure = self.state.tables_structure[event.table_name][1]
 
-        primary_key_name_idx = table_structure.primary_key_idx
-        field_type_ch = table_structure_ch.fields[primary_key_name_idx].field_type
-
-        if field_type_ch == 'String':
-            keys_to_remove = [f"'{record[primary_key_name_idx]}'" for record in event.records]
-        else:
-            keys_to_remove = [record[primary_key_name_idx] for record in event.records]
+        keys_to_remove = [self._get_record_id(table_structure_ch, record) for record in event.records]
 
         current_table_records_to_insert = self.records_to_insert[event.table_name]
         current_table_records_to_delete = self.records_to_delete[event.table_name]
@@ -548,12 +552,12 @@ def upload_records(self):
             if not keys_to_remove:
                 continue
             table_structure: TableStructure = self.state.tables_structure[table_name][0]
-            primary_key_name = table_structure.primary_key
+            primary_key_names = table_structure.primary_keys
             if self.config.debug_log_level:
-                logger.debug(f'erasing from {table_name}, primary key: {primary_key_name}, values: {keys_to_remove}')
+                logger.debug(f'erasing from {table_name}, primary key: {primary_key_names}, values: {keys_to_remove}')
             self.clickhouse_api.erase(
                 table_name=table_name,
-                field_name=primary_key_name,
+                field_name=primary_key_names,
                 field_values=keys_to_remove,
             )
 
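The new _get_record_id helper replaces the two single-key code paths: it builds one comma-joined string per record from all primary-key columns, quoting String fields, so handle_insert_event and handle_erase_event key their pending records the same way. A self-contained sketch of that behaviour, using stand-in structures and invented field names:

from dataclasses import dataclass

# Minimal stand-ins for the replicator's table structures, for illustration only;
# the real TableStructure class in mysql_ch_replicator carries more information.
@dataclass
class Field:
    name: str
    field_type: str

@dataclass
class Structure:
    fields: list
    primary_key_ids: list

def get_record_id(structure: Structure, record: list) -> str:
    # Mirrors DbReplicator._get_record_id: quote String key fields, join with commas.
    result = []
    for idx in structure.primary_key_ids:
        if structure.fields[idx].field_type == 'String':
            result.append(f"'{record[idx]}'")
        else:
            result.append(record[idx])
    return ','.join(map(str, result))

structure = Structure(
    fields=[Field('user_id', 'Int32'), Field('email', 'String'), Field('age', 'Int32')],
    primary_key_ids=[0, 1],
)
print(get_record_id(structure, [42, 'a@b.com', 37]))  # -> 42,'a@b.com'

Because inserts and erases now go through the same helper, a later delete of a record discards its pending insert even when the key spans several columns.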

mysql_ch_replicator/mysql_api.py

Lines changed: 3 additions & 2 deletions
@@ -48,7 +48,6 @@ def create_database(self, db_name):
         self.cursor.execute(f'CREATE DATABASE {db_name}')
 
     def execute(self, command, commit=False):
-        #print(f'Executing: <{command}>')
         self.cursor.execute(command)
         if commit:
             self.db.commit()
@@ -88,9 +87,11 @@ def get_table_create_statement(self, table_name) -> str:
 
     def get_records(self, table_name, order_by, limit, start_value=None):
         self.reconnect_if_required()
+        order_by = ','.join(order_by)
         where = ''
         if start_value is not None:
-            where = f'WHERE {order_by} > {start_value} '
+            start_value = ','.join(map(str, start_value))
+            where = f'WHERE ({order_by}) > ({start_value}) '
         query = f'SELECT * FROM {table_name} {where}ORDER BY {order_by} LIMIT {limit}'
         self.cursor.execute(query)
         res = self.cursor.fetchall()
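With get_records taking a list of key columns, initial replication pages through MySQL using a row-value (tuple) comparison instead of a single-column comparison. A rough sketch of the SELECT the updated method assembles, with an invented table, invented key columns and a made-up LIMIT; non-integer start values are already quoted by the caller in db_replicator.py:

# Sketch of the keyset-pagination query built by the updated get_records().
table_name = 'user_events'
order_by = ','.join(['user_id', 'created_at'])            # "user_id,created_at"
start_value = ','.join(map(str, [42, "'2024-01-01 00:00:00'"]))
where = f'WHERE ({order_by}) > ({start_value}) '
limit = 1000

query = f'SELECT * FROM {table_name} {where}ORDER BY {order_by} LIMIT {limit}'
print(query)
# -> SELECT * FROM user_events WHERE (user_id,created_at) > (42,'2024-01-01 00:00:00') ORDER BY user_id,created_at LIMIT 1000

MySQL evaluates (a, b) > (x, y) as a lexicographic row comparison, which lets each batch resume after the last composite key seen.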
