Skip to content

Commit 3727e3d

Browse files
authored
Add customizable PARTITION BY support for ClickHouse tables (#164)
* Add partition_bys config option similar to indexes with database/table filtering * Support custom PARTITION BY expressions to override default intDiv(id, 4294967) * Useful for time-based partitioning like toYYYYMM(created_at) for Snowflake IDs * Maintains backward compatibility with existing default behavior * Add test verification for custom partition_by functionality Fixes #161
1 parent 0a53255 commit 3727e3d

File tree

7 files changed

+57
-6
lines changed

7 files changed

+57
-6
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,11 @@ indexes: # optional
230230
tables: ['test_table']
231231
index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1'
232232

233+
partition_bys: # optional
234+
- databases: '*'
235+
tables: ['test_table']
236+
partition_by: 'toYYYYMM(created_at)'
237+
233238
http_host: '0.0.0.0' # optional
234239
http_port: 9128 # optional
235240

@@ -258,6 +263,7 @@ ignore_deletes: false # optional, set to true to ignore DELETE operations
258263
- `auto_restart_interval` - interval (seconds) between automatic db_replicator restart. Default 3600 (1 hour). This is done to reduce memory usage.
259264
- `binlog_retention_period` - how long to keep binlog files in seconds. Default 43200 (12 hours). This setting controls how long the local binlog files are retained before being automatically cleaned up.
260265
- `indexes` - you may want to add some indexes to accelerate performance, e.g. an ngram index for full-text search. To apply indexes you need to start replication from scratch.
266+
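- `partition_bys` - custom PARTITION BY expressions for tables, selected by `databases`/`tables` patterns in the same way as `indexes`. If several entries match a table, only the first matching expression is used. When no entry matches, tables with a single integer primary key are partitioned by `intDiv(<primary key>, 4294967)` (e.g. `intDiv(id, 4294967)`). Useful for time-based partitioning like `toYYYYMM(created_at)`.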
261267
- `http_host`, `http_port` - http endpoint to control replication, use `/docs` for available commands
262268
- `types_mappings` - custom types mapping, e.g. you can map char(36) to UUID instead of String, etc.
263269
- `ignore_deletes` - when set to `true`, DELETE operations in MySQL will be ignored during replication. This creates an append-only model where data is only added, never removed. In this mode, the replicator doesn't create a temporary database and instead replicates directly to the target database.

mysql_ch_replicator/clickhouse_api.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def get_last_used_version(self, table_name):
138138
def set_last_used_version(self, table_name, last_used_version):
139139
self.tables_last_record_version[table_name] = last_used_version
140140

141-
def create_table(self, structure: TableStructure, additional_indexes: list | None = None):
141+
def create_table(self, structure: TableStructure, additional_indexes: list | None = None, additional_partition_bys: list | None = None):
142142
if not structure.primary_keys:
143143
raise Exception(f'missing primary key for {structure.table_name}')
144144

@@ -148,9 +148,15 @@ def create_table(self, structure: TableStructure, additional_indexes: list | Non
148148
fields = ',\n'.join(fields)
149149
partition_by = ''
150150

151-
if len(structure.primary_keys) == 1:
152-
if 'int' in structure.fields[structure.primary_key_ids[0]].field_type.lower():
153-
partition_by = f'PARTITION BY intDiv({structure.primary_keys[0]}, 4294967)\n'
151+
# Check for custom partition_by first
152+
if additional_partition_bys:
153+
# Use the first custom partition_by if available
154+
partition_by = f'PARTITION BY {additional_partition_bys[0]}\n'
155+
else:
156+
# Fallback to default logic
157+
if len(structure.primary_keys) == 1:
158+
if 'int' in structure.fields[structure.primary_key_ids[0]].field_type.lower():
159+
partition_by = f'PARTITION BY intDiv({structure.primary_keys[0]}, 4294967)\n'
154160

155161
indexes = [
156162
'INDEX _version _version TYPE minmax GRANULARITY 1',

mysql_ch_replicator/config.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ class Index:
3636
index: str = ''
3737

3838

39+
@dataclass
40+
class PartitionBy:
41+
databases: str | list = '*'
42+
tables: str | list = '*'
43+
partition_by: str = ''
44+
45+
3946
@dataclass
4047
class ClickhouseSettings:
4148
host: str = 'localhost'
@@ -114,6 +121,7 @@ def __init__(self):
114121
self.optimize_interval = 0
115122
self.check_db_updated_interval = 0
116123
self.indexes: list[Index] = []
124+
self.partition_bys: list[PartitionBy] = []
117125
self.auto_restart_interval = 0
118126
self.http_host = ''
119127
self.http_port = 0
@@ -153,6 +161,13 @@ def load(self, settings_file):
153161
self.indexes.append(
154162
Index(**index)
155163
)
164+
165+
partition_bys = data.pop('partition_bys', [])
166+
for partition_by in partition_bys:
167+
self.partition_bys.append(
168+
PartitionBy(**partition_by)
169+
)
170+
156171
assert isinstance(self.databases, str) or isinstance(self.databases, list)
157172
assert isinstance(self.tables, str) or isinstance(self.tables, list)
158173
self.binlog_replicator = BinlogReplicatorSettings(**data.pop('binlog_replicator'))
@@ -199,6 +214,16 @@ def get_indexes(self, db_name, table_name):
199214
results.append(index.index)
200215
return results
201216

217+
def get_partition_bys(self, db_name, table_name):
218+
results = []
219+
for partition_by in self.partition_bys:
220+
if not self.is_pattern_matches(db_name, partition_by.databases):
221+
continue
222+
if not self.is_pattern_matches(table_name, partition_by.tables):
223+
continue
224+
results.append(partition_by.partition_by)
225+
return results
226+
202227
def validate(self):
203228
self.mysql.validate()
204229
self.clickhouse.validate()

mysql_ch_replicator/db_replicator_initial.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ def create_initial_structure_table(self, table_name):
5454

5555
self.replicator.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure)
5656
indexes = self.replicator.config.get_indexes(self.replicator.database, table_name)
57+
partition_bys = self.replicator.config.get_partition_bys(self.replicator.database, table_name)
5758

5859
if not self.replicator.is_parallel_worker:
59-
self.replicator.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes)
60+
self.replicator.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes, additional_partition_bys=partition_bys)
6061

6162
def validate_mysql_structure(self, mysql_structure: TableStructure):
6263
for key_idx in mysql_structure.primary_key_ids:

mysql_ch_replicator/db_replicator_realtime.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,8 @@ def handle_create_table_query(self, query, db_name):
201201
return
202202
self.replicator.state.tables_structure[mysql_structure.table_name] = (mysql_structure, ch_structure)
203203
indexes = self.replicator.config.get_indexes(self.replicator.database, ch_structure.table_name)
204-
self.replicator.clickhouse_api.create_table(ch_structure, additional_indexes=indexes)
204+
partition_bys = self.replicator.config.get_partition_bys(self.replicator.database, ch_structure.table_name)
205+
self.replicator.clickhouse_api.create_table(ch_structure, additional_indexes=indexes, additional_partition_bys=partition_bys)
205206

206207
def handle_drop_table_query(self, query, db_name):
207208
tokens = query.split()

test_mysql_ch_replicator.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ def test_e2e_regular(config_file):
134134
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
135135
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)
136136

137+
# Check for custom partition_by configuration when using CONFIG_FILE_MARIADB (tests_config_mariadb.yaml)
138+
if config_file == CONFIG_FILE_MARIADB:
139+
create_query = ch.show_create_table(TEST_TABLE_NAME)
140+
assert 'PARTITION BY intDiv(id, 1000000)' in create_query, f"Custom partition_by not found in CREATE TABLE query: {create_query}"
141+
137142
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Filipp', 50);", commit=True)
138143
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
139144
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age'] == 50)

tests_config_mariadb.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,10 @@ databases: '*test*'
1919
log_level: 'debug'
2020
optimize_interval: 3
2121
check_db_updated_interval: 3
22+
23+
24+
partition_bys:
25+
- databases: 'replication-test_db'
26+
tables: ['test_table']
27+
partition_by: 'intDiv(id, 1000000)'
28+

0 commit comments

Comments
 (0)