Skip to content

Commit 36cbb27

Browse files
authored
IGNITE-26956 Use MessageSerializer for GenericValueMessage and QueryBatchMessage (#12500)
1 parent b090ee5 commit 36cbb27

File tree

3 files changed

+77
-169
lines changed

3 files changed

+77
-169
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java

Lines changed: 17 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,20 @@
1717

1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

20-
import java.nio.ByteBuffer;
2120
import org.apache.ignite.IgniteCheckedException;
22-
import org.apache.ignite.internal.GridDirectTransient;
21+
import org.apache.ignite.internal.Order;
2322
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
2423
import org.apache.ignite.internal.util.typedef.internal.U;
25-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
26-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
2724

2825
/**
2926
*
3027
*/
3128
public final class GenericValueMessage implements ValueMessage {
3229
/** */
33-
@GridDirectTransient
3430
private Object val;
3531

3632
/** */
33+
@Order(0)
3734
private byte[] serialized;
3835

3936
/** */
@@ -51,6 +48,20 @@ public GenericValueMessage(Object val) {
5148
return val;
5249
}
5350

51+
/**
52+
* @return Serialized value.
53+
*/
54+
public byte[] serialized() {
55+
return serialized;
56+
}
57+
58+
/**
59+
* @param serialized Serialized value.
60+
*/
61+
public void serialized(byte[] serialized) {
62+
this.serialized = serialized;
63+
}
64+
5465
/** {@inheritDoc} */
5566
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
5667
if (val != null && serialized == null)
@@ -59,50 +70,8 @@ public GenericValueMessage(Object val) {
5970

6071
/** {@inheritDoc} */
6172
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
62-
if (serialized != null && val == null) {
73+
if (serialized != null && val == null)
6374
val = U.unmarshal(ctx, serialized, U.resolveClassLoader(ctx.gridConfig()));
64-
}
65-
}
66-
67-
/** {@inheritDoc} */
68-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
69-
writer.setBuffer(buf);
70-
71-
if (!writer.isHeaderWritten()) {
72-
if (!writer.writeHeader(directType()))
73-
return false;
74-
75-
writer.onHeaderWritten();
76-
}
77-
78-
switch (writer.state()) {
79-
case 0:
80-
if (!writer.writeByteArray(serialized))
81-
return false;
82-
83-
writer.incrementState();
84-
85-
}
86-
87-
return true;
88-
}
89-
90-
/** {@inheritDoc} */
91-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
92-
reader.setBuffer(buf);
93-
94-
switch (reader.state()) {
95-
case 0:
96-
serialized = reader.readByteArray();
97-
98-
if (!reader.isLastRead())
99-
return false;
100-
101-
reader.incrementState();
102-
103-
}
104-
105-
return true;
10675
}
10776

10877
/** {@inheritDoc} */

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import org.apache.ignite.internal.codegen.ColocationGroupSerializer;
2323
import org.apache.ignite.internal.codegen.FragmentDescriptionSerializer;
2424
import org.apache.ignite.internal.codegen.FragmentMappingSerializer;
25+
import org.apache.ignite.internal.codegen.GenericValueMessageSerializer;
2526
import org.apache.ignite.internal.codegen.InboxCloseMessageSerializer;
2627
import org.apache.ignite.internal.codegen.QueryBatchAcknowledgeMessageSerializer;
28+
import org.apache.ignite.internal.codegen.QueryBatchMessageSerializer;
2729
import org.apache.ignite.internal.codegen.QueryCloseMessageSerializer;
2830
import org.apache.ignite.internal.codegen.QueryStartResponseSerializer;
2931
import org.apache.ignite.internal.codegen.QueryTxEntrySerializer;
@@ -46,7 +48,7 @@ public enum MessageType {
4648
QUERY_ERROR_MESSAGE(302, CalciteErrorMessage::new, new CalciteErrorMessageSerializer()),
4749

4850
/** */
49-
QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new),
51+
QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new, new QueryBatchMessageSerializer()),
5052

5153
/** */
5254
QUERY_ACKNOWLEDGE_MESSAGE(304, QueryBatchAcknowledgeMessage::new, new QueryBatchAcknowledgeMessageSerializer()),
@@ -58,7 +60,7 @@ public enum MessageType {
5860
QUERY_CLOSE_MESSAGE(306, QueryCloseMessage::new, new QueryCloseMessageSerializer()),
5961

6062
/** */
61-
GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new),
63+
GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new, new GenericValueMessageSerializer()),
6264

6365
/** */
6466
FRAGMENT_MAPPING(350, FragmentMapping::new, new FragmentMappingSerializer()),

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java

Lines changed: 56 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -17,43 +17,42 @@
1717

1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

20-
import java.nio.ByteBuffer;
2120
import java.util.ArrayList;
2221
import java.util.List;
2322
import java.util.UUID;
2423
import org.apache.ignite.IgniteCheckedException;
25-
import org.apache.ignite.internal.GridDirectCollection;
26-
import org.apache.ignite.internal.GridDirectTransient;
24+
import org.apache.ignite.internal.Order;
2725
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
28-
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
29-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
30-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
3126

