Skip to content

Commit 66b9f8d

Browse files
committed
IGNITE-27112 Use BinaryMarshaller for serializing User objects during Message communication
1 parent f016170 commit 66b9f8d

File tree

22 files changed

+181
-63
lines changed

22 files changed

+181
-63
lines changed

modules/binary/api/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,7 @@ public byte mapType(Class<? extends Map> cls) {
930930
* @param typeName Type name.
931931
* @return Type ID.
932932
*/
933+
// TODO?
933934
public int typeId(String typeName) {
934935
Integer id = predefinedTypeNames.get(SIMPLE_NAME_LOWER_CASE_MAPPER.typeName(typeName));
935936

modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,12 @@ private void returnFalseIfWriteFailed(VariableElement field) throws Exception {
311311

312312
String getExpr = (F.isEmpty(methodName) ? field.getSimpleName().toString() : methodName) + "()";
313313

314+
if (field.getAnnotation(Order.class).user()) {
315+
returnFalseIfWriteFailed(write, "writer.writeUserObject", getExpr);
316+
317+
return;
318+
}
319+
314320
TypeMirror type = field.asType();
315321

316322
if (type.getKind().isPrimitive()) {
@@ -432,6 +438,12 @@ private void returnFalseIfReadFailed(VariableElement field) throws Exception {
432438

433439
String name = F.isEmpty(methodName) ? field.getSimpleName().toString() : methodName;
434440

441+
if (field.getAnnotation(Order.class).user()) {
442+
returnFalseIfReadFailed(name, "reader.readUserObject");
443+
444+
return;
445+
}
446+
435447
if (type.getKind().isPrimitive()) {
436448
String typeName = capitalizeOnlyFirst(type.getKind().name());
437449

modules/codegen2/src/main/java/org/apache/ignite/internal/Order.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,7 @@
4242

4343
/** @return Getter and setter name. */
4444
String method() default "";
45+
46+
/** */
47+
boolean user() default false;
4548
}

modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java

Lines changed: 30 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,8 @@ public class GridJobExecuteResponse implements Message {
6060
@Order(value = 3, method = "exceptionMessage")
6161
private @Nullable ErrorMessage gridExMsg;
6262

63-
/** Job result serialization call holder. */
64-
@Order(value = 4, method = "jobResultBytes")
65-
private @Nullable byte[] resBytes;
66-
6763
/** */
64+
@Order(value = 4, method = "jobResult", user = true)
6865
private @Nullable Object res;
6966

7067
/** */
@@ -152,22 +149,17 @@ public void jobId(IgniteUuid jobId) {
152149
}
153150

154151
/**
155-
* @return Serialized job result.
152+
* @return Job result.
156153
*/
157-
@Nullable public byte[] jobResultBytes() {
158-
return resBytes;
159-
}
160-
161-
/** */
162-
public void jobResultBytes(@Nullable byte[] resBytes) {
163-
this.resBytes = resBytes;
154+
@Nullable public Object jobResult() {
155+
return res;
164156
}
165157

166158
/**
167-
* @return Job result.
159+
* @param res Job result.
168160
*/
169-
@Nullable public Object getJobResult() {
170-
return res;
161+
public void jobResult(Object res) {
162+
this.res = res;
171163
}
172164

173165
/**
@@ -274,25 +266,25 @@ public void retryTopologyVersion(@Nullable AffinityTopologyVersion retry) {
274266

275267
/**
276268
* Serializes user data to byte[] with provided marshaller.
277-
* Erases non-marshalled data like {@link #getJobAttributes()} or {@link #getJobResult()}.
269+
* Erases non-marshalled data like {@link #getJobAttributes()}.
278270
*/
279271
public void marshallUserData(Marshaller marsh, @Nullable IgniteLogger log) {
280-
if (res != null) {
281-
try {
282-
resBytes = U.marshal(marsh, res);
283-
}
284-
catch (IgniteCheckedException e) {
285-
resBytes = null;
286-
287-
String msg = "Failed to serialize job response [nodeId=" + nodeId +
288-
", ses=" + sesId + ", jobId=" + jobId +
289-
", resCls=" + (res == null ? null : res.getClass()) + ']';
290-
291-
wrapSerializationError(e, msg, log);
292-
}
293-
294-
res = null;
295-
}
272+
// if (res != null) {
273+
// try {
274+
// resBytes = U.marshal(marsh, res);
275+
// }
276+
// catch (IgniteCheckedException e) {
277+
// resBytes = null;
278+
//
279+
// String msg = "Failed to serialize job response [nodeId=" + nodeId +
280+
// ", ses=" + sesId + ", jobId=" + jobId +
281+
// ", resCls=" + (res == null ? null : res.getClass()) + ']';
282+
//
283+
// wrapSerializationError(e, msg, log);
284+
// }
285+
//
286+
// res = null;
287+
// }
296288

297289
if (!F.isEmpty(jobAttrs)) {
298290
try {
@@ -314,7 +306,7 @@ public void marshallUserData(Marshaller marsh, @Nullable IgniteLogger log) {
314306

315307
/**
316308
* Deserializes user data from byte[] with provided marshaller and class loader.
317-
* Erases marshalled data like {@link #jobAttrubutesBytes()} or {@link #jobResultBytes()}.
309+
* Erases marshalled data like {@link #jobAttrubutesBytes()}.
318310
*/
319311
public void unmarshallUserData(Marshaller marshaller, ClassLoader clsLdr) throws IgniteCheckedException {
320312
if (jobAttrsBytes != null) {
@@ -323,11 +315,11 @@ public void unmarshallUserData(Marshaller marshaller, ClassLoader clsLdr) throws
323315
jobAttrsBytes = null;
324316
}
325317

326-
if (resBytes != null) {
327-
res = U.unmarshal(marshaller, resBytes, clsLdr);
328-
329-
resBytes = null;
330-
}
318+
// if (resBytes != null) {
319+
// res = U.unmarshal(marshaller, resBytes, clsLdr);
320+
//
321+
// resBytes = null;
322+
// }
331323
}
332324

333325
/** */

modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Collection;
2323
import java.util.Map;
2424
import java.util.UUID;
25+
import org.apache.ignite.internal.binary.BinaryMarshaller;
2526
import org.apache.ignite.internal.direct.state.DirectMessageState;
2627
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
2728
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
@@ -58,10 +59,10 @@ public class DirectMessageReader implements MessageReader {
5859
* @param msgFactory Message factory.
5960
* @param cacheObjProc Cache object processor.
6061
*/
61-
public DirectMessageReader(final MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) {
62+
public DirectMessageReader(final MessageFactory msgFactory, BinaryMarshaller marsh, IgniteCacheObjectProcessor cacheObjProc) {
6263
state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
6364
@Override public StateItem apply() {
64-
return new StateItem(msgFactory, cacheObjProc);
65+
return new StateItem(msgFactory, marsh, cacheObjProc);
6566
}
6667
});
6768
}
@@ -357,6 +358,17 @@ public ByteBuffer getBuffer() {
357358
return ll;
358359
}
359360

361+
/** {@inheritDoc} */
362+
@Override public <T> T readUserObject() {
363+
DirectByteBufferStream stream = state.item().stream;
364+
365+
T userObj = stream.readUserObject(this);
366+
367+
lastRead = stream.lastFinished();
368+
369+
return userObj;
370+
}
371+
360372
/** {@inheritDoc} */
361373
@Override public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls) {
362374
DirectByteBufferStream stream = state.item().stream;
@@ -441,8 +453,8 @@ private static class StateItem implements DirectMessageStateItem {
441453
* @param msgFactory Message factory.
442454
* @param cacheObjProc Cache object processor.
443455
*/
444-
public StateItem(MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) {
445-
stream = new DirectByteBufferStream(msgFactory, cacheObjProc);
456+
public StateItem(MessageFactory msgFactory, BinaryMarshaller marsh, IgniteCacheObjectProcessor cacheObjProc) {
457+
stream = new DirectByteBufferStream(msgFactory, marsh, cacheObjProc);
446458
}
447459

448460
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Collection;
2323
import java.util.Map;
2424
import java.util.UUID;
25+
import org.apache.ignite.internal.binary.BinaryMarshaller;
2526
import org.apache.ignite.internal.direct.state.DirectMessageState;
2627
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
2728
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
@@ -51,10 +52,10 @@ public class DirectMessageWriter implements MessageWriter {
5152
private ByteBuffer buf;
5253

5354
/** */
54-
public DirectMessageWriter(final MessageFactory msgFactory) {
55+
public DirectMessageWriter(final MessageFactory msgFactory, final BinaryMarshaller marsh) {
5556
state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
5657
@Override public StateItem apply() {
57-
return new StateItem(msgFactory);
58+
return new StateItem(msgFactory, marsh);
5859
}
5960
});
6061
}
@@ -355,6 +356,15 @@ public ByteBuffer getBuffer() {
355356
return stream.lastFinished();
356357
}
357358

359+
/** {@inheritDoc} */
360+
@Override public <T> boolean writeUserObject(T obj) {
361+
DirectByteBufferStream stream = state.item().stream;
362+
363+
stream.writeUserObject(obj);
364+
365+
return stream.lastFinished();
366+
}
367+
358368
/** {@inheritDoc} */
359369
@Override public boolean isHeaderWritten() {
360370
return state.item().hdrWritten;
@@ -410,8 +420,8 @@ private static class StateItem implements DirectMessageStateItem {
410420
private boolean hdrWritten;
411421

412422
/** */
413-
public StateItem(MessageFactory msgFactory) {
414-
stream = new DirectByteBufferStream(msgFactory);
423+
public StateItem(MessageFactory msgFactory, BinaryMarshaller marsh) {
424+
stream = new DirectByteBufferStream(msgFactory, marsh);
415425
}
416426

417427
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.UUID;
3030
import org.apache.ignite.IgniteCheckedException;
3131
import org.apache.ignite.IgniteException;
32+
import org.apache.ignite.internal.binary.BinaryMarshaller;
3233
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
3334
import org.apache.ignite.internal.processors.cache.CacheObject;
3435
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -222,6 +223,9 @@ public class DirectByteBufferStream {
222223
@GridToStringExclude
223224
private final IgniteCacheObjectProcessor cacheObjProc;
224225

226+
/** Binary marshaller for marshalling user objects. */
227+
private final BinaryMarshaller marsh;
228+
225229
/** */
226230
@GridToStringExclude
227231
protected ByteBuffer buf;
@@ -342,8 +346,9 @@ public class DirectByteBufferStream {
342346
*
343347
* @param msgFactory Message factory.
344348
*/
345-
public DirectByteBufferStream(MessageFactory msgFactory) {
349+
public DirectByteBufferStream(MessageFactory msgFactory, BinaryMarshaller marsh) {
346350
this.msgFactory = msgFactory;
351+
this.marsh = marsh;
347352

348353
// Is not used while writing messages.
349354
cacheObjProc = null;
@@ -355,8 +360,9 @@ public DirectByteBufferStream(MessageFactory msgFactory) {
355360
* @param msgFactory Message factory.
356361
* @param cacheObjProc Cache object processor.
357362
*/
358-
public DirectByteBufferStream(MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) {
363+
public DirectByteBufferStream(MessageFactory msgFactory, BinaryMarshaller marsh, IgniteCacheObjectProcessor cacheObjProc) {
359364
this.msgFactory = msgFactory;
365+
this.marsh = marsh;
360366
this.cacheObjProc = cacheObjProc;
361367
}
362368

@@ -897,6 +903,16 @@ public void writeMessage(Message msg, MessageWriter writer) {
897903
writeShort(Short.MIN_VALUE);
898904
}
899905

906+
/** */
907+
public <T> void writeUserObject(T obj) {
908+
try {
909+
writeByteArray(marsh.marshal(obj));
910+
}
911+
catch (IgniteCheckedException e) {
912+
throw new IgniteException(e);
913+
}
914+
}
915+
900916
/**
901917
* @param arr Array.
902918
* @param itemType Component type.
@@ -1557,6 +1573,18 @@ public <T extends Message> T readMessage(MessageReader reader) {
15571573
return null;
15581574
}
15591575

1576+
/** */
1577+
public <T> T readUserObject(MessageReader reader) {
1578+
try {
1579+
byte[] arr = readByteArray();
1580+
1581+
return marsh.unmarshal(arr, null);
1582+
}
1583+
catch (IgniteCheckedException e) {
1584+
throw new IgniteException(e);
1585+
}
1586+
}
1587+
15601588
/**
15611589
* @param itemType Item type.
15621590
* @param itemCls Item class.

modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.ignite.internal.GridComponent;
3636
import org.apache.ignite.internal.GridKernalContext;
3737
import org.apache.ignite.internal.IgniteInternalFuture;
38+
import org.apache.ignite.internal.binary.BinaryMarshaller;
3839
import org.apache.ignite.internal.managers.communication.GridMessageListener;
3940
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
4041
import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
@@ -562,6 +563,10 @@ protected final String stopInfo() {
562563
return ctx.io().formatter();
563564
}
564565

566+
@Override public BinaryMarshaller binaryMarshaller() {
567+
return ctx.marshaller();
568+
}
569+
565570
@Override public MessageFactory messageFactory() {
566571
return ctx.io().messageFactory();
567572
}

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,11 +433,11 @@ public void resetMetrics() {
433433
else {
434434
formatter = new MessageFormatter() {
435435
@Override public MessageWriter writer(MessageFactory msgFactory) {
436-
return new DirectMessageWriter(msgFactory);
436+
return new DirectMessageWriter(msgFactory, ctx.marshaller());
437437
}
438438

439439
@Override public MessageReader reader(MessageFactory msgFactory) {
440-
return new DirectMessageReader(msgFactory, ctx.cacheObjects());
440+
return new DirectMessageReader(msgFactory, ctx.marshaller(), ctx.cacheObjects());
441441
}
442442
};
443443
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneSpiContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.function.Consumer;
2525
import org.apache.ignite.cluster.ClusterNode;
2626
import org.apache.ignite.events.Event;
27+
import org.apache.ignite.internal.binary.BinaryMarshaller;
2728
import org.apache.ignite.internal.managers.communication.GridMessageListener;
2829
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
2930
import org.apache.ignite.lang.IgniteBiPredicate;
@@ -182,6 +183,11 @@ public class StandaloneSpiContext implements IgniteSpiContext {
182183
return null;
183184
}
184185

186+
/** {@inheritDoc} */
187+
@Override public BinaryMarshaller binaryMarshaller() {
188+
return null;
189+
}
190+
185191
/** {@inheritDoc} */
186192
@Override public MessageFactory messageFactory() {
187193
return null;

0 commit comments

Comments
 (0)