Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

Expand Down Expand Up @@ -218,6 +217,11 @@ private void ensureCompatibility(EventDataDeserializer eventDataDeserializer) {
compatibilitySet.contains(CompatibilityMode.INTEGER_AS_BYTE_ARRAY)
);
}
if (eventDataDeserializer instanceof TransactionPayloadEventDataDeserializer) {
TransactionPayloadEventDataDeserializer deserializer =
(TransactionPayloadEventDataDeserializer) eventDataDeserializer;
deserializer.setCompatibilityModes(compatibilitySet);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;

/**
* @author <a href="mailto:[email protected]">Somesh Malviya</a>
Expand All @@ -34,6 +36,12 @@ public class TransactionPayloadEventDataDeserializer implements EventDataDeseria
public static final int OTW_PAYLOAD_COMPRESSION_TYPE_FIELD = 2;
public static final int OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD = 3;

private EnumSet<EventDeserializer.CompatibilityMode> compatibilitySet = EnumSet.noneOf(EventDeserializer.CompatibilityMode.class);

public void setCompatibilityModes(EnumSet<EventDeserializer.CompatibilityMode> compatibilityModes) {
this.compatibilitySet = compatibilityModes;
}

@Override
public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
TransactionPayloadEventData eventData = new TransactionPayloadEventData();
Expand Down Expand Up @@ -88,9 +96,15 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream)
return eventData;
}

private static ArrayList<Event> getDecompressedEvents(TransactionPayloadEventData eventData) throws IOException {
private ArrayList<Event> getDecompressedEvents(TransactionPayloadEventData eventData) throws IOException {
ArrayList<Event> decompressedEvents = new ArrayList<>();
EventDeserializer transactionPayloadEventDeserializer = new EventDeserializer();
if (!compatibilitySet.isEmpty()) {
EventDeserializer.CompatibilityMode[] compatibilityModes = compatibilitySet.toArray(new EventDeserializer.CompatibilityMode[0]);
EventDeserializer.CompatibilityMode first = compatibilityModes[0];
EventDeserializer.CompatibilityMode[] rest = Arrays.copyOfRange(compatibilityModes, 1, compatibilityModes.length);
transactionPayloadEventDeserializer.setCompatibilityMode(first, rest);
}

try (ZstdInputStream zstdInputStream = new ZstdInputStream(new java.io.ByteArrayInputStream(eventData.getPayload()))) {
ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(zstdInputStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@
*/
package com.github.shyiko.mysql.binlog.event.deserialization;

import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData;
import com.github.shyiko.mysql.binlog.event.XAPrepareEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import org.testng.annotations.Test;

import java.io.IOException;
import java.io.Serializable;

import static org.testng.Assert.assertEquals;
import static org.testng.AssertJUnit.assertTrue;

/**
* @author <a href="mailto:[email protected]">Somesh Malviya</a>
Expand Down Expand Up @@ -81,6 +86,7 @@ public class TransactionPayloadEventDataDeserializerTest {
" after=[1, Once Upon a Time in the West, 1968, Italy, Western|Action, Claudia Cardinale|Charles Bronson|Henry Fonda|Gabriele Ferzetti|Frank Wolff|Al Mulock|Jason Robards|Woody Strode|Jack Elam|Lionel Stander|Paolo Stoppa|Keenan Wynn|Aldo Sambrell, Sergio Leone, Ennio Morricone, Sergio Leone|Sergio Donati|Dario Argento|Bernardo Bertolucci, Tonino Delli Colli, Paramount Pictures]}\n")
.append("]}")
.toString();
private static final byte[] UNCOMPRESSED_UPDATE_EVENT_BEFORE_ROW_0_BYTE_ARRAY = new byte[] {1, 0, 0, 0};

@Test
public void deserialize() throws IOException {
Expand All @@ -97,4 +103,60 @@ public void deserialize() throws IOException {
assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType());
assertEquals(UNCOMPRESSED_UPDATE_EVENT, transactionPayloadEventData.getUncompressedEvents().get(2).getData().toString());
}

@Test
public void deserializePropagatingCompatibilityModeToTransactionPayloadEventDataDeserializer() throws IOException {

ByteArrayInputStream dataStream = new ByteArrayInputStream(DATA);

// Mock create target TransactionPayloadEventData DATA event header
final EventHeaderV4 eventHeader = new EventHeaderV4();
eventHeader.setEventType(EventType.TRANSACTION_PAYLOAD);
eventHeader.setEventLength(DATA.length + 19L);
eventHeader.setTimestamp(1646406641000L);
eventHeader.setServerId(223344);


EventHeaderDeserializer eventHeaderDeserializer = new EventHeaderDeserializer() {

private long count = 0L;

private EventHeaderDeserializer defaultEventHeaderDeserializer = new EventHeaderV4Deserializer();

@Override
public EventHeader deserialize(ByteArrayInputStream inputStream) throws IOException {
if (count > 0) {
// uncompressed event header deserialize
return defaultEventHeaderDeserializer.deserialize(inputStream);
}
count++;
// we need to return target TransactionPayloadEventData DATA event header we had mocked
return eventHeader;
}
};

EventDeserializer eventDeserializer = new EventDeserializer(eventHeaderDeserializer, new NullEventDataDeserializer());
eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.INTEGER_AS_BYTE_ARRAY);

Event event = eventDeserializer.nextEvent(dataStream);

assertTrue(event.getHeader().getEventType() == EventType.TRANSACTION_PAYLOAD);
assertTrue(event.getData() instanceof TransactionPayloadEventData);

TransactionPayloadEventData transactionPayloadEventData = event.getData();
assertEquals(COMPRESSION_TYPE, transactionPayloadEventData.getCompressionType());
assertEquals(PAYLOAD_SIZE, transactionPayloadEventData.getPayloadSize());
assertEquals(UNCOMPRESSED_SIZE, transactionPayloadEventData.getUncompressedSize());
assertEquals(NUMBER_OF_UNCOMPRESSED_EVENTS, transactionPayloadEventData.getUncompressedEvents().size());
assertEquals(EventType.QUERY, transactionPayloadEventData.getUncompressedEvents().get(0).getHeader().getEventType());
assertEquals(EventType.TABLE_MAP, transactionPayloadEventData.getUncompressedEvents().get(1).getHeader().getEventType());
assertEquals(EventType.EXT_UPDATE_ROWS, transactionPayloadEventData.getUncompressedEvents().get(2).getHeader().getEventType());
assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType());
assertTrue(transactionPayloadEventData.getUncompressedEvents().get(2).getData() instanceof UpdateRowsEventData);

UpdateRowsEventData updateRowsEventData = transactionPayloadEventData.getUncompressedEvents().get(2).getData();
assertEquals(1, updateRowsEventData.getRows().size());
Serializable[] updateBefore = updateRowsEventData.getRows().get(0).getKey();
assertEquals(UNCOMPRESSED_UPDATE_EVENT_BEFORE_ROW_0_BYTE_ARRAY, updateBefore[0]);
}
}
Loading