Skip to content

Commit 36c7e47

Browse files
committed
Write exceptions to log files
1 parent d3ff89e commit 36c7e47

File tree

2 files changed

+40
-22
lines changed

2 files changed

+40
-22
lines changed

mysql_ch_replicator/binlog_replicator.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,11 +385,22 @@ def run(self):
385385

386386
killer = GracefulKiller()
387387

388+
last_log_time = time.time()
389+
total_processed_events = 0
390+
388391
while not killer.kill_now:
389392
try:
393+
curr_time = time.time()
394+
if curr_time - last_log_time > 60:
395+
last_log_time = curr_time
396+
logger.info(
397+
f'last transaction id: {last_transaction_id}, processed events: {total_processed_events}',
398+
)
399+
390400
last_read_count = 0
391401
for event in self.stream:
392402
last_read_count += 1
403+
total_processed_events += 1
393404
transaction_id = (self.stream.log_file, self.stream.log_pos)
394405
last_transaction_id = transaction_id
395406

@@ -457,8 +468,11 @@ def run(self):
457468
time.sleep(BinlogReplicator.READ_LOG_INTERVAL)
458469

459470
except OperationalError as e:
460-
print('=== operational error', e)
471+
logger.error(f'operational error {str(e)}', exc_info=True)
461472
time.sleep(15)
473+
except Exception:
474+
logger.error(f'unhandled error {str(e)}', exc_info=True)
475+
raise
462476

463477
logger.info('stopping binlog_replicator')
464478
self.data_writer.close_all()

mysql_ch_replicator/db_replicator.py

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -125,28 +125,32 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
125125
self.last_touch_time = 0
126126

127127
def run(self):
128-
logger.info('launched db_replicator')
129-
if self.state.status == Status.RUNNING_REALTIME_REPLICATION:
130-
self.run_realtime_replication()
131-
return
132-
if self.state.status == Status.PERFORMING_INITIAL_REPLICATION:
128+
try:
129+
logger.info('launched db_replicator')
130+
if self.state.status == Status.RUNNING_REALTIME_REPLICATION:
131+
self.run_realtime_replication()
132+
return
133+
if self.state.status == Status.PERFORMING_INITIAL_REPLICATION:
134+
self.perform_initial_replication()
135+
self.run_realtime_replication()
136+
return
137+
138+
logger.info('recreating database')
139+
self.clickhouse_api.database = self.target_database_tmp
140+
self.clickhouse_api.recreate_database()
141+
self.state.tables = self.mysql_api.get_tables()
142+
self.state.tables = [
143+
table for table in self.state.tables if self.config.is_table_matches(table)
144+
]
145+
self.state.last_processed_transaction = self.data_reader.get_last_transaction_id()
146+
self.state.save()
147+
logger.info(f'last known transaction {self.state.last_processed_transaction}')
148+
self.create_initial_structure()
133149
self.perform_initial_replication()
134150
self.run_realtime_replication()
135-
return
136-
137-
logger.info('recreating database')
138-
self.clickhouse_api.database = self.target_database_tmp
139-
self.clickhouse_api.recreate_database()
140-
self.state.tables = self.mysql_api.get_tables()
141-
self.state.tables = [
142-
table for table in self.state.tables if self.config.is_table_matches(table)
143-
]
144-
self.state.last_processed_transaction = self.data_reader.get_last_transaction_id()
145-
self.state.save()
146-
logger.info(f'last known transaction {self.state.last_processed_transaction}')
147-
self.create_initial_structure()
148-
self.perform_initial_replication()
149-
self.run_realtime_replication()
151+
except Exception:
152+
logger.error(f'unhandled exception', exc_info=True)
153+
raise
150154

151155
def create_initial_structure(self):
152156
self.state.status = Status.CREATING_INITIAL_STRUCTURES
@@ -417,7 +421,7 @@ def log_stats_if_required(self):
417421
if curr_time - self.last_dump_stats_time < DbReplicator.STATS_DUMP_INTERVAL:
418422
return
419423
self.last_dump_stats_time = curr_time
420-
logger.info(f'statistics:\n{json.dumps(self.stats.__dict__)}')
424+
logger.info(f'stats: {json.dumps(self.stats.__dict__)}')
421425
self.stats = Statistics()
422426

423427
def upload_records_if_required(self, table_name):

0 commit comments

Comments
 (0)