Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 80 additions & 87 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,24 @@
*/
package com.github.shyiko.mysql.binlog;

import com.github.shyiko.mysql.binlog.event.*;
import com.github.shyiko.mysql.binlog.event.deserialization.*;
import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData;
import com.github.shyiko.mysql.binlog.event.MariadbGtidListEventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.EventDataWrapper;
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
Expand All @@ -34,7 +49,12 @@
import com.github.shyiko.mysql.binlog.network.protocol.Packet;
import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;
import com.github.shyiko.mysql.binlog.network.protocol.command.*;
import com.github.shyiko.mysql.binlog.network.protocol.command.Command;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogGtidCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.SSLRequestCommand;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
Expand Down Expand Up @@ -115,14 +135,12 @@ public X509Certificate[] getAcceptedIssuers() {
private volatile long connectionId;
private SSLMode sslMode = SSLMode.DISABLED;

private GtidSet gtidSet;
private final Object gtidSetAccessLock = new Object();
protected GtidSet gtidSet;
protected final Object gtidSetAccessLock = new Object();
private boolean gtidSetFallbackToPurged;
private boolean useBinlogFilenamePositionInGtidMode;
private String gtid;
private boolean tx;
private boolean isMariadb = false;
private boolean mariadbSendAnnotateRowsEvent = false;

private EventDeserializer eventDeserializer = new EventDeserializer();

Expand All @@ -132,7 +150,7 @@ public X509Certificate[] getAcceptedIssuers() {
private SocketFactory socketFactory;
private SSLSocketFactory sslSocketFactory;

private volatile PacketChannel channel;
protected volatile PacketChannel channel;
private volatile boolean connected;
private volatile long masterServerId = -1;

Expand Down Expand Up @@ -317,15 +335,14 @@ public void setGtidSet(String gtidSet) {
this.binlogFilename = "";
}
synchronized (gtidSetAccessLock) {
// mariadb GtidSet format will be domainId-serverId-sequence
if (gtidSet != null && !gtidSet.contains(":")) {
this.gtidSet = new MariadbGtidSet(gtidSet);
} else {
this.gtidSet = gtidSet != null ? new GtidSet(gtidSet) : null;
}
this.gtidSet = gtidSet != null ? buildGtidSet(gtidSet) : null;
}
}

protected GtidSet buildGtidSet(String gtidSet) {
return new GtidSet(gtidSet);
}

/**
* @see #setGtidSetFallbackToPurged(boolean)
* @return whether gtid_purged is used as a fallback
Expand Down Expand Up @@ -488,19 +505,6 @@ public void setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

public boolean isMariadbSendAnnotateRowsEvent() {
return mariadbSendAnnotateRowsEvent;
}

/**
* Only in Mariadb, if set true, the Slave server connects with the BINLOG_SEND_ANNOTATE_ROWS_EVENT flag (value is 2)
* in the COM_BINLOG_DUMP Slave Registration phase
* @param mariadbSendAnnotateRowsEvent
*/
public void setMariadbSendAnnotateRowsEvent(boolean mariadbSendAnnotateRowsEvent) {
this.mariadbSendAnnotateRowsEvent = mariadbSendAnnotateRowsEvent;
}

/**
* Connect to the replication stream. Note that this method blocks until disconnected.
* @throws AuthenticationException if authentication fails
Expand Down Expand Up @@ -538,17 +542,8 @@ public void connect() throws IOException, IllegalStateException {
channel.authenticationComplete();

connectionId = greetingPacket.getThreadId();
isMariadb = greetingPacket.getServerVersion().toLowerCase().contains("mariadb");
if ("".equals(binlogFilename)) {
synchronized (gtidSetAccessLock) {
if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) {
if (isMariadb) {
gtidSet = new MariadbGtidSet(fetchGtidPurged());
} else {
gtidSet = new GtidSet(fetchGtidPurged());
}
}
}
setupGtidSet();
}
if (binlogFilename == null) {
fetchBinlogFilenameAndPosition();
Expand Down Expand Up @@ -597,13 +592,7 @@ public void connect() throws IOException, IllegalStateException {
ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class);
if (isMariadb) {
ensureEventDataDeserializer(EventType.ANNOTATE_ROWS, AnnotateRowsEventDataDeserializer.class);
ensureEventDataDeserializer(EventType.MARIADB_GTID, MariadbGtidEventDataDeserializer.class);
ensureEventDataDeserializer(EventType.MARIADB_GTID_LIST, MariadbGtidListEventDataDeserializer.class);
}
ensureGtidEventDataDeserializer();
}
}
listenForEventPackets();
Expand Down Expand Up @@ -676,7 +665,7 @@ public Object call() throws Exception {
};
}

private void checkError(byte[] packet) throws IOException {
protected void checkError(byte[] packet) throws IOException {
if (packet[0] == (byte) 0xFF /* error */) {
byte[] bytes = Arrays.copyOfRange(packet, 1, packet.length);
ErrorPacket errorPacket = new ErrorPacket(bytes);
Expand Down Expand Up @@ -720,7 +709,6 @@ private boolean tryUpgradeToSSL(GreetingPacket greetingPacket) throws IOExceptio
return false;
}


