Skip to content

Commit 977309e

Browse files
committed
WIP
1 parent a96dce0 commit 977309e

File tree

1 file changed

+56
-32
lines changed

1 file changed

+56
-32
lines changed

modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java

Lines changed: 56 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,9 @@ public class DirectByteBufferStream {
352352
/** */
353353
private boolean uncompressFinished;
354354

355+
/** */
356+
private boolean serializeFinished;
357+
355358
/**
356359
* Constructror for stream used for writing messages.
357360
*
@@ -895,27 +898,50 @@ public void writeGridLongList(@Nullable GridLongList val) {
895898
* @param compress Whether to compress message.
896899
*/
897900
public void writeMessage(Message msg, MessageWriter writer, boolean compress) {
901+
if (compress && buf.position() != 0) {
902+
lastFinished = false;
903+
904+
return;
905+
}
906+
898907
if (msg != null) {
908+
if (compress && compressFinished && serializeFinished) {
909+
lastFinished = true;
910+
compressFinished = false;
911+
912+
return;
913+
}
914+
899915
if (buf.hasRemaining()) {
900916
try {
901917
writer.beforeInnerMessageWrite();
902918

903-
if (compress && buf.position() != 0)
904-
lastFinished = false;
905-
else {
906-
lastFinished = msgFactory.serializer(msg.directType()).writeTo(msg, writer);
907-
908-
if (compress && !compressFinished) {
919+
if (compress) {
920+
if (!serializeFinished)
921+
lastFinished = msgFactory.serializer(msg.directType()).writeTo(msg, writer);
922+
923+
if (lastFinished) {
924+
if (!compressFinished) {
925+
compressData();
926+
927+
lastFinished = false;
928+
compressFinished = true;
929+
serializeFinished = true;
930+
}
931+
else {
932+
lastFinished = true;
933+
compressFinished = false;
934+
serializeFinished = false;
935+
}
936+
}
937+
else {
909938
compressData();
910939

911940
lastFinished = false;
912-
compressFinished = true;
913-
}
914-
else {
915-
lastFinished = true;
916-
compressFinished = false;
917941
}
918942
}
943+
else
944+
lastFinished = msgFactory.serializer(msg.directType()).writeTo(msg, writer);
919945
}
920946
finally {
921947
writer.afterInnerMessageWrite(lastFinished);
@@ -1074,9 +1100,6 @@ public <K, V> void writeMap(
10741100
return;
10751101
}
10761102

1077-
if (compress)
1078-
System.out.println(">>> WRITE MAP");
1079-
10801103
if (map != null) {
10811104
if (mapIt == null) {
10821105
writeInt(map.size());
@@ -1121,17 +1144,19 @@ public <K, V> void writeMap(
11211144
keyDone = false;
11221145
}
11231146

1124-
if (compress && !compressFinished) {
1125-
compressData();
1147+
if (compress) {
1148+
if (!compressFinished) {
1149+
compressData();
11261150

1127-
lastFinished = false;
1128-
compressFinished = true;
1151+
lastFinished = false;
1152+
compressFinished = true;
11291153

1130-
return;
1131-
}
1132-
else {
1133-
lastFinished = true;
1134-
compressFinished = false;
1154+
return;
1155+
}
1156+
else {
1157+
lastFinished = true;
1158+
compressFinished = false;
1159+
}
11351160
}
11361161

11371162
mapIt = null;
@@ -1662,7 +1687,10 @@ public <T extends Message> T readMessage(MessageReader reader, boolean compress)
16621687
Message msg0 = msg;
16631688

16641689
msgTypeDone = false;
1665-
uncompressFinished = false;
1690+
1691+
if (compress)
1692+
uncompressFinished = false;
1693+
16661694
msg = null;
16671695

16681696
return (T)msg0;
@@ -1843,9 +1871,11 @@ private <C extends Collection<?>> C readCollection(MessageCollectionItemType ite
18431871

18441872
M map0 = (M)map;
18451873

1846-
uncompressFinished = false;
18471874
map = null;
18481875

1876+
if (compress)
1877+
uncompressFinished = false;
1878+
18491879
return map0;
18501880
}
18511881

@@ -2409,11 +2439,7 @@ private void compressData() {
24092439

24102440
buf.clear();
24112441

2412-
byte[] compressed = baos.toByteArray();
2413-
2414-
writeByteArray(compressed);
2415-
2416-
System.out.println(">>> CompressData: rawData=" + rawData.length + ", compressed=" + compressed.length);
2442+
writeByteArray(baos.toByteArray());
24172443
}
24182444

24192445
/** */
@@ -2457,8 +2483,6 @@ private boolean uncompressData() {
24572483

24582484
buf.flip();
24592485

2460-
System.out.println(">>> uncompressData: rawData=" + uncompressedData.length + ", compressed=" + compressedData.length);
2461-
24622486
return true;
24632487
}
24642488

0 commit comments

Comments
 (0)