Skip to content

Commit 06301d5

Browse files
authored
IGNITE-26813 Refactor QueryStartResponse, FragmentMapping, ColocationGroup (#12454)
1 parent 7bc011c commit 06301d5

File tree

7 files changed

+151
-334
lines changed

7 files changed

+151
-334
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -854,7 +854,7 @@ private void executeFragment(Query<Row> qry, FragmentPlan plan, ExecutionContext
854854

855855
if (!qry.isExchangeWithInitNodeStarted(ectx.fragmentId())) {
856856
try {
857-
messageService().send(origNodeId, new QueryStartResponse(qry.id(), ectx.fragmentId()));
857+
messageService().send(origNodeId, new QueryStartResponse(qry.id(), ectx.fragmentId(), null));
858858
}
859859
catch (IgniteCheckedException e) {
860860
IgniteException wrpEx = new IgniteException("Failed to send reply. [nodeId=" + origNodeId + ']', e);

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

2020
import java.util.function.Supplier;
21+
import org.apache.ignite.internal.codegen.ColocationGroupSerializer;
2122
import org.apache.ignite.internal.codegen.FragmentDescriptionSerializer;
23+
import org.apache.ignite.internal.codegen.FragmentMappingSerializer;
2224
import org.apache.ignite.internal.codegen.InboxCloseMessageSerializer;
2325
import org.apache.ignite.internal.codegen.QueryBatchAcknowledgeMessageSerializer;
2426
import org.apache.ignite.internal.codegen.QueryCloseMessageSerializer;
27+
import org.apache.ignite.internal.codegen.QueryStartResponseSerializer;
2528
import org.apache.ignite.internal.codegen.QueryTxEntrySerializer;
2629
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
2730
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
@@ -36,7 +39,7 @@ public enum MessageType {
3639
QUERY_START_REQUEST(300, QueryStartRequest::new),
3740

3841
/** */
39-
QUERY_START_RESPONSE(301, QueryStartResponse::new),
42+
QUERY_START_RESPONSE(301, QueryStartResponse::new, new QueryStartResponseSerializer()),
4043

4144
/** */
4245
QUERY_ERROR_MESSAGE(302, ErrorMessage::new),
@@ -57,10 +60,10 @@ public enum MessageType {
5760
GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new),
5861

5962
/** */
60-
FRAGMENT_MAPPING(350, FragmentMapping::new),
63+
FRAGMENT_MAPPING(350, FragmentMapping::new, new FragmentMappingSerializer()),
6164

6265
/** */
63-
COLOCATION_GROUP(351, ColocationGroup::new),
66+
COLOCATION_GROUP(351, ColocationGroup::new, new ColocationGroupSerializer()),
6467

6568
/** */
6669
FRAGMENT_DESCRIPTION(352, FragmentDescription::new, new FragmentDescriptionSerializer()),

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java

Lines changed: 33 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -17,52 +17,49 @@
1717

1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

20-
import java.nio.ByteBuffer;
2120
import java.util.UUID;
22-
import org.apache.ignite.IgniteCheckedException;
23-
import org.apache.ignite.internal.GridDirectTransient;
24-
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
25-
import org.apache.ignite.internal.util.typedef.internal.U;
26-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
27-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
21+
import org.apache.ignite.internal.Order;
22+
import org.apache.ignite.internal.managers.communication.ErrorMessage;
23+
import org.jetbrains.annotations.Nullable;
2824

2925
/**
3026
*
3127
*/
32-
public class QueryStartResponse implements MarshalableMessage {
28+
public class QueryStartResponse implements CalciteMessage {
3329
/** */
34-
private UUID queryId;
30+
@Order(value = 0, method = "queryId")
31+
private UUID qryId;
3532

3633
/** */
34+
@Order(1)
3735
private long fragmentId;
3836

3937
/** */
40-
@GridDirectTransient
41-
private Throwable error;
42-
43-
/** */
44-
private byte[] errBytes;
38+
@Order(value = 2, method = "errorMessage")
39+
private @Nullable ErrorMessage errMsg;
4540

4641
/** */
4742
public QueryStartResponse() {}
4843

4944
/** */
50-
public QueryStartResponse(UUID queryId, long fragmentId) {
51-
this(queryId, fragmentId, null);
52-
}
53-
54-
/** */
55-
public QueryStartResponse(UUID queryId, long fragmentId, Throwable error) {
56-
this.queryId = queryId;
45+
public QueryStartResponse(UUID qryId, long fragmentId, @Nullable Throwable error) {
46+
this.qryId = qryId;
5747
this.fragmentId = fragmentId;
58-
this.error = error;
48+
49+
if (error != null)
50+
errMsg = new ErrorMessage(error);
5951
}
6052

6153
/**
6254
* @return Query ID.
6355
*/
6456
public UUID queryId() {
65-
return queryId;
57+
return qryId;
58+
}
59+
60+
/** */
61+
public void queryId(UUID qryId) {
62+
this.qryId = qryId;
6663
}
6764

6865
/**
@@ -72,92 +69,26 @@ public long fragmentId() {
7269
return fragmentId;
7370
}
7471

72+
/** */
73+
public void fragmentId(long fragmentId) {
74+
this.fragmentId = fragmentId;
75+
}
76+
7577
/**
7678
* @return Error.
7779
*/
78-
public Throwable error() {
79-
return error;
80-
}
81-
82-
/** {@inheritDoc} */
83-
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
84-
if (error != null)
85-
errBytes = U.marshal(ctx, error);
80+
public @Nullable Throwable error() {
81+
return ErrorMessage.error(errMsg);
8682
}
8783

88-
/** {@inheritDoc} */
89-
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
90-
if (errBytes != null)
91-
error = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ctx.gridConfig()));
92-
}
93-
94-
/** {@inheritDoc} */
95-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
96-
writer.setBuffer(buf);
97-
98-
if (!writer.isHeaderWritten()) {
99-
if (!writer.writeHeader(directType()))
100-
return false;
101-
102-
writer.onHeaderWritten();
103-
}
104-
105-
switch (writer.state()) {
106-
case 0:
107-
if (!writer.writeByteArray(errBytes))
108-
return false;
109-
110-
writer.incrementState();
111-
112-
case 1:
113-
if (!writer.writeLong(fragmentId))
114-
return false;
115-
116-
writer.incrementState();
117-
118-
case 2:
119-
if (!writer.writeUuid(queryId))
120-
return false;
121-
122-
writer.incrementState();
123-
124-
}
125-
126-
return true;
84+
/** */
85+
public @Nullable ErrorMessage errorMessage() {
86+
return errMsg;
12787
}
12888

129-
/** {@inheritDoc} */
130-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
131-
reader.setBuffer(buf);
132-
133-
switch (reader.state()) {
134-
case 0:
135-
errBytes = reader.readByteArray();
136-
137-
if (!reader.isLastRead())
138-
return false;
139-
140-
reader.incrementState();
141-
142-
case 1:
143-
fragmentId = reader.readLong();
144-
145-
if (!reader.isLastRead())
146-
return false;
147-
148-
reader.incrementState();
149-
150-
case 2:
151-
queryId = reader.readUuid();
152-
153-
if (!reader.isLastRead())
154-
return false;
155-
156-
reader.incrementState();
157-
158-
}
159-
160-
return true;
89+
/** */
90+
public void errorMessage(@Nullable ErrorMessage errMsg) {
91+
this.errMsg = errMsg;
16192
}
16293

16394
/** {@inheritDoc} */

0 commit comments

Comments
 (0)