Skip to content

Commit 0417068

Browse files
committed
#29 - Ability to deserialize TABLE_MAP and ROTATE events to byte arrays
1 parent 024ade9 commit 0417068

File tree

3 files changed

+176
-20
lines changed

3 files changed

+176
-20
lines changed

src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616
package com.github.shyiko.mysql.binlog;
1717

1818
import com.github.shyiko.mysql.binlog.event.Event;
19+
import com.github.shyiko.mysql.binlog.event.EventData;
1920
import com.github.shyiko.mysql.binlog.event.EventHeader;
2021
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
2122
import com.github.shyiko.mysql.binlog.event.EventType;
2223
import com.github.shyiko.mysql.binlog.event.RotateEventData;
2324
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
2425
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
26+
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer;
2527
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
28+
import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer;
2629
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
2730
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
2831
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
@@ -147,7 +150,7 @@ public long getServerId() {
147150
}
148151

149152
/**
150-
* @param serverId server id (in the range from 1 to 2^32 1). This value MUST be unique across whole replication
153+
* @param serverId server id (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
151154
* group (that is, different from any other server id being used by any master or slave). Keep in mind that each
152155
* binary log client (mysql-binlog-connector-java/BinaryLogClient, mysqlbinlog, etc) should be treated as a
153156
* simplified slave and thus MUST also use a different server id.
@@ -325,6 +328,13 @@ public void connect() throws IOException {
325328
if (keepAlive && !isKeepAliveThreadRunning()) {
326329
spawnKeepAliveThread();
327330
}
331+
EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(EventType.ROTATE);
332+
if (eventDataDeserializer.getClass() != RotateEventDataDeserializer.class &&
333+
eventDataDeserializer.getClass() != EventDeserializer.EventDataWrapper.Deserializer.class) {
334+
eventDeserializer.setEventDataDeserializer(EventType.ROTATE,
335+
new EventDeserializer.EventDataWrapper.Deserializer(new RotateEventDataDeserializer(),
336+
eventDataDeserializer));
337+
}
328338
listenForEventPackets();
329339
}
330340

@@ -536,11 +546,15 @@ private void listenForEventPackets() throws IOException {
536546
private void updateClientBinlogFilenameAndPosition(Event event) {
537547
EventHeader eventHeader = event.getHeader();
538548
if (eventHeader.getEventType() == EventType.ROTATE) {
539-
RotateEventData eventData = event.getData();
540-
if (eventData != null) {
541-
binlogFilename = eventData.getBinlogFilename();
542-
binlogPosition = eventData.getBinlogPosition();
549+
EventData eventData = event.getData();
550+
RotateEventData rotateEventData;
551+
if (eventData instanceof EventDeserializer.EventDataWrapper) {
552+
rotateEventData = (RotateEventData) ((EventDeserializer.EventDataWrapper) eventData).getInternal();
553+
} else {
554+
rotateEventData = (RotateEventData) eventData;
543555
}
556+
binlogFilename = rotateEventData.getBinlogFilename();
557+
binlogPosition = rotateEventData.getBinlogPosition();
544558
} else
545559
if (eventHeader instanceof EventHeaderV4) {
546560
EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader;
@@ -602,6 +616,9 @@ public void unregisterEventListener(EventListener eventListener) {
602616
}
603617

604618
private void notifyEventListeners(Event event) {
619+
if (event.getData() instanceof EventDeserializer.EventDataWrapper) {
620+
event = new Event(event.getHeader(), ((EventDeserializer.EventDataWrapper) event.getData()).getExternal());
621+
}
605622
synchronized (eventListeners) {
606623
for (EventListener eventListener : eventListeners) {
607624
try {

src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java

Lines changed: 110 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import java.io.IOException;
2626
import java.util.HashMap;
27+
import java.util.IdentityHashMap;
2728
import java.util.Map;
2829

2930
/**
@@ -39,6 +40,8 @@ public class EventDeserializer {
3940

4041
private final Map<Long, TableMapEventData> tableMapEventByTableId;
4142

43+
private EventDataDeserializer tableMapEventDataDeserializer;
44+
4245
public EventDeserializer() {
4346
this(new EventHeaderV4Deserializer(), new NullEventDataDeserializer());
4447
}
@@ -55,9 +58,12 @@ public EventDeserializer(
5558
EventHeaderDeserializer eventHeaderDeserializer,
5659
EventDataDeserializer defaultEventDataDeserializer
5760
) {
58-
this(eventHeaderDeserializer, defaultEventDataDeserializer,
59-
new HashMap<EventType, EventDataDeserializer>(), new HashMap<Long, TableMapEventData>());
61+
this.eventHeaderDeserializer = eventHeaderDeserializer;
62+
this.defaultEventDataDeserializer = defaultEventDataDeserializer;
63+
this.eventDataDeserializers = new IdentityHashMap<EventType, EventDataDeserializer>();
64+
this.tableMapEventByTableId = new HashMap<Long, TableMapEventData>();
6065
registerDefaultEventDataDeserializers();
66+
afterEventDataDeserializerSet(null);
6167
}
6268

6369
public EventDeserializer(
@@ -70,14 +76,20 @@ public EventDeserializer(
7076
this.defaultEventDataDeserializer = defaultEventDataDeserializer;
7177
this.eventDataDeserializers = eventDataDeserializers;
7278
this.tableMapEventByTableId = tableMapEventByTableId;
79+
afterEventDataDeserializerSet(null);
7380
}
7481

7582
private void registerDefaultEventDataDeserializers() {
76-
eventDataDeserializers.put(EventType.FORMAT_DESCRIPTION, new FormatDescriptionEventDataDeserializer());
77-
eventDataDeserializers.put(EventType.ROTATE, new RotateEventDataDeserializer());
78-
eventDataDeserializers.put(EventType.QUERY, new QueryEventDataDeserializer());
79-
eventDataDeserializers.put(EventType.TABLE_MAP, new TableMapEventDataDeserializer());
80-
eventDataDeserializers.put(EventType.XID, new XidEventDataDeserializer());
83+
eventDataDeserializers.put(EventType.FORMAT_DESCRIPTION,
84+
new FormatDescriptionEventDataDeserializer());
85+
eventDataDeserializers.put(EventType.ROTATE,
86+
new RotateEventDataDeserializer());
87+
eventDataDeserializers.put(EventType.QUERY,
88+
new QueryEventDataDeserializer());
89+
eventDataDeserializers.put(EventType.TABLE_MAP,
90+
new TableMapEventDataDeserializer());
91+
eventDataDeserializers.put(EventType.XID,
92+
new XidEventDataDeserializer());
8193
eventDataDeserializers.put(EventType.WRITE_ROWS,
8294
new WriteRowsEventDataDeserializer(tableMapEventByTableId));
8395
eventDataDeserializers.put(EventType.UPDATE_ROWS,
@@ -93,12 +105,28 @@ private void registerDefaultEventDataDeserializers() {
93105
eventDataDeserializers.put(EventType.EXT_DELETE_ROWS,
94106
new DeleteRowsEventDataDeserializer(tableMapEventByTableId).
95107
setMayContainExtraInformation(true));
96-
eventDataDeserializers.put(EventType.ROWS_QUERY, new RowsQueryEventDataDeserializer());
97-
eventDataDeserializers.put(EventType.GTID, new GtidEventDataDeserializer());
108+
eventDataDeserializers.put(EventType.ROWS_QUERY,
109+
new RowsQueryEventDataDeserializer());
110+
eventDataDeserializers.put(EventType.GTID,
111+
new GtidEventDataDeserializer());
98112
}
99113

100114
public void setEventDataDeserializer(EventType eventType, EventDataDeserializer eventDataDeserializer) {
101115
eventDataDeserializers.put(eventType, eventDataDeserializer);
116+
afterEventDataDeserializerSet(eventType);
117+
}
118+
119+
private void afterEventDataDeserializerSet(EventType eventType) {
120+
if (eventType == null || eventType == EventType.TABLE_MAP) {
121+
EventDataDeserializer eventDataDeserializer = getEventDataDeserializer(EventType.TABLE_MAP);
122+
if (eventDataDeserializer.getClass() != TableMapEventDataDeserializer.class &&
123+
eventDataDeserializer.getClass() != EventDataWrapper.Deserializer.class) {
124+
tableMapEventDataDeserializer = new EventDataWrapper.Deserializer(
125+
new TableMapEventDataDeserializer(), eventDataDeserializer);
126+
} else {
127+
tableMapEventDataDeserializer = null;
128+
}
129+
}
102130
}
103131

104132
public void setChecksumType(ChecksumType checksumType) {
@@ -113,17 +141,29 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
113141
return null;
114142
}
115143
EventHeader eventHeader = eventHeaderDeserializer.deserialize(inputStream);
116-
EventData eventData = deserializeEventData(inputStream, eventHeader);
144+
EventDataDeserializer eventDataDeserializer = getEventDataDeserializer(eventHeader.getEventType());
145+
if (eventHeader.getEventType() == EventType.TABLE_MAP && tableMapEventDataDeserializer != null) {
146+
eventDataDeserializer = tableMapEventDataDeserializer;
147+
}
148+
EventData eventData = deserializeEventData(inputStream, eventHeader, eventDataDeserializer);
117149
if (eventHeader.getEventType() == EventType.TABLE_MAP) {
118-
TableMapEventData tableMapEvent = (TableMapEventData) eventData;
150+
TableMapEventData tableMapEvent;
151+
if (eventData instanceof EventDataWrapper) {
152+
EventDataWrapper eventDataWrapper = (EventDataWrapper) eventData;
153+
tableMapEvent = (TableMapEventData) eventDataWrapper.getInternal();
154+
if (tableMapEventDataDeserializer != null) {
155+
eventData = eventDataWrapper.getExternal();
156+
}
157+
} else {
158+
tableMapEvent = (TableMapEventData) eventData;
159+
}
119160
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
120161
}
121162
return new Event(eventHeader, eventData);
122163
}
123164

124-
private EventData deserializeEventData(ByteArrayInputStream inputStream, EventHeader eventHeader)
125-
throws EventDataDeserializationException {
126-
EventDataDeserializer eventDataDeserializer = getEventDataDeserializer(eventHeader.getEventType());
165+
private EventData deserializeEventData(ByteArrayInputStream inputStream, EventHeader eventHeader,
166+
EventDataDeserializer eventDataDeserializer) throws EventDataDeserializationException {
127167
// todo: use checksum algorithm descriptor from FormatDescriptionEvent
128168
// (as per http://dev.mysql.com/worklog/task/?id=2540)
129169
int eventBodyLength = (int) eventHeader.getDataLength() - checksumLength;
@@ -142,9 +182,64 @@ private EventData deserializeEventData(ByteArrayInputStream inputStream, EventHe
142182
return eventData;
143183
}
144184

145-
private EventDataDeserializer getEventDataDeserializer(EventType eventType) {
185+
public EventDataDeserializer getEventDataDeserializer(EventType eventType) {
146186
EventDataDeserializer eventDataDeserializer = eventDataDeserializers.get(eventType);
147187
return eventDataDeserializer != null ? eventDataDeserializer : defaultEventDataDeserializer;
148188
}
149189

190+
/**
191+
* Enwraps internal {@link EventData} if custom {@link EventDataDeserializer} is provided (for internally used
192+
* events only).
193+
*/
194+
public static class EventDataWrapper implements EventData {
195+
196+
private EventData internal;
197+
private EventData external;
198+
199+
public EventDataWrapper(EventData internal, EventData external) {
200+
this.internal = internal;
201+
this.external = external;
202+
}
203+
204+
public EventData getInternal() {
205+
return internal;
206+
}
207+
208+
public EventData getExternal() {
209+
return external;
210+
}
211+
212+
@Override
213+
public String toString() {
214+
final StringBuilder sb = new StringBuilder("InternalEventData");
215+
sb.append("{internal=").append(internal);
216+
sb.append(", external=").append(external);
217+
sb.append('}');
218+
return sb.toString();
219+
}
220+
221+
/**
222+
* {@link com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.EventDataWrapper} deserializer.
223+
*/
224+
public static class Deserializer implements EventDataDeserializer {
225+
226+
private EventDataDeserializer internal;
227+
private EventDataDeserializer external;
228+
229+
public Deserializer(EventDataDeserializer internal, EventDataDeserializer external) {
230+
this.internal = internal;
231+
this.external = external;
232+
}
233+
234+
@Override
235+
public EventData deserialize(ByteArrayInputStream inputStream) throws IOException {
236+
byte[] bytes = inputStream.read(inputStream.available());
237+
EventData internalEventData = internal.deserialize(new ByteArrayInputStream(bytes));
238+
EventData externalEventData = external.deserialize(new ByteArrayInputStream(bytes));
239+
return new EventDataWrapper(internalEventData, externalEventData);
240+
}
241+
}
242+
243+
}
244+
150245
}

