Skip to content

Commit c2a5f11

Browse files
debezium/dbz#1518 fix EventDeserializer composition
Signed-off-by: Adela Jaworowska-Nowak <[email protected]>
1 parent beff0e8 commit c2a5f11

File tree

3 files changed

+89
-2
lines changed

3 files changed

+89
-2
lines changed

src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
2727

2828
import java.io.IOException;
29+
import java.util.Arrays;
2930
import java.util.EnumSet;
30-
import java.util.HashMap;
3131
import java.util.IdentityHashMap;
3232
import java.util.Map;
3333

@@ -218,6 +218,16 @@ private void ensureCompatibility(EventDataDeserializer eventDataDeserializer) {
218218
compatibilitySet.contains(CompatibilityMode.INTEGER_AS_BYTE_ARRAY)
219219
);
220220
}
221+
if (eventDataDeserializer instanceof TransactionPayloadEventDataDeserializer) {
222+
TransactionPayloadEventDataDeserializer deserializer =
223+
(TransactionPayloadEventDataDeserializer) eventDataDeserializer;
224+
if (!compatibilitySet.isEmpty()) {
225+
EventDeserializer.CompatibilityMode[] compatibilityModes = compatibilitySet.toArray(new EventDeserializer.CompatibilityMode[0]);
226+
EventDeserializer.CompatibilityMode first = compatibilityModes[0];
227+
EventDeserializer.CompatibilityMode[] rest = Arrays.copyOfRange(compatibilityModes, 1, compatibilityModes.length);
228+
deserializer.setCompatibilityMode(first, rest);
229+
}
230+
}
221231
}
222232

223233
/**

src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import java.io.IOException;
2424
import java.nio.ByteBuffer;
2525
import java.util.ArrayList;
26+
import java.util.Arrays;
27+
import java.util.EnumSet;
2628

2729
/**
2830
* @author <a href="mailto:[email protected]">Somesh Malviya</a>
@@ -34,6 +36,12 @@ public class TransactionPayloadEventDataDeserializer implements EventDataDeseria
3436
public static final int OTW_PAYLOAD_COMPRESSION_TYPE_FIELD = 2;
3537
public static final int OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD = 3;
3638

39+
private EnumSet<EventDeserializer.CompatibilityMode> compatibilitySet = EnumSet.noneOf(EventDeserializer.CompatibilityMode.class);
40+
41+
public void setCompatibilityMode(EventDeserializer.CompatibilityMode first, EventDeserializer.CompatibilityMode... rest) {
42+
this.compatibilitySet = EnumSet.of(first, rest);
43+
}
44+
3745
@Override
3846
public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
3947
TransactionPayloadEventData eventData = new TransactionPayloadEventData();
@@ -87,6 +95,13 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream)
8795
// Read and store events from decompressed byte array into input stream
8896
ArrayList<Event> decompressedEvents = new ArrayList<>();
8997
EventDeserializer transactionPayloadEventDeserializer = new EventDeserializer();
98+
if (!compatibilitySet.isEmpty()) {
99+
EventDeserializer.CompatibilityMode[] compatibilityModes = compatibilitySet.toArray(new EventDeserializer.CompatibilityMode[0]);
100+
EventDeserializer.CompatibilityMode first = compatibilityModes[0];
101+
EventDeserializer.CompatibilityMode[] rest = Arrays.copyOfRange(compatibilityModes, 1, compatibilityModes.length);
102+
transactionPayloadEventDeserializer.setCompatibilityMode(first, rest);
103+
}
104+
90105
ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(dst);
91106

92107
Event internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream);

src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializerTest.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,20 @@
1515
*/
1616
package com.github.shyiko.mysql.binlog.event.deserialization;
1717

18+
import com.github.shyiko.mysql.binlog.event.Event;
19+
import com.github.shyiko.mysql.binlog.event.EventHeader;
20+
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
1821
import com.github.shyiko.mysql.binlog.event.EventType;
1922
import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData;
20-
import com.github.shyiko.mysql.binlog.event.XAPrepareEventData;
23+
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
2124
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
2225
import org.testng.annotations.Test;
2326

2427
import java.io.IOException;
28+
import java.io.Serializable;
2529

2630
import static org.testng.Assert.assertEquals;
31+
import static org.testng.AssertJUnit.assertTrue;
2732

