@@ -99,8 +99,8 @@ def _check_versions_replication_support(self):
9999 LOGGER .info ("Checking MySQL versions for replication support" )
100100
101101 if (
102- LooseVersion ("5.7.0" ) <= LooseVersion (self .source .version ) < LooseVersion ("8.1 " )
103- and LooseVersion ("8.0.0" ) <= LooseVersion (self .target .version ) < LooseVersion ("8.1 " )
102+ LooseVersion ("5.7.0" ) <= LooseVersion (self .source .version ) < LooseVersion ("8.5 " )
103+ and LooseVersion ("8.0.0" ) <= LooseVersion (self .target .version ) < LooseVersion ("8.5 " )
104104 ):
105105 LOGGER .info ("\t Source - %s, target - %s -- OK" , self .source .version , self .target .version )
106106 else :
@@ -142,11 +142,17 @@ def _check_gtid_mode_enabled(self):
142142 gtid_mode = select_global_var (cur , "gtid_mode" )
143143 if gtid_mode .upper () != "ON" :
144144 raise GTIDModeDisabledException (f"GTID mode should be enabled on the { conn_info .name } " )
145- cur .execute ("SHOW MASTER STATUS" )
145+
146+ if conn_info .version >= LooseVersion ("8.2.0" ):
147+ show_status_query = "SHOW BINARY LOG STATUS"
148+ else :
149+ show_status_query = "SHOW MASTER STATUS"
150+ cur .execute (show_status_query )
151+
146152 master_status = cur .fetchone ()
147153 if master_status is None :
148154 raise GTIDModeDisabledException (
149- f"GTID mode should be enabled on the { conn_info .name } : SHOW MASTER STATUS is empty"
155+ f"GTID mode should be enabled on the { conn_info .name } : { show_status_query } is empty"
150156 )
151157 executed_gtid_set = master_status .get ("Executed_Gtid_Set" , None )
152158 if not executed_gtid_set :
@@ -157,7 +163,9 @@ def _check_gtid_mode_enabled(self):
157163 def _check_user_can_replicate (self ):
158164 LOGGER .info ("Checking if user has replication grants on the source" )
159165
160- user_can_replicate = any (grant in self .source .global_grants for grant in ("REPLICATION SLAVE" , "ALL PRIVILEGES" ))
166+ user_can_replicate = any (
167+ grant in self .source .global_grants for grant in ("REPLICATION SLAVE" , "ALL PRIVILEGES" , "REPLICATION CLIENT" )
168+ )
161169 if not user_can_replicate :
162170 raise MissingReplicationGrants ("User does not have replication permissions" )
163171
@@ -277,17 +285,17 @@ def run_checks(
277285
278286 return migration_method
279287
280- def _stop_and_reset_slave (self ):
288+ def _stop_and_reset_replica (self ):
281289 LOGGER .info ("Stopping replication on target database" )
282290
283291 with self .target_master .cur () as cur :
284- cur .execute ("STOP SLAVE " )
285- cur .execute ("RESET SLAVE ALL" )
292+ cur .execute ("STOP REPLICA " )
293+ cur .execute ("RESET REPLICA ALL" )
286294
287295 def _stop_replication (self ):
288296 LOGGER .info ("Stopping replication" )
289297
290- self ._stop_and_reset_slave ()
298+ self ._stop_and_reset_replica ()
291299
292300 def _migrate_data (self , migration_method : MySQLMigrateMethod ) -> Optional [str ]:
293301 """Migrate data using the configured dump tool, return GTID from the dump"""
@@ -323,9 +331,9 @@ def _start_replication(self):
323331
324332 with self .target_master .cur () as cur :
325333 query = (
326- "CHANGE MASTER TO MASTER_HOST = %s, MASTER_PORT = %s, MASTER_USER = %s, MASTER_PASSWORD = %s, "
327- f"MASTER_AUTO_POSITION = 1, MASTER_SSL = { 1 if self .source .ssl else 0 } , "
328- "MASTER_SSL_VERIFY_SERVER_CERT = 0, MASTER_SSL_CA = '', MASTER_SSL_CAPATH = ''"
334+ "CHANGE REPLICATION SOURCE TO SOURCE_HOST = %s, SOURCE_PORT = %s, SOURCE_USER = %s, SOURCE_PASSWORD = %s, "
335+ f"SOURCE_AUTO_POSITION = 1, SOURCE_SSL = { 1 if self .source .ssl else 0 } , "
336+ "SOURCE_SSL_VERIFY_SERVER_CERT = 0, SOURCE_SSL_CA = '', SOURCE_SSL_CAPATH = ''"
329337 )
330338 if LooseVersion (self .target .version ) >= LooseVersion ("8.0.19" ):
331339 query += ", REQUIRE_ROW_FORMAT = 1"
@@ -351,27 +359,27 @@ def _start_replication(self):
351359 f"CHANGE REPLICATION FILTER REPLICATE_WILD_IGNORE_TABLE = ({ ', ' .join ('%s' for _ in self .ignore_dbs )} )" ,
352360 [f"{ db } .%" for db in self .ignore_dbs ]
353361 )
354- cur .execute ("START SLAVE " )
362+ cur .execute ("START REPLICA " )
355363
356364 def _ensure_target_replica_running (self , check_interval : float = 2.0 , retries : int = 30 ):
357365 LOGGER .info ("Ensure replica is running" )
358366
359367 with self .target .cur () as cur :
360368 for _ in range (retries ):
361- cur .execute ("SHOW SLAVE STATUS" )
369+ cur .execute ("SHOW REPLICA STATUS" )
362370 rows = cur .fetchall ()
363371 if not rows :
364- raise ReplicaSetupException ("SHOW SLAVE STATUS didn't return any rows" )
372+ raise ReplicaSetupException ("SHOW REPLICA STATUS didn't return any rows" )
365373
366374 try :
367- slave_status = next (
375+ replica_status = next (
368376 row for row in rows
369- if row ["Master_Host " ] == self .source .hostname and row ["Master_Port " ] == self .source .port
377+ if row ["Source_Host " ] == self .source .hostname and row ["Source_Port " ] == self .source .port
370378 )
371379 except StopIteration as e :
372- raise ReplicaSetupException ("Replication didn't start, Master info not available" ) from e
380+ raise ReplicaSetupException ("Replication didn't start, Source info not available" ) from e
373381
374- if slave_status [ "Slave_IO_Running " ] == "Yes" and slave_status [ "Slave_SQL_Running " ] == "Yes" :
382+ if replica_status [ "Replica_IO_Running " ] == "Yes" and replica_status [ "Replica_SQL_Running " ] == "Yes" :
375383 return
376384
377385 time .sleep (check_interval )
@@ -384,22 +392,22 @@ def _wait_for_replication(self, *, seconds_behind_master: int = 0, check_interva
384392
385393 while True :
386394 with self .target .cur () as cur :
387- cur .execute ("SHOW SLAVE STATUS" )
395+ cur .execute ("SHOW REPLICA STATUS" )
388396 rows = cur .fetchall ()
389397 if not rows :
390- raise ReplicaSetupException ("SHOW SLAVE STATUS didn't return any rows" )
398+ raise ReplicaSetupException ("SHOW REPLICA STATUS didn't return any rows" )
391399
392400 try :
393- slave_status = next (
401+ replica_status = next (
394402 row for row in rows
395- if row ["Master_Host " ] == self .source .hostname and row ["Master_Port " ] == self .source .port
403+ if row ["Source_Host " ] == self .source .hostname and row ["Source_Port " ] == self .source .port
396404 )
397405 except StopIteration as e :
398- raise ReplicaSetupException ("Replication didn't catch up, Master info not available" ) from e
406+ raise ReplicaSetupException ("Replication didn't catch up, source info not available" ) from e
399407
400- lag = slave_status [ "Seconds_Behind_Master " ]
408+ lag = replica_status [ "Seconds_Behind_Source " ]
401409 if lag is None :
402- raise ReplicaSetupException ("Replication didn't catch up, Seconds_Behind_Master is null" )
410+ raise ReplicaSetupException ("Replication didn't catch up, Seconds_Behind_Source is null" )
403411
404412 LOGGER .info ("Current replication lag: %s seconds" , lag )
405413 if lag <= seconds_behind_master :
0 commit comments