diff --git a/.classpath b/.classpath
index b0e44f4..4b606b5 100644
--- a/.classpath
+++ b/.classpath
@@ -1,27 +1,12 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.project b/.project
index 2a5f0e6..5cdf2bd 100644
--- a/.project
+++ b/.project
@@ -1,23 +1,18 @@
-
-
- open-replicator
-
-
-
-
-
- org.eclipse.jdt.core.javabuilder
-
-
-
-
- org.eclipse.m2e.core.maven2Builder
-
-
-
-
-
- org.eclipse.jdt.core.javanature
- org.eclipse.m2e.core.maven2Nature
-
-
+
+
+ open-replicator
+ Open Replicator is a high performance MySQL binlog parser written in Java. NO_M2ECLIPSE_SUPPORT: Project files created with the maven-eclipse-plugin are not supported in M2Eclipse.
+
+
+
+ org.eclipse.jdt.core.javabuilder
+
+
+ org.eclipse.m2e.core.maven2Builder
+
+
+
+ org.eclipse.jdt.core.javanature
+ org.eclipse.m2e.core.maven2Nature
+
+
\ No newline at end of file
diff --git a/src/main/java/com/google/code/or/binlog/impl/FileBasedBinlogParser.java b/src/main/java/com/google/code/or/binlog/impl/FileBasedBinlogParser.java
index 8618193..95b0d4a 100644
--- a/src/main/java/com/google/code/or/binlog/impl/FileBasedBinlogParser.java
+++ b/src/main/java/com/google/code/or/binlog/impl/FileBasedBinlogParser.java
@@ -18,6 +18,8 @@
import java.io.File;
import java.util.concurrent.TimeUnit;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +47,7 @@ public class FileBasedBinlogParser extends AbstractBinlogParser {
protected String binlogFilePath;
protected long stopPosition = 0;
protected long startPosition = 4;
+ public static int availableLimit = 0;
/**
@@ -109,6 +112,7 @@ protected void doParse() throws Exception {
try {
//
final BinlogEventV4HeaderImpl header = new BinlogEventV4HeaderImpl();
+ ((XInputStreamImpl)is).startByteRecording();
header.setTimestamp(is.readLong(4) * 1000L);
header.setEventType(is.readInt(1));
header.setServerId(is.readLong(4));
@@ -117,6 +121,7 @@ protected void doParse() throws Exception {
header.setFlags(is.readInt(2));
header.setTimestampOfReceipt(System.currentTimeMillis());
is.setReadLimit((int)(header.getEventLength() - header.getHeaderLength())); // Ensure the event boundary
+
if(isVerbose() && LOGGER.isInfoEnabled()) {
LOGGER.info("read an event, header: {}", header);
}
@@ -133,6 +138,27 @@ protected void doParse() throws Exception {
BinlogEventParser parser = getEventParser(header.getEventType());
if(parser == null) parser = this.defaultParser;
parser.parse(is, header, context);
+
+ byte[] eventBytes = ((XInputStreamImpl)is).stopRecording();
+ if(is.available() == 4 && null != eventBytes)
+ {
+ is.setReadLimit(4);
+ byte[] checkSumBytes = is.readBytes(4);
+ Checksum checksumUtility = new CRC32();
+ checksumUtility.update(eventBytes, 0, eventBytes.length);
+ long checkSumValComputed = checksumUtility.getValue();
+ long checkSumValReceived = CodecUtils.toLong(CodecUtils.toBigEndian(checkSumBytes),0,checkSumBytes.length);
+ if(checkSumValComputed != checkSumValReceived)
+ {
+ throw new RuntimeException("Checksum did not match for event type: " + header.getEventType());
+ }
+ else
+ {
+ //Event size greater than normal is used to detect that file has checksum enabled
+ //This will be used for further event processing
+ availableLimit = 4;
+ }
+ }
}
// Ensure the packet boundary
diff --git a/src/main/java/com/google/code/or/binlog/impl/parser/DeleteRowsEventV2Parser.java b/src/main/java/com/google/code/or/binlog/impl/parser/DeleteRowsEventV2Parser.java
index b6f52d8..d6be509 100644
--- a/src/main/java/com/google/code/or/binlog/impl/parser/DeleteRowsEventV2Parser.java
+++ b/src/main/java/com/google/code/or/binlog/impl/parser/DeleteRowsEventV2Parser.java
@@ -22,6 +22,7 @@
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.BinlogParserContext;
+import com.google.code.or.binlog.impl.FileBasedBinlogParser;
import com.google.code.or.binlog.impl.event.DeleteRowsEventV2;
import com.google.code.or.binlog.impl.event.TableMapEvent;
import com.google.code.or.common.glossary.Row;
@@ -71,7 +72,7 @@ public void parse(XInputStream is, BinlogEventV4Header header, BinlogParserConte
protected List parseRows(XInputStream is, TableMapEvent tme, DeleteRowsEventV2 dre)
throws IOException {
final List r = new LinkedList();
- while(is.available() > 0) {
+ while(is.available() > FileBasedBinlogParser.availableLimit) {
r.add(parseRow(is, tme, dre.getUsedColumns()));
}
return r;
diff --git a/src/main/java/com/google/code/or/binlog/impl/parser/UpdateRowsEventV2Parser.java b/src/main/java/com/google/code/or/binlog/impl/parser/UpdateRowsEventV2Parser.java
index 83f2642..29cf031 100644
--- a/src/main/java/com/google/code/or/binlog/impl/parser/UpdateRowsEventV2Parser.java
+++ b/src/main/java/com/google/code/or/binlog/impl/parser/UpdateRowsEventV2Parser.java
@@ -22,6 +22,7 @@
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.BinlogParserContext;
+import com.google.code.or.binlog.impl.FileBasedBinlogParser;
import com.google.code.or.binlog.impl.event.TableMapEvent;
import com.google.code.or.binlog.impl.event.UpdateRowsEventV2;
import com.google.code.or.common.glossary.Pair;
@@ -73,7 +74,7 @@ public void parse(XInputStream is, BinlogEventV4Header header, BinlogParserConte
protected List> parseRows(XInputStream is, TableMapEvent tme, UpdateRowsEventV2 ure)
throws IOException {
final List> r = new LinkedList>();
- while(is.available() > 0) {
+ while(is.available() > FileBasedBinlogParser.availableLimit) {
final Row before = parseRow(is, tme, ure.getUsedColumnsBefore());
final Row after = parseRow(is, tme, ure.getUsedColumnsAfter());
r.add(new Pair(before, after));
diff --git a/src/main/java/com/google/code/or/binlog/impl/parser/WriteRowsEventV2Parser.java b/src/main/java/com/google/code/or/binlog/impl/parser/WriteRowsEventV2Parser.java
index ffdd119..26ee173 100644
--- a/src/main/java/com/google/code/or/binlog/impl/parser/WriteRowsEventV2Parser.java
+++ b/src/main/java/com/google/code/or/binlog/impl/parser/WriteRowsEventV2Parser.java
@@ -22,6 +22,7 @@
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.BinlogParserContext;
+import com.google.code.or.binlog.impl.FileBasedBinlogParser;
import com.google.code.or.binlog.impl.event.TableMapEvent;
import com.google.code.or.binlog.impl.event.WriteRowsEventV2;
import com.google.code.or.common.glossary.Row;
@@ -71,7 +72,7 @@ public void parse(XInputStream is, BinlogEventV4Header header, BinlogParserConte
protected List parseRows(XInputStream is, TableMapEvent tme, WriteRowsEventV2 wre)
throws IOException {
final List r = new LinkedList();
- while(is.available() > 0) {
+ while(is.available() > FileBasedBinlogParser.availableLimit) {
r.add(parseRow(is, tme, wre.getUsedColumns()));
}
return r;
diff --git a/src/main/java/com/google/code/or/common/util/CodecUtils.java b/src/main/java/com/google/code/or/common/util/CodecUtils.java
index 3ecf246..233eaa7 100644
--- a/src/main/java/com/google/code/or/common/util/CodecUtils.java
+++ b/src/main/java/com/google/code/or/common/util/CodecUtils.java
@@ -21,6 +21,7 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
+import java.util.List;
/**
*
@@ -87,6 +88,17 @@ public static byte[] toByteArray(long num) {
return r;
}
+ public static byte[] toByteArray(List in)
+ {
+ final int n = in.size();
+ byte ret[] = new byte[n];
+ for (int i = 0; i < n; i++)
+ {
+ ret[i] = in.get(i);
+ }
+ return ret;
+ }
+
/**
*
*/
diff --git a/src/main/java/com/google/code/or/io/impl/XInputStreamImpl.java b/src/main/java/com/google/code/or/io/impl/XInputStreamImpl.java
index 6759886..d71b276 100644
--- a/src/main/java/com/google/code/or/io/impl/XInputStreamImpl.java
+++ b/src/main/java/com/google/code/or/io/impl/XInputStreamImpl.java
@@ -19,6 +19,8 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
import com.google.code.or.common.glossary.UnsignedLong;
import com.google.code.or.common.glossary.column.BitColumn;
@@ -40,6 +42,7 @@ public class XInputStreamImpl extends InputStream implements XInputStream {
private int readLimit = 0;
private final byte[] buffer;
private final InputStream is;
+ private List recordBytes;
/**
@@ -47,11 +50,25 @@ public class XInputStreamImpl extends InputStream implements XInputStream {
*/
public XInputStreamImpl(InputStream is) {
this(is, 512 * 1024);
+ recordBytes = new ArrayList();
}
public XInputStreamImpl(InputStream is, int size) {
this.is = is;
this.buffer = new byte[size];
+ recordBytes = new ArrayList();
+ }
+
+
+ public void startByteRecording()
+ {
+ recordBytes = new ArrayList();
+ }
+
+ public byte[] stopRecording()
+ {
+ byte[] b = CodecUtils.toByteArray(recordBytes);
+ return b;
}
/**
@@ -68,6 +85,8 @@ public long readLong(int length) throws IOException {
public byte[] readBytes(int length) throws IOException {
final byte[] r = new byte[length];
this.read(r, 0, length);
+ for(int i=0;i= this.tail) doFill();
+ recordBytes.add(new Byte(this.buffer[this.head]));
final int r = this.buffer[this.head++] & 0xFF;
++this.readCount;
return r;
diff --git a/src/test/java/com/google/code/or/OpenParserTest.java b/src/test/java/com/google/code/or/OpenParserTest.java
index 430c26e..bcc1946 100644
--- a/src/test/java/com/google/code/or/OpenParserTest.java
+++ b/src/test/java/com/google/code/or/OpenParserTest.java
@@ -22,8 +22,8 @@ public static void main(String args[]) throws Exception {
//
final OpenParser op = new OpenParser();
op.setStartPosition(4);
- op.setBinlogFileName("mysql_bin.000031");
- op.setBinlogFilePath("C:/Documents and Settings/All Users/Application Data/MySQL/MySQL Server 5.5/data");
+ op.setBinlogFileName("coresa-log-bin.002767");
+ op.setBinlogFilePath("/home/amolk/");
op.setBinlogEventListener(new BinlogEventListener() {
public void onEvents(BinlogEventV4 event) {
if(event instanceof XidEvent) {