
Commit 277dfb3

realtime replicator to separate file
1 parent 687df69 commit 277dfb3

2 files changed: +321 additions, −310 deletions
Lines changed: 9 additions & 310 deletions
@@ -1,25 +1,17 @@
-import json
 import os.path
-import random
 import time
 import pickle
 import hashlib
 from logging import getLogger
-from enum import Enum
 from dataclasses import dataclass
-from collections import defaultdict
-import sys
-import subprocess
-import select
 
-from .config import Settings, MysqlSettings, ClickhouseSettings
+from .config import Settings
 from .mysql_api import MySQLApi
 from .clickhouse_api import ClickhouseApi
-from .converter import MysqlToClickhouseConverter, strip_sql_name, strip_sql_comments
-from .table_structure import TableStructure, TableField
-from .binlog_replicator import DataReader, LogEvent, EventType
-from .utils import GracefulKiller, touch_all_files, format_floats
+from .converter import MysqlToClickhouseConverter
+from .binlog_replicator import DataReader
 from .db_replicator_initial import DbReplicatorInitial
+from .db_replicator_realtime import DbReplicatorRealtime
 from .common import Status
 
 
@@ -95,16 +87,6 @@ def remove(self):
 
 
 class DbReplicator:
-
-    SAVE_STATE_INTERVAL = 10
-    STATS_DUMP_INTERVAL = 60
-    BINLOG_TOUCH_INTERVAL = 120
-
-    DATA_DUMP_INTERVAL = 1
-    DATA_DUMP_BATCH_SIZE = 100000
-
-    READ_LOG_INTERVAL = 0.3
-
     def __init__(self, config: Settings, database: str, target_database: str = None, initial_only: bool = False,
                  worker_id: int = None, total_workers: int = None, table: str = None):
         self.config = config
@@ -174,18 +156,14 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
         self.data_reader = DataReader(config.binlog_replicator, database)
         self.state = self.create_state()
         self.clickhouse_api.tables_last_record_version = self.state.tables_last_record_version
-        self.last_save_state_time = 0
         self.stats = Statistics()
-        self.last_dump_stats_time = 0
-        self.last_dump_stats_process_time = 0
-        self.records_to_insert = defaultdict(dict)  # table_name => {record_id=>record, ...}
-        self.records_to_delete = defaultdict(set)  # table_name => {record_id, ...}
-        self.last_records_upload_time = 0
-        self.last_touch_time = 0
         self.start_time = time.time()
 
         # Create the initial replicator instance
         self.initial_replicator = DbReplicatorInitial(self)
+
+        # Create the realtime replicator instance
+        self.realtime_replicator = DbReplicatorRealtime(self)
 
     def create_state(self):
         return State(self.state_path)
@@ -241,285 +219,6 @@ def run(self):
             logger.error(f'unhandled exception', exc_info=True)
             raise
 
-    def prevent_binlog_removal(self):
-        if time.time() - self.last_touch_time < self.BINLOG_TOUCH_INTERVAL:
-            return
-        binlog_directory = os.path.join(self.config.binlog_replicator.data_dir, self.database)
-        logger.info(f'touch binlog {binlog_directory}')
-        if not os.path.exists(binlog_directory):
-            return
-        self.last_touch_time = time.time()
-        touch_all_files(binlog_directory)
-
     def run_realtime_replication(self):
-        if self.initial_only:
-            logger.info('skip running realtime replication, only initial replication was requested')
-            self.state.remove()
-            return
-
-        self.mysql_api.close()
-        self.mysql_api = None
-        logger.info(f'running realtime replication from the position: {self.state.last_processed_transaction}')
-        self.state.status = Status.RUNNING_REALTIME_REPLICATION
-        self.state.save()
-        self.data_reader.set_position(self.state.last_processed_transaction)
-
-        killer = GracefulKiller()
-
-        while not killer.kill_now:
-            if self.config.auto_restart_interval:
-                curr_time = time.time()
-                if curr_time - self.start_time >= self.config.auto_restart_interval:
-                    logger.info('process restart (check auto_restart_interval config option)')
-                    break
-
-            event = self.data_reader.read_next_event()
-            if event is None:
-                time.sleep(DbReplicator.READ_LOG_INTERVAL)
-                self.upload_records_if_required(table_name=None)
-                self.stats.no_events_count += 1
-                self.log_stats_if_required()
-                continue
-            assert event.db_name == self.database
-            if self.database != self.target_database:
-                event.db_name = self.target_database
-            self.handle_event(event)
-
-        logger.info('stopping db_replicator')
-        self.upload_records()
-        self.save_state_if_required(force=True)
-        logger.info('stopped')
-
-
-    def handle_event(self, event: LogEvent):
-        if self.state.last_processed_transaction_non_uploaded is not None:
-            if event.transaction_id <= self.state.last_processed_transaction_non_uploaded:
-                return
-
-        logger.debug(f'processing event {event.transaction_id}, {event.event_type}, {event.table_name}')
-
-        event_handlers = {
-            EventType.ADD_EVENT.value: self.handle_insert_event,
-            EventType.REMOVE_EVENT.value: self.handle_erase_event,
-            EventType.QUERY.value: self.handle_query_event,
-        }
-
-        if not event.table_name or self.config.is_table_matches(event.table_name):
-            event_handlers[event.event_type](event)
-
-        self.stats.events_count += 1
-        self.stats.last_transaction = event.transaction_id
-        self.state.last_processed_transaction_non_uploaded = event.transaction_id
-
-        self.upload_records_if_required(table_name=event.table_name)
-
-        self.save_state_if_required()
-        self.log_stats_if_required()
-
-    def save_state_if_required(self, force=False):
-        curr_time = time.time()
-        if curr_time - self.last_save_state_time < DbReplicator.SAVE_STATE_INTERVAL and not force:
-            return
-        self.last_save_state_time = curr_time
-        self.state.tables_last_record_version = self.clickhouse_api.tables_last_record_version
-        self.state.save()
-
-    def _get_record_id(self, ch_table_structure, record: list):
-        result = []
-        for idx in ch_table_structure.primary_key_ids:
-            field_type = ch_table_structure.fields[idx].field_type
-            if field_type == 'String':
-                result.append(f"'{record[idx]}'")
-            else:
-                result.append(record[idx])
-        return ','.join(map(str, result))
-
-    def handle_insert_event(self, event: LogEvent):
-        if self.config.debug_log_level:
-            logger.debug(
-                f'processing insert event: {event.transaction_id}, '
-                f'table: {event.table_name}, '
-                f'records: {event.records}',
-            )
-        self.stats.insert_events_count += 1
-        self.stats.insert_records_count += len(event.records)
-
-        mysql_table_structure = self.state.tables_structure[event.table_name][0]
-        clickhouse_table_structure = self.state.tables_structure[event.table_name][1]
-        records = self.converter.convert_records(event.records, mysql_table_structure, clickhouse_table_structure)
-
-        current_table_records_to_insert = self.records_to_insert[event.table_name]
-        current_table_records_to_delete = self.records_to_delete[event.table_name]
-        for record in records:
-            record_id = self._get_record_id(clickhouse_table_structure, record)
-            current_table_records_to_insert[record_id] = record
-            current_table_records_to_delete.discard(record_id)
-
-    def handle_erase_event(self, event: LogEvent):
-        if self.config.debug_log_level:
-            logger.debug(
-                f'processing erase event: {event.transaction_id}, '
-                f'table: {event.table_name}, '
-                f'records: {event.records}',
-            )
-        self.stats.erase_events_count += 1
-        self.stats.erase_records_count += len(event.records)
-
-        table_structure_ch: TableStructure = self.state.tables_structure[event.table_name][1]
-        table_structure_mysql: TableStructure = self.state.tables_structure[event.table_name][0]
-
-        records = self.converter.convert_records(
-            event.records, table_structure_mysql, table_structure_ch, only_primary=True,
-        )
-        keys_to_remove = [self._get_record_id(table_structure_ch, record) for record in records]
-
-        current_table_records_to_insert = self.records_to_insert[event.table_name]
-        current_table_records_to_delete = self.records_to_delete[event.table_name]
-        for record_id in keys_to_remove:
-            current_table_records_to_delete.add(record_id)
-            current_table_records_to_insert.pop(record_id, None)
-
-    def handle_query_event(self, event: LogEvent):
-        if self.config.debug_log_level:
-            logger.debug(f'processing query event: {event.transaction_id}, query: {event.records}')
-        query = strip_sql_comments(event.records)
-        if query.lower().startswith('alter'):
-            self.upload_records()
-            self.handle_alter_query(query, event.db_name)
-        if query.lower().startswith('create table'):
-            self.handle_create_table_query(query, event.db_name)
-        if query.lower().startswith('drop table'):
-            self.upload_records()
-            self.handle_drop_table_query(query, event.db_name)
-        if query.lower().startswith('rename table'):
-            self.upload_records()
-            self.handle_rename_table_query(query, event.db_name)
-
-    def handle_alter_query(self, query, db_name):
-        self.converter.convert_alter_query(query, db_name)
-
-    def handle_create_table_query(self, query, db_name):
-        mysql_structure, ch_structure = self.converter.parse_create_table_query(query)
-        if not self.config.is_table_matches(mysql_structure.table_name):
-            return
-        self.state.tables_structure[mysql_structure.table_name] = (mysql_structure, ch_structure)
-        indexes = self.config.get_indexes(self.database, ch_structure.table_name)
-        self.clickhouse_api.create_table(ch_structure, additional_indexes=indexes)
-
-    def handle_drop_table_query(self, query, db_name):
-        tokens = query.split()
-        if tokens[0].lower() != 'drop' or tokens[1].lower() != 'table':
-            raise Exception('wrong drop table query', query)
-
-        if_exists = (len(tokens) > 4 and
-                     tokens[2].lower() == 'if' and
-                     tokens[3].lower() == 'exists')
-        if if_exists:
-            del tokens[2:4]  # Remove the 'IF', 'EXISTS' tokens
-
-        if len(tokens) != 3:
-            raise Exception('wrong token count', query)
-
-        db_name, table_name, matches_config = self.converter.get_db_and_table_name(tokens[2], db_name)
-        if not matches_config:
-            return
-
-        if table_name in self.state.tables_structure:
-            self.state.tables_structure.pop(table_name)
-        self.clickhouse_api.execute_command(f'DROP TABLE {"IF EXISTS" if if_exists else ""} `{db_name}`.`{table_name}`')
-
-    def handle_rename_table_query(self, query, db_name):
-        tokens = query.split()
-        if tokens[0].lower() != 'rename' or tokens[1].lower() != 'table':
-            raise Exception('wrong rename table query', query)
-
-        ch_clauses = []
-        for rename_clause in ' '.join(tokens[2:]).split(','):
-            tokens = rename_clause.split()
-
-            if len(tokens) != 3:
-                raise Exception('wrong token count', query)
-            if tokens[1].lower() != 'to':
-                raise Exception('"to" keyword expected', query)
-
-            src_db_name, src_table_name, matches_config = self.converter.get_db_and_table_name(tokens[0], db_name)
-            dest_db_name, dest_table_name, _ = self.converter.get_db_and_table_name(tokens[2], db_name)
-            if not matches_config:
-                return
-
-            if src_db_name != self.target_database or dest_db_name != self.target_database:
-                raise Exception('cross databases table renames not implemented', tokens)
-            if src_table_name in self.state.tables_structure:
-                self.state.tables_structure[dest_table_name] = self.state.tables_structure.pop(src_table_name)
-
-            ch_clauses.append(f"`{src_db_name}`.`{src_table_name}` TO `{dest_db_name}`.`{dest_table_name}`")
-        self.clickhouse_api.execute_command(f'RENAME TABLE {", ".join(ch_clauses)}')
-
-    def log_stats_if_required(self):
-        curr_time = time.time()
-        if curr_time - self.last_dump_stats_time < DbReplicator.STATS_DUMP_INTERVAL:
-            return
-
-        curr_process_time = time.process_time()
-
-        time_spent = curr_time - self.last_dump_stats_time
-        process_time_spent = curr_process_time - self.last_dump_stats_process_time
-
-        if time_spent > 0.0:
-            self.stats.cpu_load = process_time_spent / time_spent
-
-        self.last_dump_stats_time = curr_time
-        self.last_dump_stats_process_time = curr_process_time
-        logger.info(f'stats: {json.dumps(format_floats(self.stats.__dict__))}')
-        logger.info(f'ch_stats: {json.dumps(format_floats(self.clickhouse_api.get_stats()))}')
-        self.stats = Statistics()
-
-    def upload_records_if_required(self, table_name):
-        need_dump = False
-        if table_name is not None:
-            if len(self.records_to_insert[table_name]) >= DbReplicator.DATA_DUMP_BATCH_SIZE:
-                need_dump = True
-            if len(self.records_to_delete[table_name]) >= DbReplicator.DATA_DUMP_BATCH_SIZE:
-                need_dump = True
-
-        curr_time = time.time()
-        if curr_time - self.last_records_upload_time >= DbReplicator.DATA_DUMP_INTERVAL:
-            need_dump = True
-
-        if not need_dump:
-            return
-
-        self.upload_records()
-
-    def upload_records(self):
-        logger.debug(
-            f'upload records, to insert: {len(self.records_to_insert)}, to delete: {len(self.records_to_delete)}',
-        )
-        self.last_records_upload_time = time.time()
-
-        for table_name, id_to_records in self.records_to_insert.items():
-            records = id_to_records.values()
-            if not records:
-                continue
-            _, ch_table_structure = self.state.tables_structure[table_name]
-            if self.config.debug_log_level:
-                logger.debug(f'inserting into {table_name}, records: {records}')
-            self.clickhouse_api.insert(table_name, records, table_structure=ch_table_structure)
-
-        for table_name, keys_to_remove in self.records_to_delete.items():
-            if not keys_to_remove:
-                continue
-            table_structure: TableStructure = self.state.tables_structure[table_name][0]
-            primary_key_names = table_structure.primary_keys
-            if self.config.debug_log_level:
-                logger.debug(f'erasing from {table_name}, primary key: {primary_key_names}, values: {keys_to_remove}')
-            self.clickhouse_api.erase(
-                table_name=table_name,
-                field_name=primary_key_names,
-                field_values=keys_to_remove,
-            )
-
-        self.records_to_insert = defaultdict(dict)  # table_name => {record_id=>record, ...}
-        self.records_to_delete = defaultdict(set)  # table_name => {record_id, ...}
-        self.state.last_processed_transaction = self.state.last_processed_transaction_non_uploaded
-        self.save_state_if_required()
+        # Delegate to the realtime replicator
+        self.realtime_replicator.run_realtime_replication()
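
The second changed file, the new realtime replicator module, is not rendered on this page. As a reading aid, here is a minimal sketch of what db_replicator_realtime.py presumably looks like, inferred from this diff alone: only the class name DbReplicatorRealtime, the back-reference passed as DbReplicatorRealtime(self), and the run_realtime_replication() entry point are confirmed above; the moved constants and attribute names are assumptions carried over from the deleted code.

# Hypothetical skeleton of db_replicator_realtime.py (inferred, not shown on this page)
from collections import defaultdict


class DbReplicatorRealtime:
    # Constants deleted from DbReplicator above, presumably moved here unchanged
    SAVE_STATE_INTERVAL = 10
    STATS_DUMP_INTERVAL = 60
    BINLOG_TOUCH_INTERVAL = 120
    DATA_DUMP_INTERVAL = 1
    DATA_DUMP_BATCH_SIZE = 100000
    READ_LOG_INTERVAL = 0.3

    def __init__(self, replicator):
        # Back-reference to the owning DbReplicator, matching the
        # DbReplicatorRealtime(self) call added to __init__ above
        self.replicator = replicator
        # Buffering/timing state deleted from DbReplicator.__init__ above
        self.last_save_state_time = 0
        self.last_records_upload_time = 0
        self.records_to_insert = defaultdict(dict)  # table_name => {record_id => record}
        self.records_to_delete = defaultdict(set)   # table_name => {record_id, ...}

    def run_realtime_replication(self):
        # The event loop deleted above presumably lives here now, reading
        # events via self.replicator.data_reader and flushing batched
        # inserts/deletes through self.replicator.clickhouse_api.
        ...

Holding a plain back-reference, rather than subclassing, mirrors the existing DbReplicatorInitial(self) pattern: the extracted class reuses the parent's config, state, and ClickHouse API objects without duplicating them.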
