Skip to content

Commit d6d704f

Browse files
committed
Fixed running initial replication when switched DB
1 parent 7141b73 commit d6d704f

File tree

2 files changed

+30
-1
lines changed

2 files changed

+30
-1
lines changed

mysql_ch_replicator/db_replicator.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ def save(self):
7373
f.write(data)
7474
os.rename(file_name + '.tmp', file_name)
7575

76+
def remove(self):
77+
file_name = self.file_name
78+
if os.path.exists(file_name):
79+
os.remove(file_name)
80+
if os.path.exists(file_name + '.tmp'):
81+
os.remove(file_name + '.tmp')
82+
7683

7784
@dataclass
7885
class Statistics:
@@ -115,7 +122,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
115122
)
116123
self.converter = MysqlToClickhouseConverter(self)
117124
self.data_reader = DataReader(config.binlog_replicator, database)
118-
self.state = State(os.path.join(config.binlog_replicator.data_dir, database, 'state.pckl'))
125+
self.state = self.create_state()
119126
self.clickhouse_api.tables_last_record_version = self.state.tables_last_record_version
120127
self.last_save_state_time = 0
121128
self.stats = Statistics()
@@ -126,9 +133,22 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
126133
self.last_records_upload_time = 0
127134
self.last_touch_time = 0
128135

136+
def create_state(self):
137+
return State(os.path.join(self.config.binlog_replicator.data_dir, self.database, 'state.pckl'))
138+
129139
def run(self):
130140
try:
131141
logger.info('launched db_replicator')
142+
143+
if self.state.status != Status.NONE:
144+
# ensure target database still exists
145+
if self.target_database not in self.clickhouse_api.get_databases():
146+
logger.warning(f'database {self.target_database} missing in CH')
147+
if self.initial_only:
148+
logger.warning('will run replication from scratch')
149+
self.state.remove()
150+
self.state = self.create_state()
151+
132152
if self.state.status == Status.RUNNING_REALTIME_REPLICATION:
133153
self.run_realtime_replication()
134154
return
@@ -213,6 +233,7 @@ def perform_initial_replication_table(self, table_name):
213233

214234
if not self.config.is_table_matches(table_name):
215235
logger.info(f'skip table {table_name} - not matching any allowed table')
236+
return
216237

217238
max_primary_key = None
218239
if self.state.initial_replication_table == table_name:
@@ -280,6 +301,7 @@ def perform_initial_replication_table(self, table_name):
280301
def run_realtime_replication(self):
281302
if self.initial_only:
282303
logger.info('skip running realtime replication, only initial replication was requested')
304+
self.state.remove()
283305
return
284306

285307
self.mysql_api.close()

test_mysql_ch_replicator.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,13 @@ def test_initial_only():
382382
assert TEST_TABLE_NAME in ch.get_tables()
383383
assert len(ch.select(TEST_TABLE_NAME)) == 2
384384

385+
ch.execute_command(f'DROP DATABASE {TEST_DB_NAME}')
386+
387+
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, additional_arguments='--initial_only=True')
388+
db_replicator_runner.run()
389+
db_replicator_runner.wait_complete()
390+
assert TEST_DB_NAME in ch.get_databases()
391+
385392

386393
def test_database_tables_filtering():
387394
cfg = config.Settings()

0 commit comments

Comments
 (0)