Skip to content

Commit 2e11aef

Browse files
committed
Tables and databases filtering
1 parent e93d777 commit 2e11aef

File tree

6 files changed

+143
-23
lines changed

6 files changed

+143
-23
lines changed

README.md

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,22 @@ binlog_replicator:
110110
records_per_file: 100000
111111

112112
databases: 'database_name_pattern_*'
113+
tables: '*'
113114
```
114115
115116
116117
- `mysql` MySQL connection settings
117118
- `clickhouse` ClickHouse connection settings
118119
- `binlog_replicator.data_dir` Directory for storing the binary log and application state
119-
- `databases` Databases name pattern to replicate, eg `db_*` will match `db_1` `db_2` `db_test`
120+
- `databases` Database name pattern to replicate, e.g. `db_*` will match `db_1`, `db_2` and `db_test`; a list of patterns is also supported
121+
- `tables` (__optional__) - table name pattern to replicate (defaults to `'*'`, i.e. all tables); a list of patterns is also supported
122+
123+
A few more examples of table / database filters:
124+
125+
```yaml
126+
databases: ['my_database_1', 'my_database_2']
127+
tables: ['table_1', 'table_2*']
128+
```
120129

121130
### Advanced Features
122131

@@ -144,13 +153,13 @@ pip install -r requirements.txt
144153

145154
### Running Tests
146155

147-
For running test you will need:
148-
1. MySQL and ClickHouse server
149-
2. `tests_config.yaml` that will be used during tests
150-
3. Run tests with:
151-
156+
1. Use docker-compose to install all requirements:
157+
```bash
158+
sudo docker compose -f docker-compose-tests.yaml up
159+
```
160+
2. Run tests with:
152161
```bash
153-
pytest -v -s test_mysql_ch_replicator.py
162+
sudo docker exec -w /app/ -it mysql_ch_replicator-replicator-1 python3 -m pytest -v -s test_mysql_ch_replicator.py
154163
```
155164

156165
## Contribution

mysql_ch_replicator/binlog_replicator.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,12 @@ def run(self):
401401
log_event = LogEvent()
402402
if hasattr(event, 'table'):
403403
log_event.table_name = event.table
404+
if isinstance(log_event.table_name, bytes):
405+
log_event.table_name = log_event.table_name.decode('utf-8')
406+
407+
if not self.settings.is_table_matches(log_event.table_name):
408+
continue
409+
404410
log_event.db_name = event.schema
405411

406412
if isinstance(log_event.db_name, bytes):

mysql_ch_replicator/config.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def __init__(self):
3333
self.clickhouse = ClickhouseSettings()
3434
self.binlog_replicator = BinlogReplicatorSettings()
3535
self.databases = ''
36+
self.tables = '*'
3637
self.settings_file = ''
3738

3839
def load(self, settings_file):
@@ -43,8 +44,26 @@ def load(self, settings_file):
4344
self.mysql = MysqlSettings(**data['mysql'])
4445
self.clickhouse = ClickhouseSettings(**data['clickhouse'])
4546
self.databases = data['databases']
46-
assert isinstance(self.databases, str)
47+
self.tables = data.get('tables', '*')
48+
assert isinstance(self.databases, str) or isinstance(self.databases, list)
49+
assert isinstance(self.tables, str) or isinstance(self.tables, list)
4750
self.binlog_replicator = BinlogReplicatorSettings(**data['binlog_replicator'])
4851

52+
@classmethod
def is_pattern_matches(cls, substr, pattern):
    """Return True if the name ``substr`` matches ``pattern``.

    ``pattern`` is either a single shell-style wildcard string (as
    understood by ``fnmatch.fnmatch``) or a list/tuple of such strings;
    a sequence matches when at least one of its entries matches.

    Any falsy pattern ('', [], None) and the lone wildcard '*' are
    treated as "no filter" and match every name.

    Raises:
        ValueError: if ``pattern`` is neither a string nor a list/tuple.
    """
    # Nothing configured (or an explicit catch-all): accept everything.
    if not pattern or pattern == '*':
        return True
    if isinstance(pattern, str):
        return fnmatch.fnmatch(substr, pattern)
    if isinstance(pattern, (list, tuple)):
        return any(fnmatch.fnmatch(substr, allowed) for allowed in pattern)
    # Name the offending type so a bad config value is easy to track down.
    raise ValueError(
        'pattern must be a string or a list of strings, '
        f'got {type(pattern).__name__}'
    )
64+
4965
def is_database_matches(self, db_name):
50-
return fnmatch.fnmatch(db_name, self.databases)
66+
return self.is_pattern_matches(db_name, self.databases)
67+
68+
def is_table_matches(self, table_name):
69+
return self.is_pattern_matches(table_name, self.tables)

mysql_ch_replicator/db_replicator.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ def run(self):
136136
self.clickhouse_api.database = self.target_database_tmp
137137
self.clickhouse_api.recreate_database()
138138
self.state.tables = self.mysql_api.get_tables()
139+
self.state.tables = [
140+
table for table in self.state.tables if self.config.is_table_matches(table)
141+
]
139142
self.state.last_processed_transaction = self.data_reader.get_last_transaction_id()
140143
self.state.save()
141144
logger.info(f'last known transaction {self.state.last_processed_transaction}')
@@ -150,6 +153,8 @@ def create_initial_structure(self):
150153
self.state.save()
151154

152155
def create_initial_structure_table(self, table_name):
156+
if not self.config.is_table_matches(table_name):
157+
return
153158
mysql_create_statement = self.mysql_api.get_table_create_statement(table_name)
154159
mysql_structure = self.converter.parse_mysql_table_structure(
155160
mysql_create_statement, required_table_name=table_name,
@@ -198,6 +203,9 @@ def perform_initial_replication(self):
198203
def perform_initial_replication_table(self, table_name):
199204
logger.info(f'running initial replication for table {table_name}')
200205

206+
if not self.config.is_table_matches(table_name):
207+
logger.info(f'skip table {table_name} - not matching any allowed table')
208+
201209
max_primary_key = None
202210
if self.state.initial_replication_table == table_name:
203211
# continue replication from saved position
@@ -294,7 +302,8 @@ def handle_event(self, event: LogEvent):
294302
EventType.QUERY.value: self.handle_query_event,
295303
}
296304

297-
event_handlers[event.event_type](event)
305+
if not event.table_name or self.config.is_table_matches(event.table_name):
306+
event_handlers[event.event_type](event)
298307

299308
self.stats.events_count += 1
300309
self.stats.last_transaction = event.transaction_id
@@ -367,6 +376,8 @@ def handle_alter_query(self, query, db_name):
367376

368377
def handle_create_table_query(self, query, db_name):
369378
mysql_structure, ch_structure = self.converter.parse_create_table_query(query)
379+
if not self.config.is_table_matches(mysql_structure.table_name):
380+
return
370381
self.state.tables_structure[mysql_structure.table_name] = (mysql_structure, ch_structure)
371382
self.clickhouse_api.create_table(ch_structure)
372383

test_mysql_ch_replicator.py

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,21 @@
2020

2121

2222
class BinlogReplicatorRunner(ProcessRunner):
23-
def __init__(self):
24-
super().__init__(f'./main.py --config {CONFIG_FILE} binlog_replicator')
23+
def __init__(self, cfg_file=CONFIG_FILE):
24+
super().__init__(f'./main.py --config {cfg_file} binlog_replicator')
2525

2626

2727
class DbReplicatorRunner(ProcessRunner):
28-
def __init__(self, db_name, additional_arguments=None):
28+
def __init__(self, db_name, additional_arguments=None, cfg_file=CONFIG_FILE):
2929
additional_arguments = additional_arguments or ''
3030
if not additional_arguments.startswith(' '):
3131
additional_arguments = ' ' + additional_arguments
32-
super().__init__(f'./main.py --config {CONFIG_FILE} --db {db_name} db_replicator{additional_arguments}')
32+
super().__init__(f'./main.py --config {cfg_file} --db {db_name} db_replicator{additional_arguments}')
3333

3434

3535
class RunAllRunner(ProcessRunner):
36-
def __init__(self, db_name):
37-
super().__init__(f'./main.py --config {CONFIG_FILE} run_all --db {db_name}')
36+
def __init__(self, cfg_file=CONFIG_FILE):
37+
super().__init__(f'./main.py --config {cfg_file} run_all')
3838

3939

4040
def kill_process(pid, force=False):
@@ -57,15 +57,16 @@ def prepare_env(
5757
cfg: config.Settings,
5858
mysql: mysql_api.MySQLApi,
5959
ch: clickhouse_api.ClickhouseApi,
60+
db_name: str = TEST_DB_NAME
6061
):
6162
if os.path.exists(cfg.binlog_replicator.data_dir):
6263
shutil.rmtree(cfg.binlog_replicator.data_dir)
6364
os.mkdir(cfg.binlog_replicator.data_dir)
64-
mysql.drop_database(TEST_DB_NAME)
65-
mysql.create_database(TEST_DB_NAME)
66-
mysql.set_database(TEST_DB_NAME)
67-
ch.drop_database(TEST_DB_NAME)
68-
assert_wait(lambda: TEST_DB_NAME not in ch.get_databases())
65+
mysql.drop_database(db_name)
66+
mysql.create_database(db_name)
67+
mysql.set_database(db_name)
68+
ch.drop_database(db_name)
69+
assert_wait(lambda: db_name not in ch.get_databases())
6970

7071

7172
def test_e2e_regular():
@@ -299,7 +300,7 @@ def test_runner():
299300
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Ivan', 42);", commit=True)
300301
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Peter', 33);", commit=True)
301302

302-
run_all_runner = RunAllRunner(TEST_DB_NAME)
303+
run_all_runner = RunAllRunner()
303304
run_all_runner.run()
304305

305306
assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
@@ -371,4 +372,59 @@ def test_initial_only():
371372
ch.execute_command(f'USE {TEST_DB_NAME}')
372373

373374
assert TEST_TABLE_NAME in ch.get_tables()
374-
assert len(ch.select(TEST_TABLE_NAME)) == 2
375+
assert len(ch.select(TEST_TABLE_NAME)) == 2
376+
377+
378+
def test_database_tables_filtering():
379+
cfg = config.Settings()
380+
cfg.load('tests_config_databases_tables.yaml')
381+
382+
mysql = mysql_api.MySQLApi(
383+
database=None,
384+
mysql_settings=cfg.mysql,
385+
)
386+
387+
ch = clickhouse_api.ClickhouseApi(
388+
database='test_db_2',
389+
clickhouse_settings=cfg.clickhouse,
390+
)
391+
392+
mysql.drop_database('test_db_3')
393+
mysql.create_database('test_db_3')
394+
ch.drop_database('test_db_3')
395+
396+
prepare_env(cfg, mysql, ch, db_name='test_db_2')
397+
398+
mysql.execute(f'''
399+
CREATE TABLE test_table_3 (
400+
id int NOT NULL AUTO_INCREMENT,
401+
name varchar(255),
402+
age int,
403+
PRIMARY KEY (id)
404+
);
405+
''')
406+
407+
mysql.execute(f'''
408+
CREATE TABLE test_table_2 (
409+
id int NOT NULL AUTO_INCREMENT,
410+
name varchar(255),
411+
age int,
412+
PRIMARY KEY (id)
413+
);
414+
''')
415+
416+
mysql.execute(f"INSERT INTO test_table_3 (name, age) VALUES ('Ivan', 42);", commit=True)
417+
mysql.execute(f"INSERT INTO test_table_2 (name, age) VALUES ('Ivan', 42);", commit=True)
418+
419+
run_all_runner = RunAllRunner(cfg_file='tests_config_databases_tables.yaml')
420+
run_all_runner.run()
421+
422+
assert_wait(lambda: 'test_db_2' in ch.get_databases())
423+
assert 'test_db_3' not in ch.get_databases()
424+
425+
ch.execute_command('USE test_db_2')
426+
427+
assert_wait(lambda: 'test_table_2' in ch.get_tables())
428+
assert_wait(lambda: len(ch.select('test_table_2')) == 1)
429+
430+
assert 'test_table_3' not in ch.get_tables()

tests_config_databases_tables.yaml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
2+
mysql:
3+
host: 'localhost'
4+
port: 9306
5+
user: 'root'
6+
password: 'admin'
7+
8+
clickhouse:
9+
host: 'localhost'
10+
port: 9123
11+
user: 'default'
12+
password: 'admin'
13+
14+
binlog_replicator:
15+
data_dir: '/app/binlog/'
16+
records_per_file: 100000
17+
18+
databases: ['test_db_1*', 'test_db_2']
19+
tables: ['test_table_1*', 'test_table_2']

0 commit comments

Comments
 (0)