Skip to content

Commit 04714c5

Browse files
committed
WIP
1 parent 05fa9b6 commit 04714c5

File tree

16 files changed

+512
-179
lines changed

16 files changed

+512
-179
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal;
19+
20+
import java.lang.annotation.ElementType;
21+
import java.lang.annotation.Retention;
22+
import java.lang.annotation.RetentionPolicy;
23+
import java.lang.annotation.Target;
24+
25+
/** This annotation indicates that this field will be compressed during serialization. */
26+
@Retention(RetentionPolicy.CLASS)
27+
@Target(ElementType.FIELD)
28+
public @interface Compress {
29+
// No-op.
30+
}

modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,8 @@ private void returnFalseIfWriteFailed(VariableElement field) throws Exception {
311311

312312
String getExpr = (F.isEmpty(methodName) ? field.getSimpleName().toString() : methodName) + "()";
313313

314+
boolean compress = field.getAnnotation(Compress.class) != null;
315+
314316
TypeMirror type = field.asType();
315317

316318
if (type.getKind().isPrimitive()) {
@@ -364,9 +366,15 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) {
364366

365367
imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType");
366368

367-
returnFalseIfWriteFailed(write, "writer.writeMap", getExpr,
368-
"MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)),
369-
"MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1)));
369+
List<String> args = new ArrayList<>();
370+
args.add(getExpr);
371+
args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)));
372+
args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1)));
373+
374+
if (compress)
375+
args.add("true");
376+
377+
returnFalseIfWriteFailed(write, "writer.writeMap", args.toArray(String[]::new));
370378
}
371379

372380
else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.KeyCacheObject")))
@@ -432,6 +440,8 @@ private void returnFalseIfReadFailed(VariableElement field) throws Exception {
432440

433441
String name = F.isEmpty(methodName) ? field.getSimpleName().toString() : methodName;
434442

443+
boolean compress = field.getAnnotation(Compress.class) != null;
444+
435445
if (type.getKind().isPrimitive()) {
436446
String typeName = capitalizeOnlyFirst(type.getKind().name());
437447

@@ -504,9 +514,15 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) {
504514

505515
assert typeArgs.size() == 2;
506516

507-
returnFalseIfReadFailed(name, "reader.readMap",
508-
"MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)),
509-
"MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1)), "false");
517+
List<String> args = new ArrayList<>();
518+
args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)));
519+
args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1)));
520+
args.add("false");
521+
522+
if (compress)
523+
args.add("true");
524+
525+
returnFalseIfReadFailed(name, "reader.readMap", args.toArray(String[]::new));
510526
}
511527

512528
else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.KeyCacheObject")))

modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,10 +381,10 @@ public ByteBuffer getBuffer() {
381381

382382
/** {@inheritDoc} */
383383
@Override public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType,
384-
MessageCollectionItemType valType, boolean linked) {
384+
MessageCollectionItemType valType, boolean linked, boolean compress) {
385385
DirectByteBufferStream stream = state.item().stream;
386386

387-
M map = stream.readMap(keyType, valType, linked, this);
387+
M map = stream.readMap(keyType, valType, linked, this, compress);
388388

389389
lastRead = stream.lastFinished();
390390

modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,10 +347,10 @@ public ByteBuffer getBuffer() {
347347

348348
/** {@inheritDoc} */
349349
@Override public <K, V> boolean writeMap(Map<K, V> map, MessageCollectionItemType keyType,
350-
MessageCollectionItemType valType) {
350+
MessageCollectionItemType valType, boolean compress) {
351351
DirectByteBufferStream stream = state.item().stream;
352352

353-
stream.writeMap(map, keyType, valType, this);
353+
stream.writeMap(map, keyType, valType, this, compress);
354354

355355
return stream.lastFinished();
356356
}

modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java

Lines changed: 161 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.ignite.internal.direct.stream;
1919

20+
import java.io.ByteArrayInputStream;
21+
import java.io.ByteArrayOutputStream;
22+
import java.io.IOException;
2023
import java.lang.reflect.Array;
2124
import java.nio.ByteBuffer;
2225
import java.util.ArrayList;
@@ -27,6 +30,10 @@
2730
import java.util.Map;
2831
import java.util.RandomAccess;
2932
import java.util.UUID;
33+
import java.util.zip.Deflater;
34+
import java.util.zip.DeflaterOutputStream;
35+
import java.util.zip.Inflater;
36+
import java.util.zip.InflaterInputStream;
3037
import org.apache.ignite.IgniteCheckedException;
3138
import org.apache.ignite.IgniteException;
3239
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -337,6 +344,15 @@ public class DirectByteBufferStream {
337344
/** */
338345
private byte cacheObjType;
339346

347+
/** */
348+
private boolean compressMapFinished;
349+
350+
/** */
351+
private boolean uncompressMapFinished;
352+
353+
/** */
354+
private boolean needWriteMapSize = true;
355+
340356
/**
341357
* Constructror for stream used for writing messages.
342358
*
@@ -1010,8 +1026,21 @@ private <T> void writeRandomAccessList(List<T> list, MessageCollectionItemType i
10101026
* @param keyType Key type.
10111027
* @param valType Value type.
10121028
* @param writer Writer.
1029+
* @param compress Whether to compress map.
10131030
*/
1014-
public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, MessageCollectionItemType valType, MessageWriter writer) {
1031+
public <K, V> void writeMap(
1032+
Map<K, V> map,
1033+
MessageCollectionItemType keyType,
1034+
MessageCollectionItemType valType,
1035+
MessageWriter writer,
1036+
boolean compress
1037+
) {
1038+
if (compress && buf.position() != 0) {
1039+
lastFinished = false;
1040+
1041+
return;
1042+
}
1043+
10151044
if (map != null) {
10161045
if (mapIt == null) {
10171046
writeInt(map.size());
@@ -1033,25 +1062,69 @@ public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, Me
10331062
if (!keyDone) {
10341063
write(keyType, e.getKey(), writer);
10351064

1036-
if (!lastFinished)
1065+
if (!lastFinished) {
1066+
if (compress)
1067+
compressData();
1068+
10371069
return;
1070+
}
10381071

10391072
keyDone = true;
10401073
}
10411074

10421075
write(valType, e.getValue(), writer);
10431076

1044-
if (!lastFinished)
1077+
if (!lastFinished) {
1078+
if (compress)
1079+
compressData();
1080+
10451081
return;
1082+
}
10461083

10471084
mapCur = NULL;
10481085
keyDone = false;
10491086
}
10501087

1088+
if (compress && !compressMapFinished) {
1089+
compressData();
1090+
1091+
lastFinished = false;
1092+
compressMapFinished = true;
1093+
1094+
return;
1095+
}
1096+
else {
1097+
lastFinished = true;
1098+
compressMapFinished = false;
1099+
}
1100+
10511101
mapIt = null;
10521102
}
1053-
else
1054-
writeInt(-1);
1103+
else {
1104+
if (!compress) {
1105+
writeInt(-1);
1106+
1107+
return;
1108+
}
1109+
1110+
if (needWriteMapSize) {
1111+
writeInt(-1);
1112+
1113+
needWriteMapSize = false;
1114+
}
1115+
1116+
if (!compressMapFinished) {
1117+
compressData();
1118+
1119+
lastFinished = false;
1120+
compressMapFinished = true;
1121+
}
1122+
else {
1123+
lastFinished = true;
1124+
needWriteMapSize = true;
1125+
compressMapFinished = false;
1126+
}
1127+
}
10551128
}
10561129

10571130
/**
@@ -1647,10 +1720,20 @@ public <C extends Collection<?>> C readCollection(MessageCollectionItemType item
16471720
* @param valType Value type.
16481721
* @param linked Whether linked map should be created.
16491722
* @param reader Reader.
1723+
* @param compress Whether the map is compressed.
16501724
* @return Map.
16511725
*/
16521726
public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType,
1653-
boolean linked, MessageReader reader) {
1727+
boolean linked, MessageReader reader, boolean compress) {
1728+
if (compress && !uncompressMapFinished) {
1729+
uncompressData();
1730+
1731+
if (!lastFinished)
1732+
return null;
1733+
1734+
uncompressMapFinished = true;
1735+
}
1736+
16541737
if (readSize == -1) {
16551738
int size = readInt();
16561739

@@ -1695,6 +1778,7 @@ public <C extends Collection<?>> C readCollection(MessageCollectionItemType item
16951778
M map0 = (M)map;
16961779

16971780
map = null;
1781+
uncompressMapFinished = false;
16981782

16991783
return map0;
17001784
}
@@ -2233,6 +2317,77 @@ private void readUuidRaw() {
22332317
return S.toString(DirectByteBufferStream.class, this);
22342318
}
22352319

2320+
/** */
2321+
private void compressData() {
2322+
if (buf.position() == 0)
2323+
return;
2324+
2325+
byte[] rawData = new byte[buf.position()];
2326+
2327+
buf.flip();
2328+
buf.get(rawData);
2329+
2330+
ByteArrayOutputStream baos = new ByteArrayOutputStream(rawData.length);
2331+
Deflater deflater = new Deflater(Deflater.BEST_SPEED, true);
2332+
2333+
try (DeflaterOutputStream dos = new DeflaterOutputStream(baos, deflater)) {
2334+
dos.write(rawData);
2335+
dos.finish();
2336+
}
2337+
catch (IOException ex) {
2338+
throw new IgniteException(ex);
2339+
}
2340+
finally {
2341+
deflater.end();
2342+
}
2343+
2344+
buf.clear();
2345+
2346+
writeByteArray(baos.toByteArray());
2347+
}
2348+
2349+
/** */
2350+
private void uncompressData() {
2351+
byte[] compressedData = readByteArray();
2352+
2353+
if (!lastFinished || compressedData == null)
2354+
return;
2355+
2356+
byte[] uncompressedData;
2357+
2358+
Inflater inflater = new Inflater(true);
2359+
2360+
try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(compressedData), inflater)) {
2361+
uncompressedData = iis.readAllBytes();
2362+
}
2363+
catch (IOException ex) {
2364+
throw new IgniteException(ex);
2365+
}
2366+
finally {
2367+
inflater.end();
2368+
}
2369+
2370+
byte[] tmpBuf = null;
2371+
2372+
if (buf.remaining() > 0) {
2373+
tmpBuf = new byte[buf.remaining()];
2374+
buf.get(tmpBuf);
2375+
}
2376+
2377+
int tmpBufLength = tmpBuf != null ? tmpBuf.length : 0;
2378+
2379+
if (uncompressedData.length + tmpBufLength > buf.capacity())
2380+
buf = ByteBuffer.allocateDirect(uncompressedData.length + tmpBufLength);
2381+
2382+
buf.clear();
2383+
buf.put(uncompressedData);
2384+
2385+
if (tmpBuf != null)
2386+
buf.put(tmpBuf);
2387+
2388+
buf.flip();
2389+
}
2390+
22362391
/**
22372392
* Array creator.
22382393
*/

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464
import org.apache.ignite.internal.codegen.GridDhtLockResponseSerializer;
6565
import org.apache.ignite.internal.codegen.GridDhtPartitionDemandMessageSerializer;
6666
import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer;
67+
import org.apache.ignite.internal.codegen.GridDhtPartitionFullMapSerializer;
68+
import org.apache.ignite.internal.codegen.GridDhtPartitionMapSerializer;
6769
import org.apache.ignite.internal.codegen.GridDhtPartitionSupplyMessageSerializer;
6870
import org.apache.ignite.internal.codegen.GridDhtPartitionsFullMessageSerializer;
6971
import org.apache.ignite.internal.codegen.GridDhtPartitionsSingleMessageSerializer;
@@ -101,6 +103,7 @@
101103
import org.apache.ignite.internal.codegen.GridNearTxPrepareRequestSerializer;
102104
import org.apache.ignite.internal.codegen.GridNearTxPrepareResponseSerializer;
103105
import org.apache.ignite.internal.codegen.GridNearUnlockRequestSerializer;
106+
import org.apache.ignite.internal.codegen.GridPartitionStateMapSerializer;
104107
import org.apache.ignite.internal.codegen.GridQueryCancelRequestSerializer;
105108
import org.apache.ignite.internal.codegen.GridQueryFailResponseSerializer;
106109
import org.apache.ignite.internal.codegen.GridQueryKillRequestSerializer;
@@ -216,6 +219,8 @@
216219
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
217220
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
218221
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
222+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
223+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
219224
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
220225
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
221226
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -285,6 +290,7 @@
285290
import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResult;
286291
import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResultBatch;
287292
import org.apache.ignite.internal.util.GridByteArrayList;
293+
import org.apache.ignite.internal.util.GridPartitionStateMap;
288294
import org.apache.ignite.internal.util.UUIDCollectionMessage;
289295
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
290296
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
@@ -486,6 +492,9 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
486492
new IgniteDhtPartitionsToReloadMapSerializer());
487493
factory.register(IntLongMap.TYPE_CODE, IntLongMap::new, new IntLongMapSerializer());
488494
factory.register(DeploymentModeMessage.TYPE_CODE, DeploymentModeMessage::new, new DeploymentModeMessageSerializer());
495+
factory.register(GridPartitionStateMap.TYPE_CODE, GridPartitionStateMap::new, new GridPartitionStateMapSerializer());
496+
factory.register(GridDhtPartitionMap.TYPE_CODE, GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer());
497+
factory.register(GridDhtPartitionFullMap.TYPE_CODE, GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer());
489498

490499
// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
491500
// [120..123] - DR

0 commit comments

Comments
 (0)