
Commit b40ad37

Post replication commands (#203)
1 parent: 958080f

6 files changed: +248 additions, -1 deletion

.cursor/rules/rules.mdc

Lines changed: 1 addition & 1 deletion
@@ -5,4 +5,4 @@ alwaysApply: true
 ---
 Use following command to run tests:
 
-sudo docker exec -w /app/ -it mysql_ch_replicator-replicator-1 python3 -m pytest -v -s test_mysql_ch_replicator.py -k test_truncate_operation_bug_issue_155
+docker exec -w /app/ -it tests-replicator-1 python3 -m pytest -v -s tests/ -k test_your_test_name

README.md

Lines changed: 10 additions & 0 deletions
@@ -275,6 +275,16 @@ mysql_timezone: 'UTC' # optional, timezone for MySQL timestamp conversion (de
 - `types_mappings` - custom types mapping, eg. you can map char(36) to UUID instead of String, etc.
 - `ignore_deletes` - when set to `true`, DELETE operations in MySQL will be ignored during replication. This creates an append-only model where data is only added, never removed. In this mode, the replicator doesn't create a temporary database and instead replicates directly to the target database.
 - `mysql_timezone` - timezone to use for MySQL timestamp conversion to ClickHouse DateTime64. Default is `'UTC'`. Accepts any valid timezone name (e.g., `'America/New_York'`, `'Europe/London'`, `'Asia/Tokyo'`). This setting ensures proper timezone handling when converting MySQL timestamp fields to ClickHouse DateTime64 with timezone information.
+- `post_initial_replication_commands` - SQL commands to execute in ClickHouse after initial replication completes for each database. Useful for creating materialized views, summary tables, or other database objects. Commands are executed in order, once per database matching the pattern.
+
+```yaml
+post_initial_replication_commands:
+  - databases: '*'
+    commands:
+      - 'CREATE TABLE IF NOT EXISTS summary_table (...) ENGINE = SummingMergeTree() ORDER BY (...)'
+      - 'CREATE MATERIALIZED VIEW IF NOT EXISTS data_mv TO summary_table AS SELECT ...'
+      - 'INSERT INTO summary_table SELECT ... FROM replicated_table'
+```
 
 Few more tables / dbs examples:

mysql_ch_replicator/config.py

Lines changed: 22 additions & 0 deletions
@@ -48,6 +48,12 @@ class PartitionBy:
     partition_by: str = ''
 
 
+@dataclass
+class PostInitialReplicationCommands:
+    databases: str | list = '*'
+    commands: list = None
+
+
 @dataclass
 class ClickhouseSettings:
     host: str = 'localhost'
@@ -134,6 +140,7 @@ def __init__(self):
         self.check_db_updated_interval = 0
         self.indexes: list[Index] = []
         self.partition_bys: list[PartitionBy] = []
+        self.post_initial_replication_commands: list[PostInitialReplicationCommands] = []
         self.auto_restart_interval = 0
         self.http_host = ''
         self.http_port = 0
@@ -184,6 +191,12 @@ def load(self, settings_file):
                 PartitionBy(**partition_by)
             )
 
+        post_initial_replication_commands = data.pop('post_initial_replication_commands', [])
+        for cmd_config in post_initial_replication_commands:
+            self.post_initial_replication_commands.append(
+                PostInitialReplicationCommands(**cmd_config)
+            )
+
         assert isinstance(self.databases, str) or isinstance(self.databases, list)
         assert isinstance(self.tables, str) or isinstance(self.tables, list)
         self.binlog_replicator = BinlogReplicatorSettings(**data.pop('binlog_replicator'))
@@ -250,6 +263,15 @@ def get_partition_bys(self, db_name, table_name):
             results.append(partition_by.partition_by)
         return results
 
+    def get_post_initial_replication_commands(self, db_name):
+        results = []
+        for cmd_config in self.post_initial_replication_commands:
+            if not self.is_pattern_matches(db_name, cmd_config.databases):
+                continue
+            if cmd_config.commands:
+                results.extend(cmd_config.commands)
+        return results
+
     def validate(self):
         self.mysql.validate()
         self.clickhouse.validate()
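Note for readers: `get_post_initial_replication_commands` delegates database matching to `is_pattern_matches`, which is not part of this diff. A minimal, self-contained sketch of the resolution behavior, assuming shell-style wildcard matching via `fnmatch` (that matching rule, the `matches` helper, and the standalone function are illustrative assumptions, not code from this commit):

```python
from dataclasses import dataclass
from fnmatch import fnmatch


@dataclass
class PostInitialReplicationCommands:
    databases: str | list = '*'   # single pattern or list of patterns
    commands: list = None         # mirrors the commit's None default


def matches(db_name: str, pattern) -> bool:
    # Assumption: patterns behave like shell wildcards, and a list means
    # "any of these patterns"; the project's is_pattern_matches may differ.
    patterns = pattern if isinstance(pattern, list) else [pattern]
    return any(fnmatch(db_name, p) for p in patterns)


def get_post_initial_replication_commands(configs: list, db_name: str) -> list:
    # Same shape as the method added above: collect commands from every
    # matching entry, preserving configuration order.
    results = []
    for cmd_config in configs:
        if not matches(db_name, cmd_config.databases):
            continue
        if cmd_config.commands:
            results.extend(cmd_config.commands)
    return results


configs = [
    PostInitialReplicationCommands(databases='*test*', commands=['CREATE TABLE a ...']),
    PostInitialReplicationCommands(databases='prod_*', commands=['CREATE TABLE b ...']),
]
print(get_post_initial_replication_commands(configs, 'my_test_db'))
# ['CREATE TABLE a ...'] (only the '*test*' entry matches)
```

Because every matching entry contributes, a single database can receive both global (`'*'`) and database-specific commands in one pass.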

mysql_ch_replicator/db_replicator_initial.py

Lines changed: 25 additions & 0 deletions
@@ -123,6 +123,9 @@ def perform_initial_replication(self):
             f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`',
         )
         self.replicator.clickhouse_api.database = self.replicator.target_database
+
+        # Execute post-initial-replication commands
+        self.execute_post_initial_replication_commands()
         logger.info(f'initial replication - done')
 
     def perform_initial_replication_table(self, table_name):
@@ -425,3 +428,25 @@ def consolidate_worker_record_versions(self, table_name):
             logger.info(f"Current version {current_version} is already up-to-date with ClickHouse version {max_version}")
         else:
             logger.warning(f"No record version found in ClickHouse for table {table_name}")
+
+    def execute_post_initial_replication_commands(self):
+        """
+        Execute custom commands after initial replication finishes.
+        Commands are configured per database in the config file.
+        """
+        commands = self.replicator.config.get_post_initial_replication_commands(self.replicator.database)
+
+        if not commands:
+            logger.info(f'No post-initial-replication commands configured for database {self.replicator.database}')
+            return
+
+        logger.info(f'Executing {len(commands)} post-initial-replication commands for database {self.replicator.database}')
+
+        self.replicator.clickhouse_api.execute_command(f'USE `{self.replicator.target_database}`')
+
+        for i, command in enumerate(commands, 1):
+            logger.info(f'Executing command {i}/{len(commands)}: {command[:100]}...')
+            self.replicator.clickhouse_api.execute_command(command)
+            logger.info(f'Command {i}/{len(commands)} executed successfully')
+
+        logger.info(f'All post-initial-replication commands executed successfully')
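Worth noting about the control flow above: commands run only after the temporary database has been renamed to the target, execution is strictly sequential, and a failing command raises out of the loop, so later commands are skipped. A rough standalone sketch of that flow (the `FakeClient` class and `run_post_replication_commands` name are hypothetical stand-ins, not the project's API):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class FakeClient:
    """Hypothetical stand-in for the replicator's ClickHouse API wrapper."""

    def execute_command(self, sql: str) -> None:
        logger.info('executing: %s', sql)


def run_post_replication_commands(client: FakeClient, target_database: str, commands: list) -> None:
    # Mirrors execute_post_initial_replication_commands: return quietly when
    # nothing is configured, select the target database, then run commands in
    # order; any exception propagates and aborts the remaining commands.
    if not commands:
        logger.info('no post-initial-replication commands configured')
        return
    client.execute_command(f'USE `{target_database}`')
    for i, command in enumerate(commands, 1):
        logger.info('command %d/%d: %s', i, len(commands), command[:100])
        client.execute_command(command)


run_post_replication_commands(
    FakeClient(),
    'analytics',
    ['CREATE TABLE IF NOT EXISTS daily (d Date, n UInt64) ENGINE = SummingMergeTree() ORDER BY d'],
)
```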

tests/test_mysql_ch_replicator.py

Lines changed: 146 additions & 0 deletions
@@ -956,3 +956,149 @@ def test_resume_initial_replication_with_ignore_deletes():
     finally:
         # Clean up temp config file
         os.unlink(config_file)
+
+
+def test_post_initial_replication_commands():
+    config_file = 'tests/tests_config_post_commands.yaml'
+
+    cfg = config.Settings()
+    cfg.load(config_file)
+
+    mysql = mysql_api.MySQLApi(
+        database=None,
+        mysql_settings=cfg.mysql,
+    )
+
+    ch = clickhouse_api.ClickhouseApi(
+        database=TEST_DB_NAME,
+        clickhouse_settings=cfg.clickhouse,
+    )
+
+    prepare_env(cfg, mysql, ch)
+
+    mysql.execute(f"""
+    CREATE TABLE `{TEST_TABLE_NAME}` (
+        id int(11) NOT NULL AUTO_INCREMENT,
+        event_time DATETIME NOT NULL,
+        event_type VARCHAR(50) NOT NULL,
+        PRIMARY KEY (id)
+    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
+    """)
+
+    initial_inserts = [
+        ('2024-01-01', 'type_0'),
+        ('2024-01-02', 'type_1'),
+        ('2024-01-03', 'type_2'),
+        ('2024-01-01', 'type_0'),
+        ('2024-01-02', 'type_1'),
+        ('2024-01-03', 'type_2'),
+        ('2024-01-01', 'type_0'),
+        ('2024-01-02', 'type_1'),
+        ('2024-01-03', 'type_2'),
+        ('2024-01-04', 'type_0'),
+    ]
+
+    for date, event_type in initial_inserts:
+        mysql.execute(
+            f"INSERT INTO `{TEST_TABLE_NAME}` (event_time, event_type) VALUES ('{date} 10:00:00', '{event_type}');",
+            commit=True
+        )
+
+    binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
+    binlog_replicator_runner.run()
+
+    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
+    db_replicator_runner.run()
+
+    assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
+    ch.execute_command(f'USE `{TEST_DB_NAME}`')
+    assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 10)
+
+    assert_wait(lambda: 'events_per_day' in ch.get_tables(), max_wait_time=15)
+    assert_wait(lambda: 'events_mv' in ch.get_tables(), max_wait_time=15)
+
+    ch.execute_command('OPTIMIZE TABLE events_per_day FINAL')
+
+    def check_initial_data_ready():
+        records = ch.select('events_per_day ORDER BY event_date, event_type')
+        return len(records) > 0
+
+    assert_wait(check_initial_data_ready, max_wait_time=10)
+
+    events_per_day_records = ch.select('events_per_day ORDER BY event_date, event_type')
+    assert len(events_per_day_records) > 0, "events_per_day should have aggregated data from initial replication backfill"
+
+    aggregated_data = {}
+    for record in events_per_day_records:
+        date_str = str(record['event_date'])
+        event_type = record['event_type']
+        count = record['total_events']
+        key = (date_str, event_type)
+        aggregated_data[key] = count
+
+    assert ('2024-01-01', 'type_0') in aggregated_data, "Should have 2024-01-01 + type_0"
+    assert aggregated_data[('2024-01-01', 'type_0')] == 3, f"Expected 3 events for 2024-01-01 + type_0, got {aggregated_data[('2024-01-01', 'type_0')]}"
+
+    assert ('2024-01-02', 'type_1') in aggregated_data, "Should have 2024-01-02 + type_1"
+    assert aggregated_data[('2024-01-02', 'type_1')] == 3, f"Expected 3 events for 2024-01-02 + type_1, got {aggregated_data[('2024-01-02', 'type_1')]}"
+
+    assert ('2024-01-03', 'type_2') in aggregated_data, "Should have 2024-01-03 + type_2"
+    assert aggregated_data[('2024-01-03', 'type_2')] == 3, f"Expected 3 events for 2024-01-03 + type_2, got {aggregated_data[('2024-01-03', 'type_2')]}"
+
+    assert ('2024-01-04', 'type_0') in aggregated_data, "Should have 2024-01-04 + type_0"
+    assert aggregated_data[('2024-01-04', 'type_0')] == 1, f"Expected 1 event for 2024-01-04 + type_0, got {aggregated_data[('2024-01-04', 'type_0')]}"
+
+    realtime_inserts = [
+        ('2024-01-05', 'type_new', 3),
+        ('2024-01-06', 'type_new', 2),
+        ('2024-01-01', 'type_0', 2),
+    ]
+
+    for date, event_type, count in realtime_inserts:
+        for _ in range(count):
+            mysql.execute(
+                f"INSERT INTO `{TEST_TABLE_NAME}` (event_time, event_type) VALUES ('{date} 12:00:00', '{event_type}');",
+                commit=True
+            )
+
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 17)
+
+    ch.execute_command('OPTIMIZE TABLE events_per_day FINAL')
+
+    def check_realtime_aggregated():
+        records = ch.select('events_per_day')
+        agg = {}
+        for r in records:
+            key = (str(r['event_date']), r['event_type'])
+            agg[key] = r['total_events']
+        return (
+            agg.get(('2024-01-05', 'type_new'), 0) == 3 and
+            agg.get(('2024-01-06', 'type_new'), 0) == 2 and
+            agg.get(('2024-01-01', 'type_0'), 0) == 5
+        )
+
+    assert_wait(check_realtime_aggregated, max_wait_time=15)
+
+    ch.execute_command('OPTIMIZE TABLE events_per_day FINAL')
+
+    def check_final_aggregated():
+        final_records = ch.select('events_per_day ORDER BY event_date, event_type')
+        final_aggregated_data = {}
+        for record in final_records:
+            date_str = str(record['event_date'])
+            event_type = record['event_type']
+            count = record['total_events']
+            key = (date_str, event_type)
+            final_aggregated_data[key] = count
+
+        return (
+            final_aggregated_data.get(('2024-01-05', 'type_new')) == 3 and
+            final_aggregated_data.get(('2024-01-06', 'type_new')) == 2 and
+            final_aggregated_data.get(('2024-01-01', 'type_0')) == 5
+        )
+
+    assert_wait(check_final_aggregated, max_wait_time=15)
+
+    db_replicator_runner.stop()
+    binlog_replicator_runner.stop()
tests/tests_config_post_commands.yaml

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+mysql:
+  host: 'localhost'
+  port: 9306
+  user: 'root'
+  password: 'admin'
+
+clickhouse:
+  host: 'localhost'
+  port: 9123
+  user: 'default'
+  password: 'admin'
+  erase_batch_size: 2
+
+binlog_replicator:
+  data_dir: '/app/binlog/'
+  records_per_file: 100000
+  binlog_retention_period: 43200
+
+databases: '*test*'
+log_level: 'info'
+optimize_interval: 3
+check_db_updated_interval: 3
+
+target_databases:
+  replication-test_db_2: replication-destination
+
+indexes:
+  - databases: '*'
+    tables: ['group']
+    index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1'
+
+http_host: 'localhost'
+http_port: 9128
+
+types_mapping:
+  'char(36)': 'UUID'
+
+post_initial_replication_commands:
+  - databases: '*'
+    commands:
+      - 'CREATE TABLE IF NOT EXISTS events_per_day (event_date Date, event_type String, total_events UInt64) ENGINE = SummingMergeTree() ORDER BY (event_date, event_type)'
+      - 'CREATE MATERIALIZED VIEW IF NOT EXISTS events_mv TO events_per_day AS SELECT toDate(event_time) AS event_date, event_type, count() AS total_events FROM test_table GROUP BY event_date, event_type'
+      - 'INSERT INTO events_per_day SELECT toDate(event_time) AS event_date, event_type, count() AS total_events FROM test_table GROUP BY event_date, event_type'
+
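One detail worth calling out in this test config: a ClickHouse materialized view only sees rows inserted after it is created, which is why the CREATE MATERIALIZED VIEW is paired with a one-time INSERT ... SELECT that backfills aggregates for the rows copied during initial replication (the behavior the test above asserts). A quick way to inspect which commands would apply to a given database, sketched with PyYAML and fnmatch-style wildcard matching (both assumptions, not project code):

```python
from fnmatch import fnmatch

import yaml  # assumes PyYAML is installed

with open('tests/tests_config_post_commands.yaml') as f:
    data = yaml.safe_load(f)

db_name = 'my_test_db'  # any name matching the '*test*' databases filter
for entry in data.get('post_initial_replication_commands', []):
    patterns = entry['databases']
    patterns = patterns if isinstance(patterns, list) else [patterns]
    if any(fnmatch(db_name, p) for p in patterns):
        for cmd in entry['commands']:
            print(cmd[:60])  # first 60 chars of each applicable command
```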
