Skip to content

Commit c40ff36

Browse files
authored
IGNITE-26835 Use MessageSerializer for GridTaskSessionRequest (#12470)
1 parent 562932c commit c40ff36

File tree

4 files changed

+74
-98
lines changed

4 files changed

+74
-98
lines changed

modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionRequest.java

Lines changed: 54 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,31 @@
1717

1818
package org.apache.ignite.internal;
1919

20-
import java.nio.ByteBuffer;
2120
import java.util.Map;
21+
import org.apache.ignite.IgniteCheckedException;
2222
import org.apache.ignite.internal.util.typedef.internal.S;
23+
import org.apache.ignite.internal.util.typedef.internal.U;
2324
import org.apache.ignite.lang.IgniteUuid;
25+
import org.apache.ignite.marshaller.Marshaller;
2426
import org.apache.ignite.plugin.extensions.communication.Message;
25-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
26-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
2727

2828
/**
2929
* Task session request.
3030
*/
3131
public class GridTaskSessionRequest implements Message {
3232
/** Task session ID. */
33+
@Order(value = 0, method = "sessionId")
3334
private IgniteUuid sesId;
3435

3536
/** ID of job within a task. */
37+
@Order(1)
3638
private IgniteUuid jobId;
3739

3840
/** Changed attributes bytes. */
41+
@Order(value = 2, method = "attributesBytes")
3942
private byte[] attrsBytes;
4043

4144
/** Changed attributes. */
42-
@GridDirectTransient
4345
private Map<?, ?> attrs;
4446

4547
/**
@@ -52,115 +54,64 @@ public GridTaskSessionRequest() {
5254
/**
5355
* @param sesId Session ID.
5456
* @param jobId Job ID.
55-
* @param attrsBytes Serialized attributes.
5657
* @param attrs Attributes.
5758
*/
58-
public GridTaskSessionRequest(IgniteUuid sesId, IgniteUuid jobId, byte[] attrsBytes, Map<?, ?> attrs) {
59+
public GridTaskSessionRequest(IgniteUuid sesId, IgniteUuid jobId, Map<?, ?> attrs) {
5960
assert sesId != null;
60-
assert attrsBytes != null;
6161
assert attrs != null;
6262

6363
this.sesId = sesId;
6464
this.jobId = jobId;
65-
this.attrsBytes = attrsBytes;
6665
this.attrs = attrs;
6766
}
6867

6968
/**
7069
* @return Changed attributes (serialized).
7170
*/
72-
public byte[] getAttributesBytes() {
71+
public byte[] attributesBytes() {
7372
return attrsBytes;
7473
}
7574

75+
/**
76+
* @param attrsBytes Changed attributes (serialized).
77+
*/
78+
public void attributesBytes(byte[] attrsBytes) {
79+
this.attrsBytes = attrsBytes;
80+
}
81+
7682
/**
7783
* @return Changed attributes.
7884
*/
79-
public Map<?, ?> getAttributes() {
85+
public Map<?, ?> attributes() {
8086
return attrs;
8187
}
8288

8389
/**
8490
* @return Task session ID.
8591
*/
86-
public IgniteUuid getSessionId() {
92+
public IgniteUuid sessionId() {
8793
return sesId;
8894
}
8995

9096
/**
91-
* @return Job ID.
97+
* @param sesId Task session ID.
9298
*/
93-
public IgniteUuid getJobId() {
94-
return jobId;
99+
public void sessionId(IgniteUuid sesId) {
100+
this.sesId = sesId;
95101
}
96102

97-
/** {@inheritDoc} */
98-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
99-
writer.setBuffer(buf);
100-
101-
if (!writer.isHeaderWritten()) {
102-
if (!writer.writeHeader(directType()))
103-
return false;
104-
105-
writer.onHeaderWritten();
106-
}
107-
108-
switch (writer.state()) {
109-
case 0:
110-
if (!writer.writeByteArray(attrsBytes))
111-
return false;
112-
113-
writer.incrementState();
114-
115-
case 1:
116-
if (!writer.writeIgniteUuid(jobId))
117-
return false;
118-
119-
writer.incrementState();
120-
121-
case 2:
122-
if (!writer.writeIgniteUuid(sesId))
123-
return false;
124-
125-
writer.incrementState();
126-
127-
}
128-
129-
return true;
103+
/**
104+
* @return Job ID.
105+
*/
106+
public IgniteUuid jobId() {
107+
return jobId;
130108
}
131109

132-
/** {@inheritDoc} */
133-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
134-
reader.setBuffer(buf);
135-
136-
switch (reader.state()) {
137-
case 0:
138-
attrsBytes = reader.readByteArray();
139-
140-
if (!reader.isLastRead())
141-
return false;
142-
143-
reader.incrementState();
144-
145-
case 1:
146-
jobId = reader.readIgniteUuid();
147-
148-
if (!reader.isLastRead())
149-
return false;
150-
151-
reader.incrementState();
152-
153-
case 2:
154-
sesId = reader.readIgniteUuid();
155-
156-
if (!reader.isLastRead())
157-
return false;
158-
159-
reader.incrementState();
160-
161-
}
162-
163-
return true;
110+
/**
111+
* @param jobId Job ID.
112+
*/
113+
public void jobId(IgniteUuid jobId) {
114+
this.jobId = jobId;
164115
}
165116

166117
/** {@inheritDoc} */
@@ -172,4 +123,28 @@ public IgniteUuid getJobId() {
172123
@Override public String toString() {
173124
return S.toString(GridTaskSessionRequest.class, this);
174125
}
126+
127+
/**
128+
* Marshals changed attributes to byte array.
129+
*
130+
* @param marsh Marshaller.
131+
*/
132+
public void marshalAttributes(Marshaller marsh) throws IgniteCheckedException {
133+
attrsBytes = U.marshal(marsh, attrs);
134+
}
135+
136+
/**
137+
* Unmarshals changed attributes from byte array.
138+
*
139+
* @param marsh Marshaller.
140+
* @param ldr Class loader.
141+
*/
142+
public void unmarshalAttributes(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException {
143+
if (attrsBytes != null) {
144+
attrs = U.unmarshal(marsh, attrsBytes, ldr);
145+
146+
// It is not required anymore.
147+
attrsBytes = null;
148+
}
149+
}
175150
}

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
import org.apache.ignite.internal.codegen.GridTaskCancelRequestSerializer;
101101
import org.apache.ignite.internal.codegen.GridTaskResultRequestSerializer;
102102
import org.apache.ignite.internal.codegen.GridTaskResultResponseSerializer;
103+
import org.apache.ignite.internal.codegen.GridTaskSessionRequestSerializer;
103104
import org.apache.ignite.internal.codegen.HandshakeMessageSerializer;
104105
import org.apache.ignite.internal.codegen.HandshakeWaitMessageSerializer;
105106
import org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer;
@@ -303,7 +304,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
303304
factory.register((short)3, GridJobSiblingsRequest::new, new GridJobSiblingsRequestSerializer());
304305
factory.register((short)4, GridJobSiblingsResponse::new);
305306
factory.register((short)5, GridTaskCancelRequest::new, new GridTaskCancelRequestSerializer());
306-
factory.register((short)6, GridTaskSessionRequest::new);
307+
factory.register((short)6, GridTaskSessionRequest::new, new GridTaskSessionRequestSerializer());
307308
factory.register((short)7, GridCheckpointRequest::new, new GridCheckpointRequestSerializer());
308309
factory.register((short)8, GridIoMessage::new);
309310
factory.register((short)9, GridIoUserMessage::new);

modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -600,8 +600,10 @@ public void setAttributes(GridJobSessionImpl ses, Map<?, ?> attrs) throws Ignite
600600

601601
boolean loc = ctx.localNodeId().equals(taskNode.id()) && !ctx.config().isMarshalLocalJobs();
602602

603-
GridTaskSessionRequest req = new GridTaskSessionRequest(ses.getId(), ses.getJobId(),
604-
loc ? null : U.marshal(marsh, attrs), attrs);
603+
GridTaskSessionRequest req = new GridTaskSessionRequest(ses.getId(), ses.getJobId(), attrs);
604+
605+
if (!loc)
606+
req.marshalAttributes(marsh);
605607

606608
Object topic = TOPIC_TASK.topic(ses.getJobId(), ctx.discovery().localNode().id());
607609

@@ -1756,7 +1758,7 @@ private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest req)
17561758
}
17571759

17581760
try {
1759-
GridTaskSessionImpl ses = ctx.session().getSession(req.getSessionId());
1761+
GridTaskSessionImpl ses = ctx.session().getSession(req.sessionId());
17601762

17611763
if (ses == null) {
17621764
if (log.isDebugEnabled())
@@ -1767,9 +1769,10 @@ private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest req)
17671769

17681770
boolean loc = ctx.localNodeId().equals(nodeId) && !ctx.config().isMarshalLocalJobs();
17691771

1770-
Map<?, ?> attrs = loc ? req.getAttributes() :
1771-
(Map<?, ?>)U.unmarshal(marsh, req.getAttributesBytes(),
1772-
U.resolveClassLoader(ses.getClassLoader(), ctx.config()));
1772+
if (!loc)
1773+
req.unmarshalAttributes(marsh, U.resolveClassLoader(ses.getClassLoader(), ctx.config()));
1774+
1775+
Map<?, ?> attrs = req.attributes();
17731776

17741777
if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) {
17751778
Event evt = new TaskEvent(
@@ -1789,7 +1792,7 @@ private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest req)
17891792
ses.setInternal(attrs);
17901793
}
17911794

1792-
onChangeTaskAttributes(req.getSessionId(), req.getJobId(), attrs);
1795+
onChangeTaskAttributes(req.sessionId(), req.jobId(), attrs);
17931796
}
17941797
catch (IgniteCheckedException e) {
17951798
U.error(log, "Failed to deserialize session attributes.", e);

modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -994,12 +994,10 @@ else if (rcvrs.remove(nodeId)) {
994994
if (node != null) {
995995
boolean loc = node.id().equals(ctx.localNodeId()) && !ctx.config().isMarshalLocalJobs();
996996

997-
GridTaskSessionRequest req = new GridTaskSessionRequest(
998-
ses.getId(),
999-
s.getJobId(),
1000-
loc ? null : U.marshal(marsh, attrs),
1001-
attrs
1002-
);
997+
GridTaskSessionRequest req = new GridTaskSessionRequest(ses.getId(), s.getJobId(), attrs);
998+
999+
if (!loc)
1000+
req.marshalAttributes(marsh);
10031001

10041002
// Make sure to go through IO manager always, since order
10051003
// should be preserved here.
@@ -1096,7 +1094,7 @@ private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest msg)
10961094
lock.readLock();
10971095

10981096
try {
1099-
GridTaskWorker<?, ?> task = tasks.get(msg.getSessionId());
1097+
GridTaskWorker<?, ?> task = tasks.get(msg.sessionId());
11001098

11011099
if (stopping && !waiting) {
11021100
U.warn(log, "Received task session request while stopping grid (will ignore): " + msg
@@ -1114,13 +1112,12 @@ private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest msg)
11141112

11151113
boolean loc = ctx.localNodeId().equals(nodeId) && !ctx.config().isMarshalLocalJobs();
11161114

1117-
Map<?, ?> attrs = loc ? msg.getAttributes() :
1118-
U.<Map<?, ?>>unmarshal(marsh, msg.getAttributesBytes(),
1119-
U.resolveClassLoader(task.getTask().getClass().getClassLoader(), ctx.config()));
1115+
if (!loc)
1116+
msg.unmarshalAttributes(marsh, U.resolveClassLoader(task.getTask().getClass().getClassLoader(), ctx.config()));
11201117

11211118
GridTaskSessionImpl ses = task.getSession();
11221119

1123-
sendSessionAttributes(attrs, ses);
1120+
sendSessionAttributes(msg.attributes(), ses);
11241121
}
11251122
catch (IgniteCheckedException e) {
11261123
U.error(log, "Failed to deserialize session request: " + msg, e);

0 commit comments

Comments
 (0)