Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,22 @@ binlog_replicator:
records_per_file: 100000

databases: 'database_name_pattern_*'
tables: '*'
```


- `mysql` MySQL connection settings
- `clickhouse` ClickHouse connection settings
- `binlog_replicator.data_dir` Directory for storing the binary log and application state
- `databases` Databases name pattern to replicate, eg `db_*` will match `db_1` `db_2` `db_test`
- `databases` Database name pattern to replicate, e.g. `db_*` will match `db_1`, `db_2` and `db_test`; a list of patterns is also supported
- `tables` (__optional__) - only replicate tables matching this pattern; a list of patterns is also supported

A few more examples of `databases` / `tables` values:

```yaml
databases: ['my_database_1', 'my_database_2']
tables: ['table_1', 'table_2*']
```

### Advanced Features

Expand Down Expand Up @@ -144,13 +153,13 @@ pip install -r requirements.txt

### Running Tests

For running test you will need:
1. MySQL and ClickHouse server
2. `tests_config.yaml` that will be used during tests
3. Run tests with:

1. Use docker-compose to install all requirements:
```bash
sudo docker compose -f docker-compose-tests.yaml up
```
2. Run tests with:
```bash
pytest -v -s test_mysql_ch_replicator.py
sudo docker exec -w /app/ -it mysql_ch_replicator-replicator-1 python3 -m pytest -v -s test_mysql_ch_replicator.py
```

## Contribution
Expand Down
6 changes: 6 additions & 0 deletions mysql_ch_replicator/binlog_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,12 @@ def run(self):
log_event = LogEvent()
if hasattr(event, 'table'):
log_event.table_name = event.table
if isinstance(log_event.table_name, bytes):
log_event.table_name = log_event.table_name.decode('utf-8')

if not self.settings.is_table_matches(log_event.table_name):
continue

log_event.db_name = event.schema

if isinstance(log_event.db_name, bytes):
Expand Down
23 changes: 21 additions & 2 deletions mysql_ch_replicator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self):
self.clickhouse = ClickhouseSettings()
self.binlog_replicator = BinlogReplicatorSettings()
self.databases = ''
self.tables = '*'
self.settings_file = ''

def load(self, settings_file):
Expand All @@ -43,8 +44,26 @@ def load(self, settings_file):
self.mysql = MysqlSettings(**data['mysql'])
self.clickhouse = ClickhouseSettings(**data['clickhouse'])
self.databases = data['databases']
assert isinstance(self.databases, str)
self.tables = data.get('tables', '*')
assert isinstance(self.databases, str) or isinstance(self.databases, list)
assert isinstance(self.tables, str) or isinstance(self.tables, list)
self.binlog_replicator = BinlogReplicatorSettings(**data['binlog_replicator'])

@classmethod
def is_pattern_matches(cls, substr, pattern):
    """Check whether *substr* matches *pattern*.

    *pattern* may be:
      - falsy ('' / None) or '*': matches everything;
      - a single fnmatch-style glob string, e.g. ``'db_*'``;
      - a list of such glob strings (matching any one is enough).

    Returns:
        bool: True when *substr* is allowed by *pattern*.

    Raises:
        ValueError: if *pattern* is neither a string nor a list.
    """
    if not pattern or pattern == '*':
        return True
    if isinstance(pattern, str):
        return fnmatch.fnmatch(substr, pattern)
    if isinstance(pattern, list):
        # Any one of the allowed patterns is sufficient.
        return any(fnmatch.fnmatch(substr, allowed) for allowed in pattern)
    raise ValueError(f'pattern must be str or list, got {type(pattern).__name__}')

def is_database_matches(self, db_name):
    """Return True if *db_name* should be replicated per the `databases` setting.

    Delegates to is_pattern_matches, which supports both a single glob
    pattern and a list of patterns.
    """
    return self.is_pattern_matches(db_name, self.databases)

def is_table_matches(self, table_name):
    """Return True when *table_name* passes the configured `tables` filter."""
    table_filter = self.tables
    return self.is_pattern_matches(table_name, table_filter)
13 changes: 12 additions & 1 deletion mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ def run(self):
self.clickhouse_api.database = self.target_database_tmp
self.clickhouse_api.recreate_database()
self.state.tables = self.mysql_api.get_tables()
self.state.tables = [
table for table in self.state.tables if self.config.is_table_matches(table)
]
self.state.last_processed_transaction = self.data_reader.get_last_transaction_id()
self.state.save()
logger.info(f'last known transaction {self.state.last_processed_transaction}')
Expand All @@ -150,6 +153,8 @@ def create_initial_structure(self):
self.state.save()

def create_initial_structure_table(self, table_name):
if not self.config.is_table_matches(table_name):
return
mysql_create_statement = self.mysql_api.get_table_create_statement(table_name)
mysql_structure = self.converter.parse_mysql_table_structure(
mysql_create_statement, required_table_name=table_name,
Expand Down Expand Up @@ -198,6 +203,9 @@ def perform_initial_replication(self):
def perform_initial_replication_table(self, table_name):
logger.info(f'running initial replication for table {table_name}')

if not self.config.is_table_matches(table_name):
logger.info(f'skip table {table_name} - not matching any allowed table')

max_primary_key = None
if self.state.initial_replication_table == table_name:
# continue replication from saved position
Expand Down Expand Up @@ -294,7 +302,8 @@ def handle_event(self, event: LogEvent):
EventType.QUERY.value: self.handle_query_event,
}

event_handlers[event.event_type](event)
if not event.table_name or self.config.is_table_matches(event.table_name):
event_handlers[event.event_type](event)

self.stats.events_count += 1
self.stats.last_transaction = event.transaction_id
Expand Down Expand Up @@ -367,6 +376,8 @@ def handle_alter_query(self, query, db_name):

def handle_create_table_query(self, query, db_name):
    """Handle a CREATE TABLE statement seen in the binlog.

    Parses the MySQL DDL into (mysql, clickhouse) structures; if the table
    passes the configured tables filter, records the structures in the
    replicator state and creates the table in ClickHouse.
    """
    mysql_structure, ch_structure = self.converter.parse_create_table_query(query)
    table_name = mysql_structure.table_name
    if self.config.is_table_matches(table_name):
        self.state.tables_structure[table_name] = (mysql_structure, ch_structure)
        self.clickhouse_api.create_table(ch_structure)

Expand Down
82 changes: 69 additions & 13 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@


class BinlogReplicatorRunner(ProcessRunner):
    """Runs the binlog replicator subprocess using the given config file."""

    def __init__(self, cfg_file=CONFIG_FILE):
        super().__init__(f'./main.py --config {cfg_file} binlog_replicator')


class DbReplicatorRunner(ProcessRunner):
    """Runs the db replicator subprocess for a single database."""

    def __init__(self, db_name, additional_arguments=None, cfg_file=CONFIG_FILE):
        additional_arguments = additional_arguments or ''
        # Ensure the extra arguments are space-separated from the base command.
        if not additional_arguments.startswith(' '):
            additional_arguments = ' ' + additional_arguments
        super().__init__(f'./main.py --config {cfg_file} --db {db_name} db_replicator{additional_arguments}')


class RunAllRunner(ProcessRunner):
    """Runs the all-in-one replicator (binlog replicator + db replicators)."""

    def __init__(self, cfg_file=CONFIG_FILE):
        super().__init__(f'./main.py --config {cfg_file} run_all')


def kill_process(pid, force=False):
def prepare_env(
        cfg: config.Settings,
        mysql: mysql_api.MySQLApi,
        ch: clickhouse_api.ClickhouseApi,
        db_name: str = TEST_DB_NAME,
):
    """Reset the test environment: binlog data dir, MySQL and ClickHouse state.

    Recreates the binlog data directory from scratch, drops and recreates
    *db_name* in MySQL, and drops it from ClickHouse, waiting until the
    ClickHouse drop is visible.
    """
    if os.path.exists(cfg.binlog_replicator.data_dir):
        shutil.rmtree(cfg.binlog_replicator.data_dir)
    os.mkdir(cfg.binlog_replicator.data_dir)
    mysql.drop_database(db_name)
    mysql.create_database(db_name)
    mysql.set_database(db_name)
    ch.drop_database(db_name)
    assert_wait(lambda: db_name not in ch.get_databases())


def test_e2e_regular():
Expand Down Expand Up @@ -299,7 +300,7 @@ def test_runner():
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Ivan', 42);", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Peter', 33);", commit=True)

run_all_runner = RunAllRunner(TEST_DB_NAME)
run_all_runner = RunAllRunner()
run_all_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
Expand Down Expand Up @@ -371,4 +372,59 @@ def test_initial_only():
ch.execute_command(f'USE {TEST_DB_NAME}')

assert TEST_TABLE_NAME in ch.get_tables()
assert len(ch.select(TEST_TABLE_NAME)) == 2
assert len(ch.select(TEST_TABLE_NAME)) == 2


def test_database_tables_filtering():
    """End-to-end check that the `databases` / `tables` filters are applied.

    The config whitelists databases ['test_db_1*', 'test_db_2'] and tables
    ['test_table_1*', 'test_table_2'], so test_db_3 / test_table_3 must NOT
    be replicated while test_db_2 / test_table_2 must be.
    """
    cfg = config.Settings()
    cfg.load('tests_config_databases_tables.yaml')

    mysql = mysql_api.MySQLApi(
        database=None,
        mysql_settings=cfg.mysql,
    )

    ch = clickhouse_api.ClickhouseApi(
        database='test_db_2',
        clickhouse_settings=cfg.clickhouse,
    )

    # test_db_3 is filtered out by the config; create it in MySQL anyway so
    # we can verify the replicator does NOT pick it up.
    mysql.drop_database('test_db_3')
    mysql.create_database('test_db_3')
    ch.drop_database('test_db_3')

    prepare_env(cfg, mysql, ch, db_name='test_db_2')

    # Identical schema for both tables; only test_table_2 matches the filter.
    for table_name in ('test_table_3', 'test_table_2'):
        mysql.execute(f'''
        CREATE TABLE {table_name} (
            id int NOT NULL AUTO_INCREMENT,
            name varchar(255),
            age int,
            PRIMARY KEY (id)
        );
        ''')
        mysql.execute(
            f"INSERT INTO {table_name} (name, age) VALUES ('Ivan', 42);",
            commit=True,
        )

    run_all_runner = RunAllRunner(cfg_file='tests_config_databases_tables.yaml')
    run_all_runner.run()

    assert_wait(lambda: 'test_db_2' in ch.get_databases())
    assert 'test_db_3' not in ch.get_databases()

    ch.execute_command('USE test_db_2')

    assert_wait(lambda: 'test_table_2' in ch.get_tables())
    assert_wait(lambda: len(ch.select('test_table_2')) == 1)

    assert 'test_table_3' not in ch.get_tables()
19 changes: 19 additions & 0 deletions tests_config_databases_tables.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

# Test configuration exercising `databases` / `tables` filtering.
mysql:
  host: 'localhost'
  port: 9306
  user: 'root'
  password: 'admin'

clickhouse:
  host: 'localhost'
  port: 9123
  user: 'default'
  password: 'admin'

binlog_replicator:
  # Directory where binlog files and replicator state are stored.
  data_dir: '/app/binlog/'
  records_per_file: 100000

# Only databases / tables matching these glob patterns are replicated.
databases: ['test_db_1*', 'test_db_2']
tables: ['test_table_1*', 'test_table_2']
Loading