2833
/**
2934
* @author <a href="mailto:[email protected]">Somesh Malviya</a>
@@ -81,6 +86,7 @@ public class TransactionPayloadEventDataDeserializerTest {
8186
" after=[1, Once Upon a Time in the West, 1968, Italy, Western|Action, Claudia Cardinale|Charles Bronson|Henry Fonda|Gabriele Ferzetti|Frank Wolff|Al Mulock|Jason Robards|Woody Strode|Jack Elam|Lionel Stander|Paolo Stoppa|Keenan Wynn|Aldo Sambrell, Sergio Leone, Ennio Morricone, Sergio Leone|Sergio Donati|Dario Argento|Bernardo Bertolucci, Tonino Delli Colli, Paramount Pictures]}\n")
8287
.append("]}")
8388
.toString();
89+
private static final byte[] UNCOMPRESSED_UPDATE_EVENT_BEFORE_ROW_0_BYTE_ARRAY = new byte[] {1, 0, 0, 0};
8490

8591
@Test
8692
public void deserialize() throws IOException {
@@ -97,4 +103,60 @@ public void deserialize() throws IOException {
97103
assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType());
98104
assertEquals(UNCOMPRESSED_UPDATE_EVENT, transactionPayloadEventData.getUncompressedEvents().get(2).getData().toString());
99105
}
106+
107+
@Test
108+
public void deserializePropagatingCompatibilityModeToTransactionPayloadEventDataDeserializer() throws IOException {
109+
110+
ByteArrayInputStream dataStream = new ByteArrayInputStream(DATA);
111+
112+
// Mock create target TransactionPayloadEventData DATA event header
113+
final EventHeaderV4 eventHeader = new EventHeaderV4();
114+
eventHeader.setEventType(EventType.TRANSACTION_PAYLOAD);
115+
eventHeader.setEventLength(DATA.length + 19L);
116+
eventHeader.setTimestamp(1646406641000L);
117+
eventHeader.setServerId(223344);
118+
119+
120+
EventHeaderDeserializer eventHeaderDeserializer = new EventHeaderDeserializer() {
121+
122+
private long count = 0L;
123+
124+
private EventHeaderDeserializer defaultEventHeaderDeserializer = new EventHeaderV4Deserializer();
125+
126+
@Override
127+
public EventHeader deserialize(ByteArrayInputStream inputStream) throws IOException {
128+
if (count > 0) {
129+
// uncompressed event header deserialize
130+
return defaultEventHeaderDeserializer.deserialize(inputStream);
131+
}
132+
count++;
133+
// we need to return target TransactionPayloadEventData DATA event header we had mocked
134+
return eventHeader;
135+
}
136+
};
137+
138+
EventDeserializer eventDeserializer = new EventDeserializer(eventHeaderDeserializer, new NullEventDataDeserializer());
139+
eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.INTEGER_AS_BYTE_ARRAY);
140+
141+
Event event = eventDeserializer.nextEvent(dataStream);
142+
143+
assertTrue(event.getHeader().getEventType() == EventType.TRANSACTION_PAYLOAD);
144+
assertTrue(event.getData() instanceof TransactionPayloadEventData);
145+
146+
TransactionPayloadEventData transactionPayloadEventData = event.getData();
147+
assertEquals(COMPRESSION_TYPE, transactionPayloadEventData.getCompressionType());
148+
assertEquals(PAYLOAD_SIZE, transactionPayloadEventData.getPayloadSize());
149+
assertEquals(UNCOMPRESSED_SIZE, transactionPayloadEventData.getUncompressedSize());
150+
assertEquals(NUMBER_OF_UNCOMPRESSED_EVENTS, transactionPayloadEventData.getUncompressedEvents().size());
151+
assertEquals(EventType.QUERY, transactionPayloadEventData.getUncompressedEvents().get(0).getHeader().getEventType());
152+
assertEquals(EventType.TABLE_MAP, transactionPayloadEventData.getUncompressedEvents().get(1).getHeader().getEventType());
153+
assertEquals(EventType.EXT_UPDATE_ROWS, transactionPayloadEventData.getUncompressedEvents().get(2).getHeader().getEventType());
154+
assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType());
155+
assertTrue(transactionPayloadEventData.getUncompressedEvents().get(2).getData() instanceof UpdateRowsEventData);
156+
157+
UpdateRowsEventData updateRowsEventData = transactionPayloadEventData.getUncompressedEvents().get(2).getData();
158+
assertEquals(1, updateRowsEventData.getRows().size());
159+
Serializable[] updateBefore = updateRowsEventData.getRows().get(0).getKey();
160+
assertEquals(UNCOMPRESSED_UPDATE_EVENT_BEFORE_ROW_0_BYTE_ARRAY, updateBefore[0]);
161+
}
100162
}

0 commit comments

Comments
 (0)