diff --git a/binlog2sql/binlog2sql.py b/binlog2sql/binlog2sql.py index 971ec55..2f89293 100755 --- a/binlog2sql/binlog2sql.py +++ b/binlog2sql/binlog2sql.py @@ -12,7 +12,7 @@ class Binlog2sql(object): - def __init__(self, connection_settings, start_file=None, start_pos=None, end_file=None, end_pos=None, + def __init__(self, connection_settings, thread_id=0, start_file=None, start_pos=None, end_file=None, end_pos=None, start_time=None, stop_time=None, only_schemas=None, only_tables=None, no_pk=False, flashback=False, stop_never=False, back_interval=1.0, only_dml=True, sql_type=None): """ @@ -23,6 +23,7 @@ def __init__(self, connection_settings, start_file=None, start_pos=None, end_fil raise ValueError('Lack of parameter: start_file') self.conn_setting = connection_settings + self.thread_id = thread_id self.start_file = start_file self.start_pos = start_pos if start_pos else 4 # use binlog v4 self.end_file = end_file if end_file else start_file @@ -98,13 +99,15 @@ def process_binlog(self): if isinstance(binlog_event, QueryEvent) and not self.only_dml: sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, - flashback=self.flashback, no_pk=self.no_pk) + flashback=self.flashback, no_pk=self.no_pk, + thread_id=self.thread_id) if sql: print(sql) elif is_dml_event(binlog_event) and event_type(binlog_event) in self.sql_type: for row in binlog_event.rows: sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, no_pk=self.no_pk, - row=row, flashback=self.flashback, e_start_pos=e_start_pos) + row=row, flashback=self.flashback, e_start_pos=e_start_pos, + thread_id=self.thread_id) if self.flashback: f_tmp.write(sql + '\n') else: @@ -142,7 +145,8 @@ def __del__(self): if __name__ == '__main__': args = command_line_args(sys.argv[1:]) conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'} - binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos, + binlog2sql = Binlog2sql(connection_settings=conn_setting, thread_id=args.thread_id, + start_file=args.start_file, start_pos=args.start_pos, end_file=args.end_file, end_pos=args.end_pos, start_time=args.start_time, stop_time=args.stop_time, only_schemas=args.databases, only_tables=args.tables, no_pk=args.no_pk, flashback=args.flashback, stop_never=args.stop_never, diff --git a/binlog2sql/binlog2sql_util.py b/binlog2sql/binlog2sql_util.py index 3a9bb31..03e9880 100755 --- a/binlog2sql/binlog2sql_util.py +++ b/binlog2sql/binlog2sql_util.py @@ -13,7 +13,7 @@ UpdateRowsEvent, DeleteRowsEvent, ) - +BEGIN_QUERY_EVENT = None if sys.version > '3': PY3PLUS = True @@ -65,6 +65,7 @@ def parse_args(): connect_setting.add_argument('-P', '--port', dest='port', type=int, help='MySQL port to use', default=3306) interval = parser.add_argument_group('interval filter') + interval.add_argument('--thread-id', dest='thread_id', type=int, help='binlog event thread_id', default=0) interval.add_argument('--start-file', dest='start_file', type=str, help='Start binlog file to be parsed') interval.add_argument('--start-position', '--start-pos', dest='start_pos', type=int, help='Start position of the --start-file', default=4) @@ -164,7 +165,8 @@ def event_type(event): return t -def concat_sql_from_binlog_event(cursor, binlog_event, row=None, e_start_pos=None, flashback=False, no_pk=False): +def concat_sql_from_binlog_event(cursor, binlog_event, row=None, e_start_pos=None, + flashback=False, no_pk=False, thread_id=0): if flashback and no_pk: raise ValueError('only one of flashback or no_pk can be True') if not (isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent) @@ -172,12 +174,17 @@ def concat_sql_from_binlog_event(cursor, binlog_event, row=None, e_start_pos=Non raise ValueError('binlog_event must be WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent or QueryEvent') sql = '' + global BEGIN_QUERY_EVENT + if isinstance(binlog_event, QueryEvent) and binlog_event.query == 'BEGIN': + BEGIN_QUERY_EVENT = binlog_event if isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent) \ or isinstance(binlog_event, DeleteRowsEvent): - pattern = generate_sql_pattern(binlog_event, row=row, flashback=flashback, no_pk=no_pk) - sql = cursor.mogrify(pattern['template'], pattern['values']) - time = datetime.datetime.fromtimestamp(binlog_event.timestamp) - sql += ' #start %s end %s time %s' % (e_start_pos, binlog_event.packet.log_pos, time) + # Support filter by the thread_id + if (thread_id == 0) or (thread_id and BEGIN_QUERY_EVENT.slave_proxy_id == thread_id): + pattern = generate_sql_pattern(binlog_event, row=row, flashback=flashback, no_pk=no_pk) + sql = cursor.mogrify(pattern['template'], pattern['values']) + time = datetime.datetime.fromtimestamp(binlog_event.timestamp) + sql += ' #start %s end %s time %s' % (e_start_pos, binlog_event.packet.log_pos, time) elif flashback is False and isinstance(binlog_event, QueryEvent) and binlog_event.query != 'BEGIN' \ and binlog_event.query != 'COMMIT': if binlog_event.schema: