Skip to content

Commit e3effda

Browse files
committed
WIP
1 parent e1ad49d commit e3effda

File tree

8 files changed

+118
-52
lines changed

8 files changed

+118
-52
lines changed

modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -386,8 +386,12 @@ else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.
386386
else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList")))
387387
returnFalseIfWriteFailed(write, "writer.writeGridLongList", getExpr);
388388

389-
else if (assignableFrom(type, type(MESSAGE_INTERFACE)))
390-
returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr);
389+
else if (assignableFrom(type, type(MESSAGE_INTERFACE))) {
390+
if (compress)
391+
returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr, "true");
392+
else
393+
returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr);
394+
}
391395

392396
else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) {
393397
List<? extends TypeMirror> typeArgs = ((DeclaredType)type).getTypeArguments();
@@ -534,8 +538,12 @@ else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.
534538
else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList")))
535539
returnFalseIfReadFailed(name, "reader.readGridLongList");
536540

537-
else if (assignableFrom(type, type(MESSAGE_INTERFACE)))
538-
returnFalseIfReadFailed(name, "reader.readMessage");
541+
else if (assignableFrom(type, type(MESSAGE_INTERFACE))) {
542+
if (compress)
543+
returnFalseIfReadFailed(name, "reader.readMessage", "true");
544+
else
545+
returnFalseIfReadFailed(name, "reader.readMessage");
546+
}
539547

540548
else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) {
541549
List<? extends TypeMirror> typeArgs = ((DeclaredType)type).getTypeArguments();

modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,10 +314,10 @@ public ByteBuffer getBuffer() {
314314
}
315315

316316
/** {@inheritDoc} */
317-
@Nullable @Override public <T extends Message> T readMessage() {
317+
@Nullable @Override public <T extends Message> T readMessage(boolean compress) {
318318
DirectByteBufferStream stream = state.item().stream;
319319

320-
T msg = stream.readMessage(this);
320+
T msg = stream.readMessage(this, compress);
321321

322322
lastRead = stream.lastFinished();
323323

modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,10 +292,10 @@ public ByteBuffer getBuffer() {
292292
}
293293

294294
/** {@inheritDoc} */
295-
@Override public boolean writeMessage(@Nullable Message msg) {
295+
@Override public boolean writeMessage(@Nullable Message msg, boolean compress) {
296296
DirectByteBufferStream stream = state.item().stream;
297297

298-
stream.writeMessage(msg, this);
298+
stream.writeMessage(msg, this, compress);
299299

300300
return stream.lastFinished();
301301
}

modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java

Lines changed: 73 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -345,13 +345,10 @@ public class DirectByteBufferStream {
345345
private byte cacheObjType;
346346

347347
/** */
348-
private boolean compressMapFinished;
348+
private boolean compressFinished;
349349

350350
/** */
351-
private boolean uncompressMapFinished;
352-
353-
/** */
354-
private boolean needWriteMapSize = true;
351+
private boolean uncompressFinished;
355352

356353
/**
357354
* Constructror for stream used for writing messages.
@@ -893,14 +890,29 @@ public void writeGridLongList(@Nullable GridLongList val) {
893890
/**
894891
* @param msg Message.
895892
* @param writer Writer.
893+
* @param compress Whether to compress message.
896894
*/
897-
public void writeMessage(Message msg, MessageWriter writer) {
895+
public void writeMessage(Message msg, MessageWriter writer, boolean compress) {
898896
if (msg != null) {
899897
if (buf.hasRemaining()) {
900898
try {
901899
writer.beforeInnerMessageWrite();
902900

903-
lastFinished = msgFactory.serializer(msg.directType()).writeTo(msg, writer);
901+
if (compress && buf.position() != 0)
902+
lastFinished = false;
903+
else
904+
lastFinished = msgFactory.serializer(msg.directType()).writeTo(msg, writer);
905+
906+
if (compress && !compressFinished) {
907+
compressData();
908+
909+
lastFinished = false;
910+
compressFinished = true;
911+
}
912+
else {
913+
lastFinished = true;
914+
compressFinished = false;
915+
}
904916
}
905917
finally {
906918
writer.afterInnerMessageWrite(lastFinished);
@@ -909,8 +921,26 @@ public void writeMessage(Message msg, MessageWriter writer) {
909921
else
910922
lastFinished = false;
911923
}
912-
else
913-
writeShort(Short.MIN_VALUE);
924+
else {
925+
if (!compress) {
926+
writeShort(Short.MIN_VALUE);
927+
928+
return;
929+
}
930+
931+
if (!compressFinished) {
932+
writeShort(Short.MIN_VALUE);
933+
934+
compressData();
935+
936+
lastFinished = false;
937+
compressFinished = true;
938+
}
939+
else {
940+
lastFinished = true;
941+
compressFinished = false;
942+
}
943+
}
914944
}
915945

916946
/**
@@ -935,7 +965,7 @@ public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, Me
935965
if (arrCur == NULL)
936966
arrCur = arr[arrPos++];
937967

938-
write(itemType, arrCur, writer);
968+
write(itemType, arrCur, writer, false);
939969

940970
if (!lastFinished)
941971
return;
@@ -972,7 +1002,7 @@ public <T> void writeCollection(Collection<T> col, MessageCollectionItemType ite
9721002
if (cur == NULL)
9731003
cur = it.next();
9741004

975-
write(itemType, cur, writer);
1005+
write(itemType, cur, writer, false);
9761006

9771007
if (!lastFinished)
9781008
return;
@@ -1010,7 +1040,7 @@ private <T> void writeRandomAccessList(List<T> list, MessageCollectionItemType i
10101040
if (arrCur == NULL)
10111041
arrCur = list.get(arrPos++);
10121042

1013-
write(itemType, arrCur, writer);
1043+
write(itemType, arrCur, writer, false);
10141044

10151045
if (!lastFinished)
10161046
return;
@@ -1060,7 +1090,7 @@ public <K, V> void writeMap(
10601090
e = (Map.Entry<K, V>)mapCur;
10611091

10621092
if (!keyDone) {
1063-
write(keyType, e.getKey(), writer);
1093+
write(keyType, e.getKey(), writer, compress);
10641094

10651095
if (!lastFinished) {
10661096
if (compress)
@@ -1072,7 +1102,7 @@ public <K, V> void writeMap(
10721102
keyDone = true;
10731103
}
10741104

1075-
write(valType, e.getValue(), writer);
1105+
write(valType, e.getValue(), writer, compress);
10761106

10771107
if (!lastFinished) {
10781108
if (compress)
@@ -1085,17 +1115,17 @@ public <K, V> void writeMap(
10851115
keyDone = false;
10861116
}
10871117

1088-
if (compress && !compressMapFinished) {
1118+
if (compress && !compressFinished) {
10891119
compressData();
10901120

10911121
lastFinished = false;
1092-
compressMapFinished = true;
1122+
compressFinished = true;
10931123

10941124
return;
10951125
}
10961126
else {
10971127
lastFinished = true;
1098-
compressMapFinished = false;
1128+
compressFinished = false;
10991129
}
11001130

11011131
mapIt = null;
@@ -1107,22 +1137,17 @@ public <K, V> void writeMap(
11071137
return;
11081138
}
11091139

1110-
if (needWriteMapSize) {
1140+
if (!compressFinished) {
11111141
writeInt(-1);
11121142

1113-
needWriteMapSize = false;
1114-
}
1115-
1116-
if (!compressMapFinished) {
11171143
compressData();
11181144

11191145
lastFinished = false;
1120-
compressMapFinished = true;
1146+
compressFinished = true;
11211147
}
11221148
else {
11231149
lastFinished = true;
1124-
needWriteMapSize = true;
1125-
compressMapFinished = false;
1150+
compressFinished = false;
11261151
}
11271152
}
11281153
}
@@ -1590,7 +1615,16 @@ public GridLongList readGridLongList() {
15901615
* @param reader Reader.
15911616
* @return Message.
15921617
*/
1593-
public <T extends Message> T readMessage(MessageReader reader) {
1618+
public <T extends Message> T readMessage(MessageReader reader, boolean compress) {
1619+
if (compress && !uncompressFinished) {
1620+
uncompressData();
1621+
1622+
if (!lastFinished)
1623+
return null;
1624+
1625+
uncompressFinished = true;
1626+
}
1627+
15941628
if (!msgTypeDone) {
15951629
if (buf.remaining() < Message.DIRECT_TYPE_SIZE) {
15961630
lastFinished = false;
@@ -1622,6 +1656,7 @@ public <T extends Message> T readMessage(MessageReader reader) {
16221656
Message msg0 = msg;
16231657

16241658
msgTypeDone = false;
1659+
uncompressFinished = false;
16251660
msg = null;
16261661

16271662
return (T)msg0;
@@ -1651,7 +1686,7 @@ public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> item
16511686
objArr = itemCls != null ? (Object[])Array.newInstance(itemCls, readSize) : new Object[readSize];
16521687

16531688
for (int i = readItems; i < readSize; i++) {
1654-
Object item = read(itemType, reader);
1689+
Object item = read(itemType, reader, false);
16551690

16561691
if (!lastFinished)
16571692
return null;
@@ -1693,7 +1728,7 @@ public <C extends Collection<?>> C readCollection(MessageCollectionItemType item
16931728
col = new ArrayList<>(readSize);
16941729

16951730
for (int i = readItems; i < readSize; i++) {
1696-
Object item = read(itemType, reader);
1731+
Object item = read(itemType, reader, false);
16971732

16981733
if (!lastFinished)
16991734
return null;
@@ -1725,13 +1760,13 @@ public <C extends Collection<?>> C readCollection(MessageCollectionItemType item
17251760
*/
17261761
public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType,
17271762
boolean linked, MessageReader reader, boolean compress) {
1728-
if (compress && !uncompressMapFinished) {
1763+
if (compress && !uncompressFinished) {
17291764
uncompressData();
17301765

17311766
if (!lastFinished)
17321767
return null;
17331768

1734-
uncompressMapFinished = true;
1769+
uncompressFinished = true;
17351770
}
17361771

17371772
if (readSize == -1) {
@@ -1749,7 +1784,7 @@ public <C extends Collection<?>> C readCollection(MessageCollectionItemType item
17491784

17501785
for (int i = readItems; i < readSize; i++) {
17511786
if (!keyDone) {
1752-
Object key = read(keyType, reader);
1787+
Object key = read(keyType, reader, compress);
17531788

17541789
if (!lastFinished)
17551790
return null;
@@ -1758,7 +1793,7 @@ public <C extends Collection<?>> C readCollection(MessageCollectionItemType item
17581793
keyDone = true;
17591794
}
17601795

1761-
Object val = read(valType, reader);
1796+
Object val = read(valType, reader, compress);
17621797

17631798
if (!lastFinished)
17641799
return null;
@@ -1777,8 +1812,8 @@ public <C extends Collection<?>> C readCollection(MessageCollectionItemType item
17771812

17781813
M map0 = (M)map;
17791814

1815+
uncompressFinished = false;
17801816
map = null;
1781-
uncompressMapFinished = false;
17821817

17831818
return map0;
17841819
}
@@ -2040,7 +2075,7 @@ <T> T readArrayLE(ArrayCreator<T> creator, int typeSize, int lenShift, long off)
20402075
* @param val Value.
20412076
* @param writer Writer.
20422077
*/
2043-
protected void write(MessageCollectionItemType type, Object val, MessageWriter writer) {
2078+
protected void write(MessageCollectionItemType type, Object val, MessageWriter writer, boolean compress) {
20442079
switch (type) {
20452080
case BYTE:
20462081
writeByte((Byte)val);
@@ -2167,7 +2202,7 @@ protected void write(MessageCollectionItemType type, Object val, MessageWriter w
21672202
if (val != null)
21682203
writer.beforeInnerMessageWrite();
21692204

2170-
writeMessage((Message)val, writer);
2205+
writeMessage((Message)val, writer, compress);
21712206
}
21722207
finally {
21732208
if (val != null)
@@ -2184,9 +2219,10 @@ protected void write(MessageCollectionItemType type, Object val, MessageWriter w
21842219
/**
21852220
* @param type Type.
21862221
* @param reader Reader.
2222+
* @param compress Whether message is compressed.
21872223
* @return Value.
21882224
*/
2189-
protected Object read(MessageCollectionItemType type, MessageReader reader) {
2225+
protected Object read(MessageCollectionItemType type, MessageReader reader, boolean compress) {
21902226
switch (type) {
21912227
case BYTE:
21922228
return readByte();
@@ -2261,7 +2297,7 @@ protected Object read(MessageCollectionItemType type, MessageReader reader) {
22612297
return readGridLongList();
22622298

22632299
case MSG:
2264-
return readMessage(reader);
2300+
return readMessage(reader, compress);
22652301

22662302
default:
22672303
throw new IllegalArgumentException("Unknown type: " + type);

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,19 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
6464

6565
/** Partitions update counters. */
6666
@Order(value = 8, method = "partitionCounters")
67-
//@Compress TODO
67+
@Compress
6868
@GridToStringInclude
6969
private IgniteDhtPartitionCountersMap partCntrs;
7070

7171
/** Partitions history suppliers. */
7272
@Order(value = 9, method = "partitionHistorySuppliers")
73-
//@Compress TODO
73+
@Compress
7474
@GridToStringInclude
7575
private IgniteDhtPartitionHistorySuppliersMap partHistSuppliers;
7676

7777
/** Partitions that must be cleared and re-loaded. */
7878
@Order(value = 10, method = "partitionsToReload")
79-
//@Compress TODO
79+
@Compress
8080
@GridToStringInclude
8181
private IgniteDhtPartitionsToReloadMap partsToReload;
8282

modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,18 @@ public default void setBuffer(ByteBuffer buf) {
198198
* @param <T> Type of the message.
199199
* @return Message.
200200
*/
201-
public <T extends Message> T readMessage();
201+
public default <T extends Message> T readMessage() {
202+
return readMessage(false);
203+
}
204+
205+
/**
206+
* Reads nested message.
207+
*
208+
* @param compress Whether the message is compressed.
209+
* @param <T> Type of the message.
210+
* @return Message.
211+
*/
212+
public <T extends Message> T readMessage(boolean compress);
202213

203214
/**
204215
* Reads {@link CacheObject}.

0 commit comments

Comments
 (0)