diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java index 81dea25..c1fc896 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.util.EnumSet; -import java.util.HashMap; import java.util.IdentityHashMap; import java.util.Map; @@ -218,6 +217,11 @@ private void ensureCompatibility(EventDataDeserializer eventDataDeserializer) { compatibilitySet.contains(CompatibilityMode.INTEGER_AS_BYTE_ARRAY) ); } + if (eventDataDeserializer instanceof TransactionPayloadEventDataDeserializer) { + TransactionPayloadEventDataDeserializer deserializer = + (TransactionPayloadEventDataDeserializer) eventDataDeserializer; + deserializer.setCompatibilityModes(compatibilitySet); + } } /** diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java index 5bbbd47..61bfe94 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; /** * @author Somesh Malviya @@ -34,6 +36,12 @@ public class TransactionPayloadEventDataDeserializer implements EventDataDeseria public static final int OTW_PAYLOAD_COMPRESSION_TYPE_FIELD = 2; public static final int OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD = 3; + private EnumSet compatibilitySet = EnumSet.noneOf(EventDeserializer.CompatibilityMode.class); + + public void setCompatibilityModes(EnumSet compatibilityModes) { + this.compatibilitySet = compatibilityModes; + } + @Override public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) throws IOException { TransactionPayloadEventData eventData = new TransactionPayloadEventData(); @@ -88,9 +96,15 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) return eventData; } - private static ArrayList getDecompressedEvents(TransactionPayloadEventData eventData) throws IOException { + private ArrayList getDecompressedEvents(TransactionPayloadEventData eventData) throws IOException { ArrayList decompressedEvents = new ArrayList<>(); EventDeserializer transactionPayloadEventDeserializer = new EventDeserializer(); + if (!compatibilitySet.isEmpty()) { + EventDeserializer.CompatibilityMode[] compatibilityModes = compatibilitySet.toArray(new EventDeserializer.CompatibilityMode[0]); + EventDeserializer.CompatibilityMode first = compatibilityModes[0]; + EventDeserializer.CompatibilityMode[] rest = Arrays.copyOfRange(compatibilityModes, 1, compatibilityModes.length); + transactionPayloadEventDeserializer.setCompatibilityMode(first, rest); + } try (ZstdInputStream zstdInputStream = new ZstdInputStream(new java.io.ByteArrayInputStream(eventData.getPayload()))) { ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(zstdInputStream); diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializerTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializerTest.java index a3b8e76..43d1123 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializerTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializerTest.java @@ -15,15 +15,20 @@ */ package com.github.shyiko.mysql.binlog.event.deserialization; +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventHeader; +import com.github.shyiko.mysql.binlog.event.EventHeaderV4; import com.github.shyiko.mysql.binlog.event.EventType; import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData; -import com.github.shyiko.mysql.binlog.event.XAPrepareEventData; +import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; import org.testng.annotations.Test; import java.io.IOException; +import java.io.Serializable; import static org.testng.Assert.assertEquals; +import static org.testng.AssertJUnit.assertTrue; /** * @author Somesh Malviya @@ -81,6 +86,7 @@ public class TransactionPayloadEventDataDeserializerTest { " after=[1, Once Upon a Time in the West, 1968, Italy, Western|Action, Claudia Cardinale|Charles Bronson|Henry Fonda|Gabriele Ferzetti|Frank Wolff|Al Mulock|Jason Robards|Woody Strode|Jack Elam|Lionel Stander|Paolo Stoppa|Keenan Wynn|Aldo Sambrell, Sergio Leone, Ennio Morricone, Sergio Leone|Sergio Donati|Dario Argento|Bernardo Bertolucci, Tonino Delli Colli, Paramount Pictures]}\n") .append("]}") .toString(); + private static final byte[] UNCOMPRESSED_UPDATE_EVENT_BEFORE_ROW_0_BYTE_ARRAY = new byte[] {1, 0, 0, 0}; @Test public void deserialize() throws IOException { @@ -97,4 +103,60 @@ public void deserialize() throws IOException { assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType()); assertEquals(UNCOMPRESSED_UPDATE_EVENT, transactionPayloadEventData.getUncompressedEvents().get(2).getData().toString()); } + + @Test + public void deserializePropagatingCompatibilityModeToTransactionPayloadEventDataDeserializer() throws IOException { + + ByteArrayInputStream dataStream = new ByteArrayInputStream(DATA); + + // Mock create target TransactionPayloadEventData DATA event header + final EventHeaderV4 eventHeader = new EventHeaderV4(); + eventHeader.setEventType(EventType.TRANSACTION_PAYLOAD); + eventHeader.setEventLength(DATA.length + 19L); + eventHeader.setTimestamp(1646406641000L); + eventHeader.setServerId(223344); + + + EventHeaderDeserializer eventHeaderDeserializer = new EventHeaderDeserializer() { + + private long count = 0L; + + private EventHeaderDeserializer defaultEventHeaderDeserializer = new EventHeaderV4Deserializer(); + + @Override + public EventHeader deserialize(ByteArrayInputStream inputStream) throws IOException { + if (count > 0) { + // uncompressed event header deserialize + return defaultEventHeaderDeserializer.deserialize(inputStream); + } + count++; + // we need to return target TransactionPayloadEventData DATA event header we had mocked + return eventHeader; + } + }; + + EventDeserializer eventDeserializer = new EventDeserializer(eventHeaderDeserializer, new NullEventDataDeserializer()); + eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.INTEGER_AS_BYTE_ARRAY); + + Event event = eventDeserializer.nextEvent(dataStream); + + assertTrue(event.getHeader().getEventType() == EventType.TRANSACTION_PAYLOAD); + assertTrue(event.getData() instanceof TransactionPayloadEventData); + + TransactionPayloadEventData transactionPayloadEventData = event.getData(); + assertEquals(COMPRESSION_TYPE, transactionPayloadEventData.getCompressionType()); + assertEquals(PAYLOAD_SIZE, transactionPayloadEventData.getPayloadSize()); + assertEquals(UNCOMPRESSED_SIZE, transactionPayloadEventData.getUncompressedSize()); + assertEquals(NUMBER_OF_UNCOMPRESSED_EVENTS, transactionPayloadEventData.getUncompressedEvents().size()); + assertEquals(EventType.QUERY, transactionPayloadEventData.getUncompressedEvents().get(0).getHeader().getEventType()); + assertEquals(EventType.TABLE_MAP, transactionPayloadEventData.getUncompressedEvents().get(1).getHeader().getEventType()); + assertEquals(EventType.EXT_UPDATE_ROWS, transactionPayloadEventData.getUncompressedEvents().get(2).getHeader().getEventType()); + assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType()); + assertTrue(transactionPayloadEventData.getUncompressedEvents().get(2).getData() instanceof UpdateRowsEventData); + + UpdateRowsEventData updateRowsEventData = transactionPayloadEventData.getUncompressedEvents().get(2).getData(); + assertEquals(1, updateRowsEventData.getRows().size()); + Serializable[] updateBefore = updateRowsEventData.getRows().get(0).getKey(); + assertEquals(UNCOMPRESSED_UPDATE_EVENT_BEFORE_ROW_0_BYTE_ARRAY, updateBefore[0]); + } }