@@ -124,6 +124,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
124124 self .last_touch_time = 0
125125
126126 def run (self ):
127+ logger .info ('launched db_replicator' )
127128 if self .state .status == Status .RUNNING_REALTIME_REPLICATION :
128129 self .run_realtime_replication ()
129130 return
@@ -226,6 +227,9 @@ def perform_initial_replication_table(self, table_name):
226227 primary_key_index = field_names .index (primary_key )
227228 primary_key_type = field_types [primary_key_index ]
228229
230+ stats_number_of_records = 0
231+ last_stats_dump_time = time .time ()
232+
229233 while True :
230234
231235 query_start_value = max_primary_key
@@ -258,6 +262,14 @@ def perform_initial_replication_table(self, table_name):
258262 self .save_state_if_required ()
259263 self .prevent_binlog_removal ()
260264
265+ stats_number_of_records += len (records )
266+ curr_time = time .time ()
267+ if curr_time - last_stats_dump_time >= 60.0 :
268+ last_stats_dump_time = curr_time
269+ logger .info (
270+ f'replicating { table_name } , replicated { stats_number_of_records } , primary key: { max_primary_key } ' ,
271+ )
272+
261273 def run_realtime_replication (self ):
262274 if self .initial_only :
263275 logger .info ('skip running realtime replication, only initial replication was requested' )
@@ -277,6 +289,7 @@ def run_realtime_replication(self):
277289 if event is None :
278290 time .sleep (DbReplicator .READ_LOG_INTERVAL )
279291 self .upload_records_if_required (table_name = None )
292+ self .log_stats_if_required ()
280293 continue
281294 assert event .db_name == self .database
282295 if self .database != self .target_database :
0 commit comments