@@ -82,6 +82,7 @@ class Statistics:
8282 insert_records_count : int = 0
8383 erase_events_count : int = 0
8484 erase_records_count : int = 0
85+ no_events_count : int = 0
8586
8687
8788class DbReplicator :
@@ -124,6 +125,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
124125 self .last_touch_time = 0
125126
126127 def run (self ):
128+ logger .info ('launched db_replicator' )
127129 if self .state .status == Status .RUNNING_REALTIME_REPLICATION :
128130 self .run_realtime_replication ()
129131 return
@@ -226,6 +228,9 @@ def perform_initial_replication_table(self, table_name):
226228 primary_key_index = field_names .index (primary_key )
227229 primary_key_type = field_types [primary_key_index ]
228230
231+ stats_number_of_records = 0
232+ last_stats_dump_time = time .time ()
233+
229234 while True :
230235
231236 query_start_value = max_primary_key
@@ -258,6 +263,14 @@ def perform_initial_replication_table(self, table_name):
258263 self .save_state_if_required ()
259264 self .prevent_binlog_removal ()
260265
266+ stats_number_of_records += len (records )
267+ curr_time = time .time ()
268+ if curr_time - last_stats_dump_time >= 60.0 :
269+ last_stats_dump_time = curr_time
270+ logger .info (
271+ f'replicating { table_name } , replicated { stats_number_of_records } , primary key: { max_primary_key } ' ,
272+ )
273+
261274 def run_realtime_replication (self ):
262275 if self .initial_only :
263276 logger .info ('skip running realtime replication, only initial replication was requested' )
@@ -277,6 +290,8 @@ def run_realtime_replication(self):
277290 if event is None :
278291 time .sleep (DbReplicator .READ_LOG_INTERVAL )
279292 self .upload_records_if_required (table_name = None )
293+ self .stats .no_events_count += 1
294+ self .log_stats_if_required ()
280295 continue
281296 assert event .db_name == self .database
282297 if self .database != self .target_database :
@@ -402,7 +417,7 @@ def log_stats_if_required(self):
402417 if curr_time - self .last_dump_stats_time < DbReplicator .STATS_DUMP_INTERVAL :
403418 return
404419 self .last_dump_stats_time = curr_time
405- logger .info (f'statistics:\n { json .dumps (self .stats .__dict__ , indent = 3 )} ' )
420+ logger .info (f'statistics:\n { json .dumps (self .stats .__dict__ )} ' )
406421 self .stats = Statistics ()
407422
408423 def upload_records_if_required (self , table_name ):
0 commit comments