|
20 | 20 | ) |
21 | 21 | from .pymysqlreplication.event import QueryEvent |
22 | 22 |
|
23 | | -from .config import MysqlSettings, BinlogReplicatorSettings |
| 23 | +from .config import Settings, BinlogReplicatorSettings |
24 | 24 | from .utils import GracefulKiller |
25 | 25 |
|
26 | 26 |
|
@@ -340,17 +340,18 @@ class BinlogReplicator: |
340 | 340 | BINLOG_RETENTION_PERIOD = 12 * 60 * 60 |
341 | 341 | READ_LOG_INTERVAL = 1 |
342 | 342 |
|
343 | | - def __init__(self, mysql_settings: MysqlSettings, replicator_settings: BinlogReplicatorSettings): |
344 | | - self.mysql_settings = mysql_settings |
345 | | - self.replicator_settings = replicator_settings |
| 343 | + def __init__(self, settings: Settings): |
| 344 | + self.settings = settings |
| 345 | + self.mysql_settings = settings.mysql |
| 346 | + self.replicator_settings = settings.binlog_replicator |
346 | 347 | mysql_settings = { |
347 | | - 'host': mysql_settings.host, |
348 | | - 'port': mysql_settings.port, |
349 | | - 'user': mysql_settings.user, |
350 | | - 'passwd': mysql_settings.password, |
| 348 | + 'host': self.mysql_settings.host, |
| 349 | + 'port': self.mysql_settings.port, |
| 350 | + 'user': self.mysql_settings.user, |
| 351 | + 'passwd': self.mysql_settings.password, |
351 | 352 | } |
352 | 353 | self.data_writer = DataWriter(self.replicator_settings) |
353 | | - self.state = State(os.path.join(replicator_settings.data_dir, 'state.json')) |
| 354 | + self.state = State(os.path.join(self.replicator_settings.data_dir, 'state.json')) |
354 | 355 | logger.info(f'state start position: {self.state.prev_last_seen_transaction}') |
355 | 356 |
|
356 | 357 | log_file, log_pos = None, None |
@@ -401,9 +402,13 @@ def run(self): |
401 | 402 | if hasattr(event, 'table'): |
402 | 403 | log_event.table_name = event.table |
403 | 404 | log_event.db_name = event.schema |
| 405 | + |
404 | 406 | if isinstance(log_event.db_name, bytes): |
405 | 407 | log_event.db_name = log_event.db_name.decode('utf-8') |
406 | 408 |
|
| 409 | + if not self.settings.is_database_matches(log_event.db_name): |
| 410 | + continue |
| 411 | + |
407 | 412 | log_event.transaction_id = transaction_id |
408 | 413 | if isinstance(event, UpdateRowsEvent) or isinstance(event, WriteRowsEvent): |
409 | 414 | log_event.event_type = EventType.ADD_EVENT.value |
|
0 commit comments