Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class Statistics:
insert_records_count: int = 0
erase_events_count: int = 0
erase_records_count: int = 0
no_events_count: int = 0


class DbReplicator:
Expand Down Expand Up @@ -124,6 +125,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
self.last_touch_time = 0

def run(self):
logger.info('launched db_replicator')
if self.state.status == Status.RUNNING_REALTIME_REPLICATION:
self.run_realtime_replication()
return
Expand Down Expand Up @@ -226,6 +228,9 @@ def perform_initial_replication_table(self, table_name):
primary_key_index = field_names.index(primary_key)
primary_key_type = field_types[primary_key_index]

stats_number_of_records = 0
last_stats_dump_time = time.time()

while True:

query_start_value = max_primary_key
Expand Down Expand Up @@ -258,6 +263,14 @@ def perform_initial_replication_table(self, table_name):
self.save_state_if_required()
self.prevent_binlog_removal()

stats_number_of_records += len(records)
curr_time = time.time()
if curr_time - last_stats_dump_time >= 60.0:
last_stats_dump_time = curr_time
logger.info(
f'replicating {table_name}, replicated {stats_number_of_records}, primary key: {max_primary_key}',
)

def run_realtime_replication(self):
if self.initial_only:
logger.info('skip running realtime replication, only initial replication was requested')
Expand All @@ -277,6 +290,8 @@ def run_realtime_replication(self):
if event is None:
time.sleep(DbReplicator.READ_LOG_INTERVAL)
self.upload_records_if_required(table_name=None)
self.stats.no_events_count += 1
self.log_stats_if_required()
continue
assert event.db_name == self.database
if self.database != self.target_database:
Expand Down Expand Up @@ -402,7 +417,7 @@ def log_stats_if_required(self):
if curr_time - self.last_dump_stats_time < DbReplicator.STATS_DUMP_INTERVAL:
return
self.last_dump_stats_time = curr_time
logger.info(f'statistics:\n{json.dumps(self.stats.__dict__, indent=3)}')
logger.info(f'statistics:\n{json.dumps(self.stats.__dict__)}')
self.stats = Statistics()

def upload_records_if_required(self, table_name):
Expand Down
52 changes: 48 additions & 4 deletions mysql_ch_replicator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import argparse
import logging
from logging.handlers import RotatingFileHandler
import sys
import os

from .config import Settings
from .db_replicator import DbReplicator
Expand All @@ -10,15 +13,38 @@
from .runner import Runner


def set_logging_config(tags):
def set_logging_config(tags, log_file=None):

handlers = []
handlers.append(logging.StreamHandler(sys.stderr))
if log_file is not None:
handlers.append(
RotatingFileHandler(
filename=log_file,
maxBytes=50*1024*1024, # 50 Mb
backupCount=3,
encoding='utf-8',
delay=False,
)
)

logging.basicConfig(
level=logging.INFO,
format=f'[{tags} %(asctime)s %(levelname)8s] %(message)s',
handlers=handlers,
)


def run_binlog_replicator(args, config: Settings):
set_logging_config('binlogrepl')
if not os.path.exists(config.binlog_replicator.data_dir):
os.mkdir(config.binlog_replicator.data_dir)

log_file = os.path.join(
config.binlog_replicator.data_dir,
'binlog_replicator.log',
)

set_logging_config('binlogrepl', log_file=log_file)
binlog_replicator = BinlogReplicator(
settings=config,
)
Expand All @@ -29,11 +55,29 @@ def run_db_replicator(args, config: Settings):
if not args.db:
raise Exception("need to pass --db argument")

set_logging_config(f'dbrepl {args.db}')
db_name = args.db

if not os.path.exists(config.binlog_replicator.data_dir):
os.mkdir(config.binlog_replicator.data_dir)

db_dir = os.path.join(
config.binlog_replicator.data_dir,
db_name,
)

if not os.path.exists(db_dir):
os.mkdir(db_dir)

log_file = os.path.join(
db_dir,
'db_replicator.log',
)

set_logging_config(f'dbrepl {args.db}', log_file=log_file)

db_replicator = DbReplicator(
config=config,
database=args.db,
database=db_name,
target_database=getattr(args, 'target_db', None),
initial_only=args.initial_only,
)
Expand Down
Loading