Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions mysql_ch_replicator/binlog_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
)
from .pymysqlreplication.event import QueryEvent

from .config import MysqlSettings, BinlogReplicatorSettings
from .config import Settings, BinlogReplicatorSettings
from .utils import GracefulKiller


Expand Down Expand Up @@ -340,17 +340,18 @@ class BinlogReplicator:
BINLOG_RETENTION_PERIOD = 12 * 60 * 60
READ_LOG_INTERVAL = 1

def __init__(self, mysql_settings: MysqlSettings, replicator_settings: BinlogReplicatorSettings):
self.mysql_settings = mysql_settings
self.replicator_settings = replicator_settings
def __init__(self, settings: Settings):
self.settings = settings
self.mysql_settings = settings.mysql
self.replicator_settings = settings.binlog_replicator
mysql_settings = {
'host': mysql_settings.host,
'port': mysql_settings.port,
'user': mysql_settings.user,
'passwd': mysql_settings.password,
'host': self.mysql_settings.host,
'port': self.mysql_settings.port,
'user': self.mysql_settings.user,
'passwd': self.mysql_settings.password,
}
self.data_writer = DataWriter(self.replicator_settings)
self.state = State(os.path.join(replicator_settings.data_dir, 'state.json'))
self.state = State(os.path.join(self.replicator_settings.data_dir, 'state.json'))
logger.info(f'state start position: {self.state.prev_last_seen_transaction}')

log_file, log_pos = None, None
Expand Down Expand Up @@ -401,9 +402,13 @@ def run(self):
if hasattr(event, 'table'):
log_event.table_name = event.table
log_event.db_name = event.schema

if isinstance(log_event.db_name, bytes):
log_event.db_name = log_event.db_name.decode('utf-8')

if not self.settings.is_database_matches(log_event.db_name):
continue

log_event.transaction_id = transaction_id
if isinstance(event, UpdateRowsEvent) or isinstance(event, WriteRowsEvent):
log_event.event_type = EventType.ADD_EVENT.value
Expand Down
4 changes: 4 additions & 0 deletions mysql_ch_replicator/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import yaml
import fnmatch

from dataclasses import dataclass

Expand Down Expand Up @@ -44,3 +45,6 @@ def load(self, settings_file):
self.databases = data['databases']
assert isinstance(self.databases, str)
self.binlog_replicator = BinlogReplicatorSettings(**data['binlog_replicator'])

def is_database_matches(self, db_name):
return fnmatch.fnmatch(db_name, self.databases)
3 changes: 1 addition & 2 deletions mysql_ch_replicator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ def set_logging_config(tags):
def run_binlog_replicator(args, config: Settings):
set_logging_config('binlogrepl')
binlog_replicator = BinlogReplicator(
mysql_settings=config.mysql,
replicator_settings=config.binlog_replicator,
settings=config,
)
binlog_replicator.run()

Expand Down
3 changes: 1 addition & 2 deletions mysql_ch_replicator/runner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
import time
import sys
import fnmatch

from logging import getLogger

Expand Down Expand Up @@ -59,7 +58,7 @@ def run(self):
database=None, mysql_settings=self.config.mysql,
)
databases = mysql_api.get_databases()
databases = [db for db in databases if fnmatch.fnmatch(db, self.databases)]
databases = [db for db in databases if self.config.is_database_matches(db)]

killer = GracefulKiller()

Expand Down
2 changes: 1 addition & 1 deletion tests_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ binlog_replicator:
data_dir: '/app/binlog/'
records_per_file: 100000

databases: 'database_name_pattern_*'
databases: '*test*'
Loading