Databus for MySQL

Chavdar Botev edited this page Dec 24, 2013 · 22 revisions

Introduction

A frequently asked question on the Databus open source mailing list is whether changes in MySQL can be captured through Databus. Below, we describe a proof-of-concept implementation of a Databus fetcher that taps into MySQL's internal replication stream, the MySQL binary log (the binlog, for short).

At a high level, the Databus MySQL adapter connects to the MySQL database as a replication slave. It converts the replication events to the Databus internal event format and stores them in the relay’s memory buffer. To parse the incoming binlog replication stream, the fetcher uses an open-source Java library called “OpenReplicator”.

Quick Start

  • You need a MySQL instance running with binlog replication enabled. Please refer to the How to Set Up Replication guide for how to enable binlog replication for the MySQL instance. That MySQL instance will act as the master for the Databus relay.
  • Compile the example

The code will shortly be available on GitHub. It can be built with the following command:

$ gradle -Dopen_source=true assemble

  • Run the MySQL example

$ cd build/databus2-relay-example-pkg/distributions

$ tar -zxvf databus2-example-relay-pkg.tar.gz

$ ./bin/create_person.sh

The script assumes that MySQL is listening on port 33066; please change it appropriately for your setup. It creates a database called ‘or_test’ with a table called ‘person’, and inserts 9 sample rows into that table.

$ ./bin/start-example-relay.sh or_person -Y ./conf/sources-or-person.json

This script starts a Databus relay and subscribes it for change capture on the table ‘or_test.person’.

  • Test if the relay has successfully been started

A quick way is to issue a curl command to the relay’s HTTP port as specified in conf/relay_or_person.properties (11115). The logical source id specified for the table ‘Person’ in conf/sources-or-person.json is 40:

$ curl -s http://localhost:11115/sources
[{"name":"com.linkedin.events.example.or_test.Person","id":40}]
  • Generate a change event by updating a row in the table:
update person set first_name='Balaji' where id=1;
  • Check if the relay has received the event from the database with the following command:
$ curl -s http://localhost:11115/containerStats/inbound/events/total?pretty | grep -m1 numDataEvents
"numDataEvents" : 1,

Notes on the implementation

The current implementation, although fully functional, should be considered a proof of concept. Its main goal is to demonstrate how to write a MySQL fetcher for Databus. The basic functionality of the fetcher has been tested, but it has not been deployed in our production environment. At LinkedIn, we run a slightly different implementation which relies on a custom-patched MySQL.

A question that quickly arises in the implementation of a MySQL adapter is how to define the logical clock for the event sequence. The event sequence number (System Change Number, or SCN for short) serves as a timeline for ordering events from the various transactions occurring at the database; please refer to Section 3.2 of our paper “All Aboard the Databus”. It is also used in the consumers’ checkpoints to determine where they are in the change stream.

There are several possible approaches for defining the logical clock.

The MySQL binlog offset

This is the simplest approach and the one used by this implementation. Each replication slave keeps track of its position in the replication stream through a pair of values: the current binlog file and the offset within that file. The binlog files share a common prefix and have an increasing index number as their suffix. Therefore, the position can be uniquely identified by the tuple (file number, file offset). Both the file index and the offset increase monotonically, so the SCNs follow the commit order exactly.

In the current implementation, SCN is represented as a long (64 bits). Of the 64 bits, the high-order 32 bits are used to represent the binlog file number, and the low-order 32 bits are used to represent the binlog file offset. Therefore, an event starting in binary log file mysql-bin.000001 at binlog offset 4 is represented as (1 << 32) | 4 = 4294967300 .
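The bit packing described above can be sketched in a few lines of Java (the class and method names here are illustrative, not from the Databus codebase):

```java
public class BinlogScn {
    // Pack a (binlog file number, file offset) pair into a 64-bit SCN.
    // High-order 32 bits: file number; low-order 32 bits: file offset.
    static long encode(int fileNumber, int offset) {
        return ((long) fileNumber << 32) | (offset & 0xFFFFFFFFL);
    }

    // Recover the binlog file number from an SCN.
    static int fileNumber(long scn) {
        return (int) (scn >>> 32);
    }

    // Recover the binlog file offset from an SCN.
    static int offset(long scn) {
        return (int) (scn & 0xFFFFFFFFL);
    }

    public static void main(String[] args) {
        // An event at mysql-bin.000001, offset 4, as in the text:
        long scn = encode(1, 4);
        System.out.println(scn);              // 4294967300
        System.out.println(fileNumber(scn));  // 1
        System.out.println(offset(scn));      // 4
    }
}
```

The masking with 0xFFFFFFFFL prevents sign extension when the offset is widened to a long, and the unsigned shift recovers the file number without carrying the sign bit.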

Advantages

  1. Simple to understand and easy to implement
  2. Works with the vanilla MySQL 5.5.x architecture
  3. Works well for non-clustered environments
  4. Maintains the commit order of the updates

Limitations

This approach inherits the typical limitations of MySQL replication, which stem from the fact that the binlog offset is not stable and may change.

  • Changes on the MySQL master node

The binlog files may change as a result of DBA commands like ‘reset master’. That command resets binlog file/sequence number generation to the initial values of mysql-bin.000001 and binlog offset 4, i.e., it resets the logical clock. After such a command, all downstream components (Databus relays, Databus bootstraps, Databus consumers) need to be reset to the new timeline. A simple workaround is to use ‘purge logs’ instead, which removes old binlog files without resetting the numbering.

  • Changes across MySQL nodes due to clustered setups

A typical MySQL setup has a master database and one or more slave databases. MySQL replication does not guarantee that the binlog file numbers and offsets on the slave nodes match those on the corresponding masters in the cluster. Thus, if a Databus relay is connected to a slave storage node, the binlog coordinates of a transaction may differ from those on the master. A Databus client switching from a relay connected to the master node to a relay connected to the corresponding slave node may therefore see missing or inconsistent data because the two clocks differ.

Suppose the client has consumed from node n1 up through scn1. It would then look for transactions with SCNs greater than scn1 on the new node n2. Due to the nature of MySQL replication, a newly committed transaction on n2 can carry an SCN that is numerically smaller than scn1, causing the transaction to be missed and consistency to be violated.
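This failure mode can be made concrete with the binlog-offset SCN encoding described earlier (the coordinates below are hypothetical):

```java
public class FailoverScnExample {
    // Pack (binlog file number, file offset) into a 64-bit SCN,
    // as in the binlog-offset scheme described above.
    static long scn(long fileNumber, long offset) {
        return (fileNumber << 32) | offset;
    }

    public static void main(String[] args) {
        // The consumer has read up through this point on node n1:
        long scn1 = scn(5, 120);
        // The same new transaction appears on node n2 at earlier
        // binlog coordinates, because n2's binlog numbering differs:
        long scnOnN2 = scn(3, 9000);
        // The consumer asks n2 for events with SCN > scn1,
        // so the transaction is silently skipped.
        System.out.println(scnOnN2 > scn1);  // false
    }
}
```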

Even if all relays capture updates from a single MySQL node, we can run into the same problem when the relays need to fail over to a different node.

The MySQL fetcher implementation has been designed for a single-node MySQL setup and has been tested in a single-database (with multiple tables) scenario. It has not yet been deployed in our production clusters. We describe the limitations of the current design above and would like to note that the design/implementation may change significantly going forward; hence, subsequent versions may not be backward compatible.

An application-generated SCN

Another approach is for the application that writes to MySQL to generate the sequence number, for example using an algorithm like Twitter’s Snowflake. The number can be written to a column from which the relay can easily extract it.
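A Snowflake-style ID packs a millisecond timestamp, a worker id, and a per-millisecond sequence into 64 bits. The following is a minimal sketch of that idea (the bit widths follow Twitter’s published layout; the class name is illustrative, and clock skew is deliberately not handled):

```java
public class SnowflakeSketch {
    // Twitter's layout: 41 bits timestamp, 10 bits worker id, 12 bits sequence.
    private static final long WORKER_BITS = 10L;
    private static final long SEQ_BITS = 12L;
    private static final long EPOCH = 1288834974657L; // Twitter's custom epoch

    private final long workerId;
    private long lastMillis = -1L;
    private long sequence = 0L;

    public SnowflakeSketch(long workerId) {
        this.workerId = workerId & ((1L << WORKER_BITS) - 1);
    }

    public synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now == lastMillis) {
            // Same millisecond: bump the sequence; spin to the next
            // millisecond if 4096 ids have already been handed out.
            sequence = (sequence + 1) & ((1L << SEQ_BITS) - 1);
            if (sequence == 0) {
                while (now <= lastMillis) {
                    now = System.currentTimeMillis();
                }
            }
        } else {
            sequence = 0;
        }
        lastMillis = now;
        return ((now - EPOCH) << (WORKER_BITS + SEQ_BITS))
                | (workerId << SEQ_BITS)
                | sequence;
    }

    public static void main(String[] args) {
        SnowflakeSketch gen = new SnowflakeSketch(1);
        long a = gen.nextId();
        long b = gen.nextId();
        // Ids from one worker are unique and increase over time.
        System.out.println(a < b);  // true
    }
}
```

Note that such ids reflect generation time rather than MySQL commit order, which is exactly the limitation discussed below.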

Advantages

  • Relatively easy to implement
  • Algorithms like Snowflake allow for highly available and scalable generation of globally unique numbers

Limitations

  • Ordering is not guaranteed to be in commit order

Since the SCNs are determined before the updates are written to MySQL, their order may (and probably will) be different from the MySQL commit order. This may be OK for applications that do not rely on strong consistency.

A master-generated SCN

One way this can be achieved is through an ‘auto_increment’ column.

A second way is to modify MySQL to generate the number after the transaction commits.

A relay-generated SCN

Code Structure

The relevant code is available in databus2-relay/databus2-event-producer-or:

  1. OpenReplicatorEventProducer.java: implements the fetcher for MySQL
  2. ORListener.java: implements the interface methods for processing binlog events
  3. OpenReplicatorAvroEventFactory.java: implements the logic for constructing a Databus event in Avro-serialized format from the underlying binlog entry

Features

  1. Real-time change capture from MySQL
  2. Handles binlog file rotation on MySQL
  3. Tested with single and multiple tables
  4. Tested with MySQL 5.5.35

Future Work

  1. Automatic Avro schema file generation for a given MySQL table
  2. Support for consistent change-capture in a clustered MySQL environment with mastership transfers
  3. Support for global transaction IDs (GTIDs) in MySQL 5.6
  4. Multi-tenancy optimizations (w.r.t. number of fetches of binlog files from master and server-side filtering)
  5. Composite keys