private void enableHeartbeat() throws IOException {
channel.write(new QueryCommand("set @master_heartbeat_period=" + heartbeatInterval * 1000000));
byte[] statementResult = channel.read();
Expand All @@ -735,35 +723,23 @@ private void setMasterServerId() throws IOException {
}
}

private void requestBinaryLogStream() throws IOException {
protected void requestBinaryLogStream() throws IOException {
long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178
Command dumpBinaryLogCommand;
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
if (isMariadb) {
channel.write(new QueryCommand("SET @mariadb_slave_capability=4"));
checkError(channel.read());
channel.write(new QueryCommand("SET @slave_connect_state = '" + gtidSet.toString() + "'"));
checkError(channel.read());
channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0"));
checkError(channel.read());
channel.write(new QueryCommand("SET @slave_gtid_ignore_duplicates = 0"));
checkError(channel.read());
dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, "", 0L, isMariadbSendAnnotateRowsEvent());
} else {
dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId,
useBinlogFilenamePositionInGtidMode ? binlogFilename : "",
useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,
gtidSet);
}
dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId,
useBinlogFilenamePositionInGtidMode ? binlogFilename : "",
useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,
gtidSet);
} else {
dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition);
}
}
channel.write(dumpBinaryLogCommand);
}

private void ensureEventDataDeserializer(EventType eventType,
protected void ensureEventDataDeserializer(EventType eventType,
Class<? extends EventDataDeserializer> eventDataDeserializerClass) {
EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(eventType);
if (eventDataDeserializer.getClass() != eventDataDeserializerClass &&
Expand All @@ -780,6 +756,10 @@ private void ensureEventDataDeserializer(EventType eventType,
}
}

protected void ensureGtidEventDataDeserializer() {
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class);
}

private void spawnKeepAliveThread() {
final ExecutorService threadExecutor =
Expand Down Expand Up @@ -924,6 +904,14 @@ private String fetchGtidPurged() throws IOException {
return "";
}

protected void setupGtidSet() throws IOException{
synchronized (gtidSetAccessLock) {
if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) {
gtidSet = new GtidSet(fetchGtidPurged());
}
}
}

private void fetchBinlogFilenameAndPosition() throws IOException {
ResultSetRowPacket[] resultSet;
channel.write(new QueryCommand("show master status"));
Expand Down Expand Up @@ -1025,7 +1013,7 @@ private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int pac
return result;
}

private void updateClientBinlogFilenameAndPosition(Event event) {
protected void updateClientBinlogFilenameAndPosition(Event event) {
EventHeader eventHeader = event.getHeader();
EventType eventType = eventHeader.getEventType();
if (eventType == EventType.ROTATE) {
Expand All @@ -1044,7 +1032,7 @@ private void updateClientBinlogFilenameAndPosition(Event event) {
}
}

private void updateGtidSet(Event event) {
protected void updateGtidSet(Event event) {
synchronized (gtidSetAccessLock) {
if (gtidSet == null) {
return;
Expand All @@ -1070,34 +1058,39 @@ private void updateGtidSet(Event event) {
tx = false;
break;
case QUERY:
case ANNOTATE_ROWS:
String sql;
if (eventHeader.getEventType() == EventType.QUERY) {
QueryEventData queryEventData = (QueryEventData) EventDataWrapper.internal(event.getData());
sql = queryEventData.getSql();
} else {
AnnotateRowsEventData annotateRowsEventData = (AnnotateRowsEventData) EventDataWrapper.internal(event.getData());
sql = annotateRowsEventData.getRowsQuery();
}

QueryEventData queryEventData = (QueryEventData) EventDataWrapper.internal(event.getData());
String sql = queryEventData.getSql();
if (sql == null) {
break;
}
if ("BEGIN".equals(sql)) {
tx = true;
} else
if ("COMMIT".equals(sql) || "ROLLBACK".equals(sql)) {
commitGtid();
tx = false;
} else
if (!tx) {
// auto-commit query, likely DDL
commitGtid();
commitGtid(sql);
break;
case ANNOTATE_ROWS:
AnnotateRowsEventData annotateRowsEventData = (AnnotateRowsEventData) EventDeserializer.EventDataWrapper.internal(event.getData());
sql = annotateRowsEventData.getRowsQuery();
if (sql == null) {
break;
}
commitGtid(sql);
break;
default:
}
}

protected void commitGtid(String sql) {
if ("BEGIN".equals(sql)) {
tx = true;
} else
if ("COMMIT".equals(sql) || "ROLLBACK".equals(sql)) {
commitGtid();
tx = false;
} else
if (!tx) {
// auto-commit query, likely DDL
commitGtid();
}
}

private void commitGtid() {
if (gtid != null) {
synchronized (gtidSetAccessLock) {
Expand Down Expand Up @@ -1308,7 +1301,7 @@ public interface LifecycleListener {
/**
* Default (no-op) implementation of {@link LifecycleListener}.
*/
public static abstract class AbstractLifecycleListener implements LifecycleListener {
public static abstract class AbstractLifecycleListener implements LifecycleListener {

public void onConnect(BinaryLogClient client) { }

Expand Down
Loading