Skip to content

Commit 562932c

Browse files
authored
IGNITE-26829 Use MessageSerializer for GridTaskResultResponse (#12466)
1 parent e79b395 commit 562932c

File tree

3 files changed

+35
-99
lines changed

3 files changed

+35
-99
lines changed

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import org.apache.ignite.internal.codegen.GridQueryNextPageResponseSerializer;
100100
import org.apache.ignite.internal.codegen.GridTaskCancelRequestSerializer;
101101
import org.apache.ignite.internal.codegen.GridTaskResultRequestSerializer;
102+
import org.apache.ignite.internal.codegen.GridTaskResultResponseSerializer;
102103
import org.apache.ignite.internal.codegen.HandshakeMessageSerializer;
103104
import org.apache.ignite.internal.codegen.HandshakeWaitMessageSerializer;
104105
import org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer;
@@ -355,7 +356,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
355356
factory.register((short)62, DataStreamerRequest::new);
356357
factory.register((short)63, DataStreamerResponse::new);
357358
factory.register((short)76, GridTaskResultRequest::new, new GridTaskResultRequestSerializer());
358-
factory.register((short)77, GridTaskResultResponse::new);
359+
factory.register((short)77, GridTaskResultResponse::new, new GridTaskResultResponseSerializer());
359360
factory.register((short)78, MissingMappingRequestMessage::new, new MissingMappingRequestMessageSerializer());
360361
factory.register((short)79, MissingMappingResponseMessage::new, new MissingMappingResponseMessageSerializer());
361362
factory.register((short)80, MetadataRequestMessage::new, new MetadataRequestMessageSerializer());

modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,8 @@ public GridTaskCommandHandler(final GridKernalContext ctx) {
132132

133133
if (err != null)
134134
res.error(err.getMessage());
135-
else {
136-
res.result(desc.result());
137-
res.resultBytes(U.marshal(ctx, desc.result()));
138-
}
135+
else
136+
res.marshalResult(ctx, desc.result());
139137
}
140138
else
141139
res.found(false);
@@ -433,7 +431,7 @@ else if (!nodeId.equals(resHolderId))
433431
res = (GridTaskResultResponse)msg;
434432

435433
try {
436-
res.result(U.unmarshal(ctx, res.resultBytes(), U.resolveClassLoader(ctx.config())));
434+
res.unmarshalResult(ctx);
437435
}
438436
catch (IgniteCheckedException e) {
439437
U.error(log, "Failed to unmarshal task result: " + res, e);

modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java

Lines changed: 30 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -17,31 +17,34 @@
1717

1818
package org.apache.ignite.internal.processors.rest.handlers.task;
1919

20-
import java.nio.ByteBuffer;
21-
import org.apache.ignite.internal.GridDirectTransient;
20+
import org.apache.ignite.IgniteCheckedException;
21+
import org.apache.ignite.internal.GridKernalContext;
22+
import org.apache.ignite.internal.Order;
23+
import org.apache.ignite.internal.util.typedef.internal.U;
2224
import org.apache.ignite.plugin.extensions.communication.Message;
23-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
24-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
2525
import org.jetbrains.annotations.Nullable;
2626

2727
/**
2828
* Task result response.
2929
*/
3030
public class GridTaskResultResponse implements Message {
3131
/** Result. */
32-
@GridDirectTransient
33-
private Object res;
32+
private @Nullable Object res;
3433

3534
/** Serialized result. */
35+
@Order(value = 0, method = "resultBytes")
3636
private byte[] resBytes;
3737

3838
/** Finished flag. */
39+
@Order(1)
3940
private boolean finished;
4041

4142
/** Flag indicating that task has ever been launched on node. */
43+
@Order(2)
4244
private boolean found;
4345

4446
/** Error. */
47+
@Order(value = 3, method = "error")
4548
private String err;
4649

4750
/**
@@ -51,13 +54,6 @@ public class GridTaskResultResponse implements Message {
5154
return res;
5255
}
5356

54-
/**
55-
* @param res Task result.
56-
*/
57-
public void result(@Nullable Object res) {
58-
this.res = res;
59-
}
60-
6157
/**
6258
* @param resBytes Serialized result.
6359
*/
@@ -115,90 +111,31 @@ public void error(String err) {
115111
}
116112

117113
/** {@inheritDoc} */
118-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
119-
writer.setBuffer(buf);
120-
121-
if (!writer.isHeaderWritten()) {
122-
if (!writer.writeHeader(directType()))
123-
return false;
124-
125-
writer.onHeaderWritten();
126-
}
127-
128-
switch (writer.state()) {
129-
case 0:
130-
if (!writer.writeString(err))
131-
return false;
132-
133-
writer.incrementState();
134-
135-
case 1:
136-
if (!writer.writeBoolean(finished))
137-
return false;
138-
139-
writer.incrementState();
140-
141-
case 2:
142-
if (!writer.writeBoolean(found))
143-
return false;
144-
145-
writer.incrementState();
146-
147-
case 3:
148-
if (!writer.writeByteArray(resBytes))
149-
return false;
150-
151-
writer.incrementState();
152-
153-
}
154-
155-
return true;
114+
@Override public short directType() {
115+
return 77;
156116
}
157117

158-
/** {@inheritDoc} */
159-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
160-
reader.setBuffer(buf);
161-
162-
switch (reader.state()) {
163-
case 0:
164-
err = reader.readString();
165-
166-
if (!reader.isLastRead())
167-
return false;
168-
169-
reader.incrementState();
170-
171-
case 1:
172-
finished = reader.readBoolean();
173-
174-
if (!reader.isLastRead())
175-
return false;
176-
177-
reader.incrementState();
178-
179-
case 2:
180-
found = reader.readBoolean();
181-
182-
if (!reader.isLastRead())
183-
return false;
184-
185-
reader.incrementState();
186-
187-
case 3:
188-
resBytes = reader.readByteArray();
189-
190-
if (!reader.isLastRead())
191-
return false;
118+
/**
119+
* Marshals task result to byte array.
120+
*
121+
* @param ctx Context.
122+
* @param res Task result.
123+
*/
124+
public void marshalResult(GridKernalContext ctx, @Nullable Object res) throws IgniteCheckedException {
125+
resBytes = U.marshal(ctx, res);
126+
}
192127

193-
reader.incrementState();
128+
/**
129+
* Unmarshals task result from byte array.
130+
*
131+
* @param ctx Context.
132+
*/
133+
public void unmarshalResult(GridKernalContext ctx) throws IgniteCheckedException {
134+
if (resBytes != null) {
135+
res = U.unmarshal(ctx, resBytes, U.resolveClassLoader(ctx.config()));
194136

137+
// It is not required anymore.
138+
resBytes = null;
195139
}
196-
197-
return true;
198-
}
199-
200-
/** {@inheritDoc} */
201-
@Override public short directType() {
202-
return 77;
203140
}
204141
}

0 commit comments

Comments
 (0)