Skip to content

Commit ec02041

Browse files
committed
Added support for "non blocking" mode ("mysqlbinlog --to-last-log" equivalent)
Conflicts: src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
1 parent ae2623a commit ec02041

File tree

1 file changed

+22
-2
lines changed

1 file changed

+22
-2
lines changed

src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public class BinaryLogClient implements BinaryLogClientMXBean {
8585
private final String username;
8686
private final String password;
8787

88+
private boolean blocking = true;
8889
private long serverId = 65535;
8990
private volatile String binlogFilename;
9091
private volatile long binlogPosition = 4;
@@ -154,6 +155,17 @@ public BinaryLogClient(String hostname, int port, String schema, String username
154155
this.password = password;
155156
}
156157

158+
public boolean isBlocking() {
159+
return blocking;
160+
}
161+
162+
/**
163+
* @param blocking blocking mode. If set to false - BinaryLogClient will disconnect after the last event.
164+
*/
165+
public void setBlocking(boolean blocking) {
166+
this.blocking = blocking;
167+
}
168+
157169
/**
158170
* @return server id (65535 by default)
159171
* @see #setServerId(long)
@@ -369,8 +381,12 @@ public void connect() throws IOException {
369381
connected = true;
370382
connectionId = greetingPacket.getThreadId();
371383
if (logger.isLoggable(Level.INFO)) {
372-
logger.info("Connected to " + hostname + ":" + port + " at " + binlogFilename + "/" + binlogPosition +
373-
" (sid:" + serverId + ", cid:" + connectionId + ")");
384+
String position;
385+
synchronized (gtidSetAccessLock) {
386+
position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition;
387+
}
388+
logger.info("Connected to " + hostname + ":" + port + " at " + position +
389+
" (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")");
374390
}
375391
synchronized (lifecycleListeners) {
376392
for (LifecycleListener lifecycleListener : lifecycleListeners) {
@@ -401,6 +417,7 @@ private GreetingPacket receiveGreeting() throws IOException {
401417
}
402418

403419
private void requestBinaryLogStream() throws IOException {
420+
long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178
404421
Command dumpBinaryLogCommand;
405422
synchronized (gtidSetAccessLock) {
406423
if (gtidSet != null) {
@@ -603,6 +620,9 @@ private void listenForEventPackets() throws IOException {
603620
throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
604621
errorPacket.getSqlState());
605622
}
623+
if (marker == (byte) 0xFE && !blocking) {
624+
break;
625+
}
606626
Event event;
607627
try {
608628
event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?

0 commit comments

Comments
 (0)