@@ -73,6 +73,13 @@ def save(self):
7373 f .write (data )
7474 os .rename (file_name + '.tmp' , file_name )
7575
76+ def remove (self ):
77+ file_name = self .file_name
78+ if os .path .exists (file_name ):
79+ os .remove (file_name )
80+ if os .path .exists (file_name + '.tmp' ):
81+ os .remove (file_name + '.tmp' )
82+
7683
7784@dataclass
7885class Statistics :
@@ -115,7 +122,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
115122 )
116123 self .converter = MysqlToClickhouseConverter (self )
117124 self .data_reader = DataReader (config .binlog_replicator , database )
118- self .state = State ( os . path . join ( config . binlog_replicator . data_dir , database , 'state.pckl' ) )
125+ self .state = self . create_state ( )
119126 self .clickhouse_api .tables_last_record_version = self .state .tables_last_record_version
120127 self .last_save_state_time = 0
121128 self .stats = Statistics ()
@@ -126,9 +133,22 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
126133 self .last_records_upload_time = 0
127134 self .last_touch_time = 0
128135
136+ def create_state (self ):
137+ return State (os .path .join (self .config .binlog_replicator .data_dir , self .database , 'state.pckl' ))
138+
129139 def run (self ):
130140 try :
131141 logger .info ('launched db_replicator' )
142+
143+ if self .state .status != Status .NONE :
144+ # ensure target database still exists
145+ if self .target_database not in self .clickhouse_api .get_databases ():
146+ logger .warning (f'database { self .target_database } missing in CH' )
147+ if self .initial_only :
148+ logger .warning ('will run replication from scratch' )
149+ self .state .remove ()
150+ self .state = self .create_state ()
151+
132152 if self .state .status == Status .RUNNING_REALTIME_REPLICATION :
133153 self .run_realtime_replication ()
134154 return
@@ -213,6 +233,7 @@ def perform_initial_replication_table(self, table_name):
213233
214234 if not self .config .is_table_matches (table_name ):
215235 logger .info (f'skip table { table_name } - not matching any allowed table' )
236+ return
216237
217238 max_primary_key = None
218239 if self .state .initial_replication_table == table_name :
@@ -280,6 +301,7 @@ def perform_initial_replication_table(self, table_name):
280301 def run_realtime_replication (self ):
281302 if self .initial_only :
282303 logger .info ('skip running realtime replication, only initial replication was requested' )
304+ self .state .remove ()
283305 return
284306
285307 self .mysql_api .close ()
0 commit comments