Skip to content

Commit b183a59

Browse files
authored
IGNITE-26947 Use MessageSerializer for GridJobSiblingsResponse (#12499)
1 parent 2017e43 commit b183a59

File tree

3 files changed

+40
-55
lines changed

3 files changed

+40
-55
lines changed

modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsResponse.java

Lines changed: 32 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,24 @@
1717

1818
package org.apache.ignite.internal;
1919

20-
import java.nio.ByteBuffer;
2120
import java.util.Collection;
2221
import org.apache.ignite.IgniteCheckedException;
2322
import org.apache.ignite.compute.ComputeJobSibling;
2423
import org.apache.ignite.internal.util.typedef.internal.S;
2524
import org.apache.ignite.internal.util.typedef.internal.U;
2625
import org.apache.ignite.marshaller.Marshaller;
2726
import org.apache.ignite.plugin.extensions.communication.Message;
28-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
29-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
3027
import org.jetbrains.annotations.Nullable;
3128

3229
/**
3330
* Job siblings response.
3431
*/
3532
public class GridJobSiblingsResponse implements Message {
3633
/** */
37-
@GridDirectTransient
38-
private Collection<ComputeJobSibling> siblings;
34+
private @Nullable Collection<ComputeJobSibling> siblings;
3935

4036
/** */
37+
@Order(0)
4138
private byte[] siblingsBytes;
4239

4340
/**
@@ -49,70 +46,56 @@ public GridJobSiblingsResponse() {
4946

5047
/**
5148
* @param siblings Siblings.
52-
* @param siblingsBytes Serialized siblings.
5349
*/
54-
public GridJobSiblingsResponse(@Nullable Collection<ComputeJobSibling> siblings, @Nullable byte[] siblingsBytes) {
50+
public GridJobSiblingsResponse(@Nullable Collection<ComputeJobSibling> siblings) {
5551
this.siblings = siblings;
56-
this.siblingsBytes = siblingsBytes;
5752
}
5853

5954
/**
6055
* @return Job siblings.
6156
*/
62-
public Collection<ComputeJobSibling> jobSiblings() {
57+
public @Nullable Collection<ComputeJobSibling> jobSiblings() {
6358
return siblings;
6459
}
6560

6661
/**
67-
* @param marsh Marshaller.
68-
* @throws IgniteCheckedException In case of error.
62+
* @return Serialized siblings.
6963
*/
70-
public void unmarshalSiblings(Marshaller marsh) throws IgniteCheckedException {
71-
assert marsh != null;
72-
73-
if (siblingsBytes != null)
74-
siblings = U.unmarshal(marsh, siblingsBytes, null);
64+
public byte[] siblingsBytes() {
65+
return siblingsBytes;
7566
}
7667

77-
/** {@inheritDoc} */
78-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
79-
writer.setBuffer(buf);
80-
81-
if (!writer.isHeaderWritten()) {
82-
if (!writer.writeHeader(directType()))
83-
return false;
84-
85-
writer.onHeaderWritten();
86-
}
87-
88-
switch (writer.state()) {
89-
case 0:
90-
if (!writer.writeByteArray(siblingsBytes))
91-
return false;
92-
93-
writer.incrementState();
94-
95-
}
96-
97-
return true;
68+
/**
69+
* @param siblingsBytes Serialized siblings.
70+
*/
71+
public void siblingsBytes(byte[] siblingsBytes) {
72+
this.siblingsBytes = siblingsBytes;
9873
}
9974

100-
/** {@inheritDoc} */
101-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
102-
reader.setBuffer(buf);
103-
104-
switch (reader.state()) {
105-
case 0:
106-
siblingsBytes = reader.readByteArray();
75+
/**
76+
* Marshals siblings to byte array.
77+
*
78+
* @param marsh Marshaller.
79+
* @throws IgniteCheckedException In case of error.
80+
*/
81+
public void marshalSiblings(Marshaller marsh) throws IgniteCheckedException {
82+
siblingsBytes = U.marshal(marsh, siblings);
83+
}
10784

108-
if (!reader.isLastRead())
109-
return false;
85+
/**
86+
* Unmarshals siblings from byte array.
87+
*
88+
* @param marsh Marshaller.
89+
* @throws IgniteCheckedException In case of error.
90+
*/
91+
public void unmarshalSiblings(Marshaller marsh) throws IgniteCheckedException {
92+
assert marsh != null;
11093

111-
reader.incrementState();
94+
if (siblingsBytes != null) {
95+
siblings = U.unmarshal(marsh, siblingsBytes, null);
11296

97+
siblingsBytes = null;
11398
}
114-
115-
return true;
11699
}
117100

118101
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
7979
import org.apache.ignite.internal.codegen.GridJobExecuteResponseSerializer;
8080
import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer;
81+
import org.apache.ignite.internal.codegen.GridJobSiblingsResponseSerializer;
8182
import org.apache.ignite.internal.codegen.GridNearAtomicCheckUpdateRequestSerializer;
8283
import org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateFilterRequestSerializer;
8384
import org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateRequestSerializer;
@@ -313,7 +314,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
313314
factory.register((short)1, GridJobExecuteRequest::new);
314315
factory.register((short)2, GridJobExecuteResponse::new, new GridJobExecuteResponseSerializer());
315316
factory.register((short)3, GridJobSiblingsRequest::new, new GridJobSiblingsRequestSerializer());
316-
factory.register((short)4, GridJobSiblingsResponse::new);
317+
factory.register((short)4, GridJobSiblingsResponse::new, new GridJobSiblingsResponseSerializer());
317318
factory.register((short)5, GridTaskCancelRequest::new, new GridTaskCancelRequestSerializer());
318319
factory.register((short)6, GridTaskSessionRequest::new, new GridTaskSessionRequestSerializer());
319320
factory.register((short)7, GridCheckpointRequest::new, new GridCheckpointRequestSerializer());

modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1412,11 +1412,12 @@ private class JobSiblingsMessageListener implements GridMessageListener {
14121412

14131413
boolean loc = ctx.localNodeId().equals(nodeId);
14141414

1415-
ctx.io().sendToCustomTopic(nodeId, topic,
1416-
new GridJobSiblingsResponse(
1417-
loc ? siblings : null,
1418-
loc ? null : U.marshal(marsh, siblings)),
1419-
SYSTEM_POOL);
1415+
GridJobSiblingsResponse resp = new GridJobSiblingsResponse(siblings);
1416+
1417+
if (!loc)
1418+
resp.marshalSiblings(marsh);
1419+
1420+
ctx.io().sendToCustomTopic(nodeId, topic, resp, SYSTEM_POOL);
14201421
}
14211422
catch (IgniteCheckedException e) {
14221423
U.error(log, "Failed to send job sibling response.", e);

0 commit comments

Comments
 (0)