Skip to content

Commit 7e795d7

Browse files
authored
Support for indexes (#38)
1 parent 82229ed commit 7e795d7

File tree

7 files changed

+69
-5
lines changed

7 files changed

+69
-5
lines changed

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,20 @@ binlog_replicator:
134134
databases: 'database_name_pattern_*'
135135
tables: '*'
136136

137+
138+
# OPTIONAL SETTINGS
139+
137140
exclude_databases: ['database_10', 'database_*_42'] # optional
138141
exclude_tables: ['meta_table_*'] # optional
139142

140143
log_level: 'info' # optional
141144
optimize_interval: 86400 # optional
145+
146+
indexes: # optional
147+
- databases: '*'
148+
tables: ['test_table']
149+
index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1'
150+
142151
```
143152

144153
#### Required settings
@@ -154,6 +163,7 @@ optimize_interval: 86400 # optional
154163
- `exclude_tables` - tables to __exclude__, string or list. If the same table matches both `tables` and `exclude_tables`, exclude has higher priority.
155164
- `log_level` - log level, default is `info`, you can set to `debug` to get maximum information (allowed values are `debug`, `info`, `warning`, `error`, `critical`)
156165
- `optimize_interval` - interval (seconds) between automatic `OPTIMIZE table FINAL` calls. Default 86400 (1 day). This is required to guarantee that all merges are performed, which avoids growth of used storage and degradation of performance.
166+
- `indexes` - you may want to add some indexes to accelerate performance, e.g. an ngram index for full-text search, etc. To apply indexes you need to start replication from scratch.
157167

158168
A few more tables / dbs examples:
159169

mysql_ch_replicator/clickhouse_api.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def get_last_used_version(self, table_name):
8282
def set_last_used_version(self, table_name, last_used_version):
8383
self.tables_last_record_version[table_name] = last_used_version
8484

85-
def create_table(self, structure: TableStructure):
85+
def create_table(self, structure: TableStructure, additional_indexes: list | None = None):
8686
if not structure.primary_keys:
8787
raise Exception(f'missing primary key for {structure.table_name}')
8888

@@ -103,6 +103,8 @@ def create_table(self, structure: TableStructure):
103103
indexes.append(
104104
f'INDEX idx_id {structure.primary_keys[0]} TYPE bloom_filter GRANULARITY 1',
105105
)
106+
if additional_indexes is not None:
107+
indexes += additional_indexes
106108

107109
indexes = ',\n'.join(indexes)
108110
primary_key = ','.join(structure.primary_keys)
@@ -117,6 +119,7 @@ def create_table(self, structure: TableStructure):
117119
'partition_by': partition_by,
118120
'indexes': indexes,
119121
})
122+
print(" === query:", query)
120123
self.execute_command(query)
121124

122125
def insert(self, table_name, records, table_structure: TableStructure = None):
@@ -196,6 +199,12 @@ def select(self, table_name, where=None, final=None):
196199
results.append(dict(zip(columns, row)))
197200
return results
198201

202+
def query(self, query: str):
203+
return self.client.query(query)
204+
205+
def show_create_table(self, table_name):
206+
return self.client.query(f'SHOW CREATE TABLE {table_name}').result_rows[0][0]
207+
199208
def get_system_setting(self, name):
200209
results = self.select('system.settings', f"name = '{name}'")
201210
if not results:

mysql_ch_replicator/config.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ def validate(self):
2929
raise ValueError(f'mysql password should be string and not {stype(self.password)}')
3030

3131

32+
@dataclass
33+
class Index:
34+
databases: str | list = '*'
35+
tables: str | list = '*'
36+
index: str = ''
37+
38+
3239
@dataclass
3340
class ClickhouseSettings:
3441
host: str = 'localhost'
@@ -98,6 +105,7 @@ def __init__(self):
98105
self.debug_log_level = False
99106
self.optimize_interval = 0
100107
self.check_db_updated_interval = 0
108+
self.indexes: list[Index] = []
101109

102110
def load(self, settings_file):
103111
data = open(settings_file, 'r').read()
@@ -115,6 +123,11 @@ def load(self, settings_file):
115123
self.check_db_updated_interval = data.pop(
116124
'check_db_updated_interval', Settings.DEFAULT_CHECK_DB_UPDATED_INTERVAL,
117125
)
126+
indexes = data.pop('indexes', [])
127+
for index in indexes:
128+
self.indexes.append(
129+
Index(**index)
130+
)
118131
assert isinstance(self.databases, str) or isinstance(self.databases, list)
119132
assert isinstance(self.tables, str) or isinstance(self.tables, list)
120133
self.binlog_replicator = BinlogReplicatorSettings(**data.pop('binlog_replicator'))
@@ -151,6 +164,16 @@ def validate_log_level(self):
151164
if self.log_level == 'debug':
152165
self.debug_log_level = True
153166

167+
def get_indexes(self, db_name, table_name):
168+
results = []
169+
for index in self.indexes:
170+
if not self.is_pattern_matches(db_name, index.databases):
171+
continue
172+
if not self.is_pattern_matches(table_name, index.tables):
173+
continue
174+
results.append(index.index)
175+
return results
176+
154177
def validate(self):
155178
self.mysql.validate()
156179
self.clickhouse.validate()

mysql_ch_replicator/converter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
472472
query = f'ALTER TABLE {db_name}.{table_name} RENAME COLUMN {column_name} TO {new_column_name}'
473473
self.db_replicator.clickhouse_api.execute_command(query)
474474

475-
def parse_create_table_query(self, mysql_query) -> tuple:
475+
def parse_create_table_query(self, mysql_query) -> tuple[TableStructure, TableStructure]:
476476
mysql_table_structure = self.parse_mysql_table_structure(mysql_query)
477477
ch_table_structure = self.convert_table_structure(mysql_table_structure)
478478
return mysql_table_structure, ch_table_structure

mysql_ch_replicator/db_replicator.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,8 @@ def create_initial_structure_table(self, table_name):
214214
self.validate_mysql_structure(mysql_structure)
215215
clickhouse_structure = self.converter.convert_table_structure(mysql_structure)
216216
self.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure)
217-
self.clickhouse_api.create_table(clickhouse_structure)
217+
indexes = self.config.get_indexes(self.database, table_name)
218+
self.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes)
218219

219220
def prevent_binlog_removal(self):
220221
if time.time() - self.last_touch_time < self.BINLOG_TOUCH_INTERVAL:
@@ -480,7 +481,8 @@ def handle_create_table_query(self, query, db_name):
480481
if not self.config.is_table_matches(mysql_structure.table_name):
481482
return
482483
self.state.tables_structure[mysql_structure.table_name] = (mysql_structure, ch_structure)
483-
self.clickhouse_api.create_table(ch_structure)
484+
indexes = self.config.get_indexes(self.database, ch_structure.table_name)
485+
self.clickhouse_api.create_table(ch_structure, additional_indexes=indexes)
484486

485487
def handle_drop_table_query(self, query, db_name):
486488
tokens = query.split()

test_mysql_ch_replicator.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,22 @@ def test_runner():
366366
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4)
367367

368368
mysql.create_database(TEST_DB_NAME_2)
369-
assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases(), max_wait_time=5)
369+
assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases())
370+
371+
mysql.execute(f'''
372+
CREATE TABLE test_table_with_index (
373+
id int NOT NULL AUTO_INCREMENT,
374+
name varchar(255) NOT NULL,
375+
age int,
376+
rate decimal(10,4),
377+
PRIMARY KEY (id)
378+
);
379+
''')
380+
381+
assert_wait(lambda: 'test_table_with_index' in ch.get_tables())
382+
383+
create_query = ch.show_create_table('test_table_with_index')
384+
assert 'INDEX name_idx name TYPE ngrambf_v1' in create_query
370385

371386
run_all_runner.stop()
372387

tests_config.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,8 @@ databases: '*test*'
1919
log_level: 'debug'
2020
optimize_interval: 3
2121
check_db_updated_interval: 3
22+
23+
indexes:
24+
- databases: '*'
25+
tables: ['test_table_with_index']
26+
index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1'

0 commit comments

Comments
 (0)