
Commit bf36075

Added debug log level, and more detailed logs (bakwc#18)
1 parent 9b97f18 · commit bf36075

7 files changed (+87, -11 lines)


README.md

Lines changed: 5 additions & 0 deletions
````diff
@@ -112,13 +112,17 @@ clickhouse:
   port: 8323
   user: 'default'
   password: 'default'
+  connection_timeout: 30 # optional
+  send_receive_timeout: 300 # optional
 
 binlog_replicator:
   data_dir: '/home/user/binlog/'
   records_per_file: 100000
 
 databases: 'database_name_pattern_*'
 tables: '*'
+
+log_level: 'info' # optional
 ```
 
 
@@ -127,6 +131,7 @@ tables: '*'
 - `binlog_replicator.data_dir` Create a new empty directory, it will be used by script to store it's state
 - `databases` Databases name pattern to replicate, e.g. `db_*` will match `db_1` `db_2` `db_test`, list is also supported
 - `tables` (__optional__) - tables to filter, list is also supported
+- `log_level` (__optional__) - log level, default is `info`, you can set to `debug` to get maximum information (allowed values are `debug`, `info`, `warning`, `error`, `critical`)
 
 Few more tables / dbs examples:
 
````
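For reference, a minimal sketch (not taken from the repository) of a complete config using the options documented above, parsed and checked in Python. It assumes PyYAML; the `host` key and the file layout are illustrative.

```python
# Hypothetical config matching the README above; assumes PyYAML is installed.
import yaml

CONFIG = """
clickhouse:
  host: 'localhost'         # illustrative, not shown in the diff context
  port: 8323
  user: 'default'
  password: 'default'
  connection_timeout: 30    # optional
  send_receive_timeout: 300 # optional

binlog_replicator:
  data_dir: '/home/user/binlog/'
  records_per_file: 100000

databases: 'database_name_pattern_*'
tables: '*'

log_level: 'debug'  # optional, defaults to 'info'
"""

data = yaml.safe_load(CONFIG)
# the commit validates log_level against exactly this set of names
assert data['log_level'] in ('critical', 'error', 'warning', 'info', 'debug')
```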

mysql_ch_replicator/binlog_replicator.py

Lines changed: 14 additions & 0 deletions
```diff
@@ -406,6 +406,8 @@ def run(self):
 
                 self.update_state_if_required(transaction_id)
 
+                logger.debug(f'received event {type(event)}, {transaction_id}')
+
                 if type(event) not in (DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent, QueryEvent):
                     continue
 
@@ -428,6 +430,8 @@ def run(self):
                 if not self.settings.is_database_matches(log_event.db_name):
                     continue
 
+                logger.debug(f'event matched {transaction_id}, {log_event.db_name}, {log_event.table_name}')
+
                 log_event.transaction_id = transaction_id
                 if isinstance(event, UpdateRowsEvent) or isinstance(event, WriteRowsEvent):
                     log_event.event_type = EventType.ADD_EVENT.value
@@ -459,6 +463,16 @@ def run(self):
                         vals = list(vals.values())
                         log_event.records.append(vals)
 
+                if self.settings.debug_log_level:
+                    # records serialization is heavy, only do it with debug log enabled
+                    logger.debug(
+                        f'store event {transaction_id}, '
+                        f'event type: {log_event.event_type}, '
+                        f'database: {log_event.db_name} '
+                        f'table: {log_event.table_name} '
+                        f'records: {log_event.records}',
+                    )
+
                 self.data_writer.store_event(log_event)
 
             self.update_state_if_required(last_transaction_id)
```
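The `debug_log_level` guard above exists because f-strings are formatted eagerly: rendering every record into the message string costs time even when the logger then drops it. A self-contained sketch of the same pattern, using the stdlib `logger.isEnabledFor()` check in place of the config flag:

```python
# Sketch of the eager-formatting cost the debug_log_level guard avoids.
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('binlogrepl')

records = [list(range(100)) for _ in range(10_000)]  # stand-in for a large batch

# Without the guard, the f-string would serialize all records even at INFO
# level; checking isEnabledFor() first skips that work entirely.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug(f'store event, records: {records}')
```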

mysql_ch_replicator/config.py

Lines changed: 10 additions & 0 deletions
```diff
@@ -89,6 +89,8 @@ def __init__(self):
         self.databases = ''
         self.tables = '*'
         self.settings_file = ''
+        self.log_level = 'info'
+        self.debug_log_level = False
 
     def load(self, settings_file):
         data = open(settings_file, 'r').read()
@@ -99,6 +101,7 @@ def load(self, settings_file):
         self.clickhouse = ClickhouseSettings(**data['clickhouse'])
         self.databases = data['databases']
         self.tables = data.get('tables', '*')
+        self.log_level = data.get('log_level', 'info')
         assert isinstance(self.databases, str) or isinstance(self.databases, list)
         assert isinstance(self.tables, str) or isinstance(self.tables, list)
         self.binlog_replicator = BinlogReplicatorSettings(**data['binlog_replicator'])
@@ -123,7 +126,14 @@ def is_database_matches(self, db_name):
     def is_table_matches(self, table_name):
         return self.is_pattern_matches(table_name, self.tables)
 
+    def validate_log_level(self):
+        if self.log_level not in ['critical', 'error', 'warning', 'info', 'debug']:
+            raise ValueError(f'wrong log level {self.log_level}')
+        if self.log_level == 'debug':
+            self.debug_log_level = True
+
     def validate(self):
         self.mysql.validate()
         self.clickhouse.validate()
         self.binlog_replicator.validate()
+        self.validate_log_level()
```
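Taken on its own, the validation added above behaves like this small sketch (a free function written for illustration, not part of the commit): unknown names fail fast at startup, and only `'debug'` turns on the `debug_log_level` flag that the replicators check before doing heavy logging.

```python
# Illustrative restatement of Settings.validate_log_level() as a function.
ALLOWED_LEVELS = ['critical', 'error', 'warning', 'info', 'debug']

def validate_log_level(log_level: str) -> bool:
    """Return the debug_log_level flag; raise on unknown level names."""
    if log_level not in ALLOWED_LEVELS:
        raise ValueError(f'wrong log level {log_level}')
    return log_level == 'debug'

assert validate_log_level('debug') is True
assert validate_log_level('info') is False
# validate_log_level('verbose')  # -> ValueError: wrong log level verbose
```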

mysql_ch_replicator/db_replicator.py

Lines changed: 36 additions & 5 deletions
```diff
@@ -255,6 +255,8 @@ def perform_initial_replication_table(self, table_name):
         primary_key_index = field_names.index(primary_key)
         primary_key_type = field_types[primary_key_index]
 
+        logger.debug(f'primary key name: {primary_key}, type: {primary_key_type}')
+
         stats_number_of_records = 0
         last_stats_dump_time = time.time()
 
@@ -270,11 +272,12 @@ def perform_initial_replication_table(self, table_name):
                 limit=DbReplicator.INITIAL_REPLICATION_BATCH_SIZE,
                 start_value=query_start_value,
             )
+            logger.debug(f'extracted {len(records)} records from mysql')
 
             records = self.converter.convert_records(records, mysql_table_structure, clickhouse_table_structure)
 
-            # for record in records:
-            #     print(dict(zip(field_names, record)))
+            if self.config.debug_log_level:
+                logger.debug(f'records: {records}')
 
             if not records:
                 break
@@ -295,9 +298,17 @@ def perform_initial_replication_table(self, table_name):
             if curr_time - last_stats_dump_time >= 60.0:
                 last_stats_dump_time = curr_time
                 logger.info(
-                    f'replicating {table_name}, replicated {stats_number_of_records}, primary key: {max_primary_key}',
+                    f'replicating {table_name}, '
+                    f'replicated {stats_number_of_records} records, '
+                    f'primary key: {max_primary_key}',
                 )
 
+        logger.info(
+            f'finish replicating {table_name}, '
+            f'replicated {stats_number_of_records} records, '
+            f'primary key: {max_primary_key}',
+        )
+
     def run_realtime_replication(self):
         if self.initial_only:
             logger.info('skip running realtime replication, only initial replication was requested')
@@ -337,7 +348,7 @@ def handle_event(self, event: LogEvent):
         if event.transaction_id <= self.state.last_processed_transaction_non_uploaded:
             return
 
-        logger.debug(f'processing event {event.transaction_id}')
+        logger.debug(f'processing event {event.transaction_id}, {event.event_type}, {event.table_name}')
 
         event_handlers = {
             EventType.ADD_EVENT.value: self.handle_insert_event,
@@ -366,6 +377,12 @@ def save_state_if_required(self, force=False):
         self.state.save()
 
     def handle_insert_event(self, event: LogEvent):
+        if self.config.debug_log_level:
+            logger.debug(
+                f'processing insert event: {event.transaction_id}, '
+                f'table: {event.table_name}, '
+                f'records: {event.records}',
+            )
         self.stats.insert_events_count += 1
         self.stats.insert_records_count += len(event.records)
 
@@ -383,6 +400,12 @@ def handle_insert_event(self, event: LogEvent):
             current_table_records_to_delete.discard(record_id)
 
     def handle_erase_event(self, event: LogEvent):
+        if self.config.debug_log_level:
+            logger.debug(
+                f'processing erase event: {event.transaction_id}, '
+                f'table: {event.table_name}, '
+                f'records: {event.records}',
+            )
         self.stats.erase_events_count += 1
         self.stats.erase_records_count += len(event.records)
 
@@ -404,7 +427,8 @@ def handle_erase_event(self, event: LogEvent):
             current_table_records_to_insert.pop(record_id, None)
 
     def handle_query_event(self, event: LogEvent):
-        #print(" === handle_query_event", event.records)
+        if self.config.debug_log_level:
+            logger.debug(f'processing query event: {event.transaction_id}, query: {event.records}')
         query = strip_sql_comments(event.records)
         if query.lower().startswith('alter'):
             self.handle_alter_query(query, event.db_name)
@@ -476,20 +500,27 @@ def upload_records_if_required(self, table_name):
             self.upload_records()
 
     def upload_records(self):
+        logger.debug(
+            f'upload records, to insert: {len(self.records_to_insert)}, to delete: {len(self.records_to_delete)}',
+        )
         self.last_records_upload_time = time.time()
 
         for table_name, id_to_records in self.records_to_insert.items():
             records = id_to_records.values()
             if not records:
                 continue
             _, ch_table_structure = self.state.tables_structure[table_name]
+            if self.config.debug_log_level:
+                logger.debug(f'inserting into {table_name}, records: {records}')
             self.clickhouse_api.insert(table_name, records, table_structure=ch_table_structure)
 
         for table_name, keys_to_remove in self.records_to_delete.items():
             if not keys_to_remove:
                 continue
             table_structure: TableStructure = self.state.tables_structure[table_name][0]
             primary_key_name = table_structure.primary_key
+            if self.config.debug_log_level:
+                logger.debug(f'erasing from {table_name}, primary key: {primary_key_name}, values: {keys_to_remove}')
             self.clickhouse_api.erase(
                 table_name=table_name,
                 field_name=primary_key_name,
```
mysql_ch_replicator/main.py

Lines changed: 19 additions & 6 deletions
```diff
@@ -13,7 +13,7 @@
 from .runner import Runner
 
 
-def set_logging_config(tags, log_file=None):
+def set_logging_config(tags, log_file=None, log_level_str=None):
 
     handlers = []
     handlers.append(logging.StreamHandler(sys.stderr))
@@ -28,8 +28,21 @@ def set_logging_config(tags, log_file=None):
             )
         )
 
+    log_levels = {
+        'critical': logging.CRITICAL,
+        'error': logging.ERROR,
+        'warning': logging.WARNING,
+        'info': logging.INFO,
+        'debug': logging.DEBUG,
+    }
+
+    log_level = log_levels.get(log_level_str)
+    if log_level is None:
+        print(f'[warning] unknown log level {log_level_str}, setting info')
+        log_level = logging.INFO
+
     logging.basicConfig(
-        level=logging.INFO,
+        level=log_level,
         format=f'[{tags} %(asctime)s %(levelname)8s] %(message)s',
         handlers=handlers,
     )
@@ -44,7 +57,7 @@ def run_binlog_replicator(args, config: Settings):
         'binlog_replicator.log',
     )
 
-    set_logging_config('binlogrepl', log_file=log_file)
+    set_logging_config('binlogrepl', log_file=log_file, log_level_str=config.log_level)
     binlog_replicator = BinlogReplicator(
         settings=config,
     )
@@ -73,7 +86,7 @@ def run_db_replicator(args, config: Settings):
         'db_replicator.log',
     )
 
-    set_logging_config(f'dbrepl {args.db}', log_file=log_file)
+    set_logging_config(f'dbrepl {args.db}', log_file=log_file, log_level_str=config.log_level)
 
     db_replicator = DbReplicator(
         config=config,
@@ -85,13 +98,13 @@ def run_db_replicator(args, config: Settings):
 
 
 def run_monitoring(args, config: Settings):
-    set_logging_config('monitor')
+    set_logging_config('monitor', log_level_str=config.log_level)
     monitoring = Monitoring(args.db or '', config)
     monitoring.run()
 
 
 def run_all(args, config: Settings):
-    set_logging_config('runner')
+    set_logging_config('runner', log_level_str=config.log_level)
     runner = Runner(config, args.wait_initial_replication, args.db)
     runner.run()
 
```
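The hand-written `log_levels` dict above maps the lowercase config names onto `logging` constants, falling back to `INFO` with a printed warning. For comparison, a sketch of the same lookup done against the `logging` module itself (an alternative, not what the commit ships):

```python
# Alternative sketch: resolve a lowercase level name via the logging module.
import logging
from typing import Optional

def resolve_log_level(name: Optional[str]) -> int:
    level = getattr(logging, (name or 'info').upper(), None)
    if not isinstance(level, int):
        print(f'[warning] unknown log level {name}, setting info')
        level = logging.INFO
    return level

assert resolve_log_level('debug') == logging.DEBUG
assert resolve_log_level('bogus') == logging.INFO
```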

tests_config.yaml

Lines changed: 1 addition & 0 deletions
```diff
@@ -16,3 +16,4 @@ binlog_replicator:
   records_per_file: 100000
 
 databases: '*test*'
+log_level: 'debug'
```

tests_config_databases_tables.yaml

Lines changed: 2 additions & 0 deletions
```diff
@@ -17,3 +17,5 @@ binlog_replicator:
 
 databases: ['test_db_1*', 'test_db_2']
 tables: ['test_table_1*', 'test_table_2']
+
+log_level: 'debug'
```
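Both test configs set `log_level: 'debug'`, so the test suite exercises the new debug paths. A hypothetical smoke check (not part of the commit) that the setting round-trips through `Settings`, assuming it is run from the repository root:

```python
# Hypothetical check that the test config above parses and validates.
from mysql_ch_replicator.config import Settings

settings = Settings()
settings.load('tests_config.yaml')
settings.validate()  # calls validate_log_level() internally
assert settings.log_level == 'debug'
assert settings.debug_log_level is True
```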
