Skip to content

Commit 016f4a3

Browse files
committed
Merge branch 'master' of github.com:apache/ignite into IGNITE-26111__discovery_serialization
2 parents bf87669 + a431294 commit 016f4a3

File tree

78 files changed

+1873
-768
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+1873
-768
lines changed

modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,6 @@ private static AffinityFunctionContext context(
328328
private static ClusterNode node(int idx) {
329329
TcpDiscoveryNode node = new TcpDiscoveryNode(
330330
UUID.randomUUID(),
331-
null,
332331
Collections.singletonList("127.0.0.1"),
333332
Collections.singletonList("127.0.0.1"),
334333
0,

modules/binary/api/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -323,12 +323,7 @@ private <T> T uncachedValue(Class<?> cls) throws BinaryObjectException {
323323

324324
/** {@inheritDoc} */
325325
@Override public byte[] valueBytes(CacheObjectValueContext cacheCtx) throws IgniteCheckedException {
326-
if (valBytes != null)
327-
return valBytes;
328-
329-
valBytes = Marshallers.marshal(ctx.marshaller(), this);
330-
331-
return valBytes;
326+
return valueBytes();
332327
}
333328

334329
/** {@inheritDoc} */
@@ -382,15 +377,21 @@ private <T> T uncachedValue(Class<?> cls) throws BinaryObjectException {
382377

383378
/** {@inheritDoc} */
384379
@Override public int size() {
385-
if (valBytes == null) {
386-
try {
387-
valBytes = Marshallers.marshal(ctx.marshaller(), this);
388-
}
389-
catch (IgniteCheckedException e) {
390-
throw CommonUtils.convertException(e);
391-
}
380+
try {
381+
return valueBytes().length;
382+
}
383+
catch (IgniteCheckedException e) {
384+
throw CommonUtils.convertException(e);
392385
}
386+
}
387+
388+
/** */
389+
private byte[] valueBytes() throws IgniteCheckedException {
390+
if (valBytes != null)
391+
return valBytes;
392+
393+
valBytes = Marshallers.marshal(ctx.marshaller(), this);
393394

394-
return BinaryPrimitives.readInt(valBytes, ord + GridBinaryMarshaller.TOTAL_LEN_POS);
395+
return valBytes;
395396
}
396397
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -854,7 +854,7 @@ private void executeFragment(Query<Row> qry, FragmentPlan plan, ExecutionContext
854854

855855
if (!qry.isExchangeWithInitNodeStarted(ectx.fragmentId())) {
856856
try {
857-
messageService().send(origNodeId, new QueryStartResponse(qry.id(), ectx.fragmentId()));
857+
messageService().send(origNodeId, new QueryStartResponse(qry.id(), ectx.fragmentId(), null));
858858
}
859859
catch (IgniteCheckedException e) {
860860
IgniteException wrpEx = new IgniteException("Failed to send reply. [nodeId=" + origNodeId + ']', e);

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

2020
import java.util.function.Supplier;
21+
import org.apache.ignite.internal.codegen.ColocationGroupSerializer;
2122
import org.apache.ignite.internal.codegen.FragmentDescriptionSerializer;
23+
import org.apache.ignite.internal.codegen.FragmentMappingSerializer;
2224
import org.apache.ignite.internal.codegen.InboxCloseMessageSerializer;
2325
import org.apache.ignite.internal.codegen.QueryBatchAcknowledgeMessageSerializer;
2426
import org.apache.ignite.internal.codegen.QueryCloseMessageSerializer;
27+
import org.apache.ignite.internal.codegen.QueryStartResponseSerializer;
2528
import org.apache.ignite.internal.codegen.QueryTxEntrySerializer;
2629
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
2730
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
@@ -36,7 +39,7 @@ public enum MessageType {
3639
QUERY_START_REQUEST(300, QueryStartRequest::new),
3740

3841
/** */
39-
QUERY_START_RESPONSE(301, QueryStartResponse::new),
42+
QUERY_START_RESPONSE(301, QueryStartResponse::new, new QueryStartResponseSerializer()),
4043

4144
/** */
4245
QUERY_ERROR_MESSAGE(302, ErrorMessage::new),
@@ -57,10 +60,10 @@ public enum MessageType {
5760
GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new),
5861

5962
/** */
60-
FRAGMENT_MAPPING(350, FragmentMapping::new),
63+
FRAGMENT_MAPPING(350, FragmentMapping::new, new FragmentMappingSerializer()),
6164

6265
/** */
63-
COLOCATION_GROUP(351, ColocationGroup::new),
66+
COLOCATION_GROUP(351, ColocationGroup::new, new ColocationGroupSerializer()),
6467

6568
/** */
6669
FRAGMENT_DESCRIPTION(352, FragmentDescription::new, new FragmentDescriptionSerializer()),

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java

Lines changed: 33 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -17,52 +17,49 @@
1717

1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

20-
import java.nio.ByteBuffer;
2120
import java.util.UUID;
22-
import org.apache.ignite.IgniteCheckedException;
23-
import org.apache.ignite.internal.GridDirectTransient;
24-
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
25-
import org.apache.ignite.internal.util.typedef.internal.U;
26-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
27-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
21+
import org.apache.ignite.internal.Order;
22+
import org.apache.ignite.internal.managers.communication.ErrorMessage;
23+
import org.jetbrains.annotations.Nullable;
2824

2925
/**
3026
*
3127
*/
32-
public class QueryStartResponse implements MarshalableMessage {
28+
public class QueryStartResponse implements CalciteMessage {
3329
/** */
34-
private UUID queryId;
30+
@Order(value = 0, method = "queryId")
31+
private UUID qryId;
3532

3633
/** */
34+
@Order(1)
3735
private long fragmentId;
3836

3937
/** */
40-
@GridDirectTransient
41-
private Throwable error;
42-
43-
/** */
44-
private byte[] errBytes;
38+
@Order(value = 2, method = "errorMessage")
39+
private @Nullable ErrorMessage errMsg;
4540

4641
/** */
4742
public QueryStartResponse() {}
4843

4944
/** */
50-
public QueryStartResponse(UUID queryId, long fragmentId) {
51-
this(queryId, fragmentId, null);
52-
}
53-
54-
/** */
55-
public QueryStartResponse(UUID queryId, long fragmentId, Throwable error) {
56-
this.queryId = queryId;
45+
public QueryStartResponse(UUID qryId, long fragmentId, @Nullable Throwable error) {
46+
this.qryId = qryId;
5747
this.fragmentId = fragmentId;
58-
this.error = error;
48+
49+
if (error != null)
50+
errMsg = new ErrorMessage(error);
5951
}
6052

6153
/**
6254
* @return Query ID.
6355
*/
6456
public UUID queryId() {
65-
return queryId;
57+
return qryId;
58+
}
59+
60+
/** */
61+
public void queryId(UUID qryId) {
62+
this.qryId = qryId;
6663
}
6764

6865
/**
@@ -72,92 +69,26 @@ public long fragmentId() {
7269
return fragmentId;
7370
}
7471

72+
/** */
73+
public void fragmentId(long fragmentId) {
74+
this.fragmentId = fragmentId;
75+
}
76+
7577
/**
7678
* @return Error.
7779
*/
78-
public Throwable error() {
79-
return error;
80-
}
81-
82-
/** {@inheritDoc} */
83-
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
84-
if (error != null)
85-
errBytes = U.marshal(ctx, error);
80+
public @Nullable Throwable error() {
81+
return ErrorMessage.error(errMsg);
8682
}
8783

88-
/** {@inheritDoc} */
89-
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
90-
if (errBytes != null)
91-
error = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ctx.gridConfig()));
92-
}
93-
94-
/** {@inheritDoc} */
95-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
96-
writer.setBuffer(buf);
97-
98-
if (!writer.isHeaderWritten()) {
99-
if (!writer.writeHeader(directType()))
100-
return false;
101-
102-
writer.onHeaderWritten();
103-
}
104-
105-
switch (writer.state()) {
106-
case 0:
107-
if (!writer.writeByteArray(errBytes))
108-
return false;
109-
110-
writer.incrementState();
111-
112-
case 1:
113-
if (!writer.writeLong(fragmentId))
114-
return false;
115-
116-
writer.incrementState();
117-
118-
case 2:
119-
if (!writer.writeUuid(queryId))
120-
return false;
121-
122-
writer.incrementState();
123-
124-
}
125-
126-
return true;
84+
/** */
85+
public @Nullable ErrorMessage errorMessage() {
86+
return errMsg;
12787
}
12888

129-
/** {@inheritDoc} */
130-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
131-
reader.setBuffer(buf);
132-
133-
switch (reader.state()) {
134-
case 0:
135-
errBytes = reader.readByteArray();
136-
137-
if (!reader.isLastRead())
138-
return false;
139-
140-
reader.incrementState();
141-
142-
case 1:
143-
fragmentId = reader.readLong();
144-
145-
if (!reader.isLastRead())
146-
return false;
147-
148-
reader.incrementState();
149-
150-
case 2:
151-
queryId = reader.readUuid();
152-
153-
if (!reader.isLastRead())
154-
return false;
155-
156-
reader.incrementState();
157-
158-
}
159-
160-
return true;
89+
/** */
90+
public void errorMessage(@Nullable ErrorMessage errMsg) {
91+
this.errMsg = errMsg;
16192
}
16293

16394
/** {@inheritDoc} */

0 commit comments

Comments
 (0)