3227
/**
3328
*
3429
*/
3530
public class QueryBatchMessage implements MarshalableMessage, ExecutionContextAware {
3631
/** */
32+
@Order(value = 0, method = "queryId")
3733
private UUID qryId;
3834

3935
/** */
36+
@Order(1)
4037
private long fragmentId;
4138

4239
/** */
40+
@Order(2)
4341
private long exchangeId;
4442

4543
/** */
44+
@Order(3)
4645
private int batchId;
4746

4847
/** */
48+
@Order(4)
4949
private boolean last;
5050

5151
/** */
52-
@GridDirectTransient
5352
private List<Object> rows;
5453

5554
/** */
56-
@GridDirectCollection(ValueMessage.class)
55+
@Order(value = 5, method = "messageRows")
5756
private List<ValueMessage> mRows;
5857

5958
/** */
@@ -75,39 +74,88 @@ public QueryBatchMessage(UUID qryId, long fragmentId, long exchangeId, int batch
7574
return qryId;
7675
}
7776

77+
/**
78+
* @param qryId Query ID.
79+
*/
80+
public void queryId(UUID qryId) {
81+
this.qryId = qryId;
82+
}
83+
7884
/** {@inheritDoc} */
7985
@Override public long fragmentId() {
8086
return fragmentId;
8187
}
8288

89+
/**
90+
* @param fragmentId Fragment ID.
91+
*/
92+
public void fragmentId(long fragmentId) {
93+
this.fragmentId = fragmentId;
94+
}
95+
8396
/**
8497
* @return Exchange ID.
8598
*/
8699
public long exchangeId() {
87100
return exchangeId;
88101
}
89102

103+
/**
104+
* @param exchangeId Exchange ID.
105+
*/
106+
public void exchangeId(long exchangeId) {
107+
this.exchangeId = exchangeId;
108+
}
109+
90110
/**
91111
* @return Batch ID.
92112
*/
93113
public int batchId() {
94114
return batchId;
95115
}
96116

117+
/**
118+
* @param batchId Batch ID.
119+
*/
120+
public void batchId(int batchId) {
121+
this.batchId = batchId;
122+
}
123+
97124
/**
98125
* @return Last batch flag.
99126
*/
100127
public boolean last() {
101128
return last;
102129
}
103130

131+
/**
132+
* @param last Last batch flag.
133+
*/
134+
public void last(boolean last) {
135+
this.last = last;
136+
}
137+
104138
/**
105139
* @return Rows.
106140
*/
107141
public List<Object> rows() {
108142
return rows;
109143
}
110144

145+
/**
146+
* @return Message rows.
147+
*/
148+
public List<ValueMessage> messageRows() {
149+
return mRows;
150+
}
151+
152+
/**
153+
* @param mRows Message rows.
154+
*/
155+
public void messageRows(List<ValueMessage> mRows) {
156+
this.mRows = mRows;
157+
}
158+
111159
/** {@inheritDoc} */
112160
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
113161
if (mRows != null || rows == null)
@@ -142,117 +190,6 @@ public List<Object> rows() {
142190
}
143191
}
144192

145-
/** {@inheritDoc} */
146-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
147-
writer.setBuffer(buf);
148-
149-
if (!writer.isHeaderWritten()) {
150-
if (!writer.writeHeader(directType()))
151-
return false;
152-
153-
writer.onHeaderWritten();
154-
}
155-
156-
switch (writer.state()) {
157-
case 0:
158-
if (!writer.writeInt(batchId))
159-
return false;
160-
161-
writer.incrementState();
162-
163-
case 1:
164-
if (!writer.writeLong(exchangeId))
165-
return false;
166-
167-
writer.incrementState();
168-
169-
case 2:
170-
if (!writer.writeLong(fragmentId))
171-
return false;
172-
173-
writer.incrementState();
174-
175-
case 3:
176-
if (!writer.writeBoolean(last))
177-
return false;
178-
179-
writer.incrementState();
180-
181-
case 4:
182-
if (!writer.writeCollection(mRows, MessageCollectionItemType.MSG))
183-
return false;
184-
185-
writer.incrementState();
186-
187-
case 5:
188-
if (!writer.writeUuid(qryId))
189-
return false;
190-
191-
writer.incrementState();
192-
193-
}
194-
195-
return true;
196-
}
197-
198-
/** {@inheritDoc} */
199-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
200-
reader.setBuffer(buf);
201-
202-
switch (reader.state()) {
203-
case 0:
204-
batchId = reader.readInt();
205-
206-
if (!reader.isLastRead())
207-
return false;
208-
209-
reader.incrementState();
210-
211-
case 1:
212-
exchangeId = reader.readLong();
213-
214-
if (!reader.isLastRead())
215-
return false;
216-
217-
reader.incrementState();
218-
219-
case 2:
220-
fragmentId = reader.readLong();
221-
222-
if (!reader.isLastRead())
223-
return false;
224-
225-
reader.incrementState();
226-
227-
case 3:
228-
last = reader.readBoolean();
229-
230-
if (!reader.isLastRead())
231-
return false;
232-
233-
reader.incrementState();
234-
235-
case 4:
236-
mRows = reader.readCollection(MessageCollectionItemType.MSG);
237-
238-
if (!reader.isLastRead())
239-
return false;
240-
241-
reader.incrementState();
242-
243-
case 5:
244-
qryId = reader.readUuid();
245-
246-
if (!reader.isLastRead())
247-
return false;
248-
249-
reader.incrementState();
250-
251-
}
252-
253-
return true;
254-
}
255-
256193
/** {@inheritDoc} */
257194
@Override public MessageType type() {
258195
return MessageType.QUERY_BATCH_MESSAGE;

0 commit comments

Comments
 (0)