src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.github.shyiko.mysql.binlog;
1717

18+
import com.github.shyiko.mysql.binlog.event.ByteArrayEventData;
1819
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
1920
import com.github.shyiko.mysql.binlog.event.Event;
2021
import com.github.shyiko.mysql.binlog.event.EventData;
@@ -23,6 +24,7 @@
2324
import com.github.shyiko.mysql.binlog.event.QueryEventData;
2425
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
2526
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
27+
import com.github.shyiko.mysql.binlog.event.deserialization.ByteArrayEventDataDeserializer;
2628
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
2729
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
2830
import com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer;
@@ -541,6 +543,48 @@ public void execute(Statement statement) throws SQLException {
541543
}
542544
}
543545

546+
@Test
547+
public void testCustomEventDataDeserializers() throws Exception {
548+
try {
549+
client.disconnect();
550+
final BinaryLogClient binaryLogClient = new BinaryLogClient(slave.hostname, slave.port,
551+
slave.username, slave.password);
552+
binaryLogClient.registerEventListener(new TraceEventListener());
553+
binaryLogClient.registerEventListener(eventListener);
554+
EventDeserializer deserializer = new EventDeserializer();
555+
deserializer.setEventDataDeserializer(EventType.QUERY, new ByteArrayEventDataDeserializer());
556+
// TABLE_MAP and ROTATE events are both used internally, but that doesn't mean it shouldn't be possible to
557+
// specify different EventDataDeserializer|s
558+
deserializer.setEventDataDeserializer(EventType.TABLE_MAP, new ByteArrayEventDataDeserializer());
559+
deserializer.setEventDataDeserializer(EventType.ROTATE, new ByteArrayEventDataDeserializer());
560+
binaryLogClient.setEventDeserializer(deserializer);
561+
try {
562+
eventListener.reset();
563+
binaryLogClient.connect(DEFAULT_TIMEOUT);
564+
eventListener.waitFor(EventType.FORMAT_DESCRIPTION, 1, DEFAULT_TIMEOUT);
565+
master.execute(new Callback<Statement>() {
566+
@Override
567+
public void execute(Statement statement) throws SQLException {
568+
statement.execute("insert into bikini_bottom values('SpongeBob')");
569+
}
570+
});
571+
slave.execute(new Callback<Statement>() {
572+
@Override
573+
public void execute(Statement statement) throws SQLException {
574+
statement.execute("flush logs");
575+
}
576+
});
577+
eventListener.waitFor(EventType.QUERY, 1, DEFAULT_TIMEOUT);
578+
eventListener.waitFor(EventType.ROTATE, 3, DEFAULT_TIMEOUT); /* 2 with timestamp 0 */
579+
eventListener.waitFor(ByteArrayEventData.class, 5, DEFAULT_TIMEOUT);
580+
} finally {
581+
binaryLogClient.disconnect();
582+
}
583+
} finally {
584+
client.connect(DEFAULT_TIMEOUT);
585+
}
586+
}
587+
544588
@Test(expectedExceptions = IllegalStateException.class)
545589
public void testExceptionIsThrownWhenTryingToConnectAlreadyConnectedClient() throws Exception {
546590
assertTrue(client.isConnected());

0 commit comments

Comments
 (0)