Skip to content

Commit d79c2d1

Browse files
committed
IGNITE-27827 Use MessageSerializer for GridDeploymentInfoBean
1 parent 6e65dfd commit d79c2d1

File tree

3 files changed

+165
-134
lines changed

3 files changed

+165
-134
lines changed

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.ignite.internal.codegen.GridCacheVersionSerializer;
6464
import org.apache.ignite.internal.codegen.GridChangeGlobalStateMessageResponseSerializer;
6565
import org.apache.ignite.internal.codegen.GridCheckpointRequestSerializer;
66+
import org.apache.ignite.internal.codegen.GridDeploymentInfoBeanSerializer;
6667
import org.apache.ignite.internal.codegen.GridDeploymentResponseSerializer;
6768
import org.apache.ignite.internal.codegen.GridDhtAffinityAssignmentRequestSerializer;
6869
import org.apache.ignite.internal.codegen.GridDhtAffinityAssignmentResponseSerializer;
@@ -369,7 +370,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
369370
factory.register((short)7, GridCheckpointRequest::new, new GridCheckpointRequestSerializer());
370371
factory.register((short)8, GridIoMessage::new, new GridIoMessageSerializer());
371372
factory.register((short)9, GridIoUserMessage::new);
372-
factory.register((short)10, GridDeploymentInfoBean::new);
373+
factory.register((short)10, GridDeploymentInfoBean::new, new GridDeploymentInfoBeanSerializer());
373374
factory.register((short)11, GridDeploymentRequest::new);
374375
factory.register((short)12, GridDeploymentResponse::new, new GridDeploymentResponseSerializer());
375376
factory.register((short)13, GridEventStorageMessage::new, new GridEventStorageMessageSerializer());

modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentInfoBean.java

Lines changed: 42 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -18,46 +18,39 @@
1818
package org.apache.ignite.internal.managers.deployment;
1919

2020
import java.io.Externalizable;
21-
import java.io.IOException;
22-
import java.io.ObjectInput;
23-
import java.io.ObjectOutput;
24-
import java.nio.ByteBuffer;
2521
import java.util.Map;
2622
import java.util.UUID;
2723
import org.apache.ignite.configuration.DeploymentMode;
28-
import org.apache.ignite.internal.GridDirectMap;
24+
import org.apache.ignite.internal.Order;
2925
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
3026
import org.apache.ignite.internal.util.typedef.internal.S;
31-
import org.apache.ignite.internal.util.typedef.internal.U;
3227
import org.apache.ignite.lang.IgniteUuid;
3328
import org.apache.ignite.plugin.extensions.communication.Message;
34-
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
35-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
36-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
3729

3830
/**
3931
* Deployment info bean.
4032
*/
41-
public class GridDeploymentInfoBean implements Message, GridDeploymentInfo, Externalizable {
42-
/** */
43-
private static final long serialVersionUID = 0L;
44-
33+
public class GridDeploymentInfoBean implements Message, GridDeploymentInfo {
4534
/** */
35+
@Order(value = 0, method = "classLoaderId")
4636
private IgniteUuid clsLdrId;
4737

4838
/** */
39+
@Order(value = 1, method = "deployMode")
4940
private DeploymentMode depMode;
5041

5142
/** */
43+
@Order(value = 4, method = "userVersion")
5244
private String userVer;
5345

5446
/** */
47+
@Order(value = 2, method = "localDeploymentOwner")
5548
@Deprecated // Left for backward compatibility only.
5649
private boolean locDepOwner;
5750

5851
/** Node class loader participant map. */
5952
@GridToStringInclude
60-
@GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
53+
@Order(value = 3, method = "participants")
6154
private Map<UUID, IgniteUuid> participants;
6255

6356
/**
@@ -82,7 +75,7 @@ public GridDeploymentInfoBean(
8275
this.clsLdrId = clsLdrId;
8376
this.depMode = depMode;
8477
this.userVer = userVer;
85-
this.participants = participants;
78+
this.participants = Map.copyOf(participants);
8679
}
8780

8881
/**
@@ -92,7 +85,7 @@ public GridDeploymentInfoBean(GridDeploymentInfo dep) {
9285
clsLdrId = dep.classLoaderId();
9386
depMode = dep.deployMode();
9487
userVer = dep.userVersion();
95-
participants = dep.participants();
88+
participants = Map.copyOf(dep.participants());
9689
}
9790

9891
/** {@inheritDoc} */
@@ -125,139 +118,55 @@ public GridDeploymentInfoBean(GridDeploymentInfo dep) {
125118
return participants;
126119
}
127120

128-
/** {@inheritDoc} */
129-
@Override public int hashCode() {
130-
return clsLdrId.hashCode();
121+
/**
122+
* @param clsLdrId Class loader ID.
123+
*/
124+
public void classLoaderId(IgniteUuid clsLdrId) {
125+
this.clsLdrId = clsLdrId;
131126
}
132127

133-
/** {@inheritDoc} */
134-
@Override public boolean equals(Object o) {
135-
return o == this || o instanceof GridDeploymentInfoBean &&
136-
clsLdrId.equals(((GridDeploymentInfoBean)o).clsLdrId);
128+
/**
129+
* @param depMode Deployment mode.
130+
*/
131+
public void deployMode(DeploymentMode depMode) {
132+
this.depMode = depMode;
137133
}
138134

139-
/** {@inheritDoc} */
140-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
141-
writer.setBuffer(buf);
142-
143-
if (!writer.isHeaderWritten()) {
144-
if (!writer.writeHeader(directType()))
145-
return false;
146-
147-
writer.onHeaderWritten();
148-
}
149-
150-
switch (writer.state()) {
151-
case 0:
152-
if (!writer.writeIgniteUuid(clsLdrId))
153-
return false;
154-
155-
writer.incrementState();
156-
157-
case 1:
158-
if (!writer.writeByte(depMode != null ? (byte)depMode.ordinal() : -1))
159-
return false;
160-
161-
writer.incrementState();
162-
163-
case 2:
164-
if (!writer.writeBoolean(locDepOwner))
165-
return false;
166-
167-
writer.incrementState();
168-
169-
case 3:
170-
if (!writer.writeMap(participants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
171-
return false;
172-
173-
writer.incrementState();
174-
175-
case 4:
176-
if (!writer.writeString(userVer))
177-
return false;
178-
179-
writer.incrementState();
180-
181-
}
182-
183-
return true;
135+
/**
136+
* @param userVer User version.
137+
*/
138+
public void userVersion(String userVer) {
139+
this.userVer = userVer;
184140
}
185141

186-
/** {@inheritDoc} */
187-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
188-
reader.setBuffer(buf);
189-
190-
switch (reader.state()) {
191-
case 0:
192-
clsLdrId = reader.readIgniteUuid();
193-
194-
if (!reader.isLastRead())
195-
return false;
196-
197-
reader.incrementState();
198-
199-
case 1:
200-
byte depModeOrd;
201-
202-
depModeOrd = reader.readByte();
203-
204-
if (!reader.isLastRead())
205-
return false;
206-
207-
depMode = DeploymentMode.fromOrdinal(depModeOrd);
208-
209-
reader.incrementState();
210-
211-
case 2:
212-
locDepOwner = reader.readBoolean();
213-
214-
if (!reader.isLastRead())
215-
return false;
216-
217-
reader.incrementState();
218-
219-
case 3:
220-
participants = reader.readMap(MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
221-
222-
if (!reader.isLastRead())
223-
return false;
224-
225-
reader.incrementState();
226-
227-
case 4:
228-
userVer = reader.readString();
229-
230-
if (!reader.isLastRead())
231-
return false;
232-
233-
reader.incrementState();
234-
235-
}
142+
/**
143+
* @param locDepOwner Local deployment owner flag.
144+
*/
145+
public void localDeploymentOwner(boolean locDepOwner) {
146+
this.locDepOwner = locDepOwner;
147+
}
236148

237-
return true;
149+
/**
150+
* @param participants Node class loader participant map.
151+
*/
152+
public void participants(Map<UUID, IgniteUuid> participants) {
153+
this.participants = participants;
238154
}
239155

240156
/** {@inheritDoc} */
241-
@Override public short directType() {
242-
return 10;
157+
@Override public int hashCode() {
158+
return clsLdrId.hashCode();
243159
}
244160

245161
/** {@inheritDoc} */
246-
@Override public void writeExternal(ObjectOutput out) throws IOException {
247-
U.writeIgniteUuid(out, clsLdrId);
248-
U.writeEnum(out, depMode);
249-
U.writeString(out, userVer);
250-
out.writeBoolean(locDepOwner);
251-
U.writeMap(out, participants);
162+
@Override public boolean equals(Object o) {
163+
return o == this || o instanceof GridDeploymentInfoBean &&
164+
clsLdrId.equals(((GridDeploymentInfoBean)o).clsLdrId);
252165
}
253166

254167
/** {@inheritDoc} */
255-
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
256-
clsLdrId = U.readIgniteUuid(in);
257-
depMode = DeploymentMode.fromOrdinal(in.readByte());
258-
userVer = U.readString(in);
259-
locDepOwner = in.readBoolean();
260-
participants = U.readMap(in);
168+
@Override public short directType() {
169+
return 10;
261170
}
262171

263172
/** {@inheritDoc} */
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package org.apache.ignite.internal.managers.deployment;
2+
3+
import static java.util.UUID.randomUUID;
4+
import static org.apache.ignite.lang.IgniteUuid.randomUuid;
5+
import static org.junit.Assert.assertEquals;
6+
import static org.junit.Assert.assertNotNull;
7+
import static org.junit.Assert.assertTrue;
8+
9+
import java.nio.ByteBuffer;
10+
import java.util.Map;
11+
import java.util.function.Function;
12+
import org.apache.ignite.configuration.DeploymentMode;
13+
import org.apache.ignite.internal.direct.DirectMessageReader;
14+
import org.apache.ignite.internal.direct.DirectMessageWriter;
15+
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
16+
import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
17+
import org.apache.ignite.plugin.extensions.communication.Message;
18+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
19+
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
20+
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
21+
import org.junit.Test;
22+
23+
/**
24+
* Tests for {@link GridDeploymentInfoBean} serialization.
25+
*/
26+
public class GridDeploymentInfoBeanTest {
27+
/** Message factory. */
28+
private final MessageFactory msgFactory =
29+
new IgniteMessageFactoryImpl(new MessageFactoryProvider[] {new GridIoMessageFactory()});
30+
31+
/**
32+
* Tests serialization round trip.
33+
*/
34+
@Test
35+
public void testSerialization() {
36+
var participants = Map.of(
37+
randomUUID(), randomUuid(),
38+
randomUUID(), randomUuid()
39+
);
40+
41+
var srcMsg = new GridDeploymentInfoBean(
42+
randomUuid(),
43+
"userVersion",
44+
DeploymentMode.SHARED,
45+
participants
46+
);
47+
48+
// locDepOwner is deprecated but serialized
49+
srcMsg.localDeploymentOwner(true);
50+
51+
var resMsg = doMarshalUnmarshal(srcMsg);
52+
53+
assertEquals(srcMsg.classLoaderId(), resMsg.classLoaderId());
54+
assertEquals(srcMsg.userVersion(), resMsg.userVersion());
55+
assertEquals(srcMsg.deployMode(), resMsg.deployMode());
56+
assertEquals(srcMsg.participants(), resMsg.participants());
57+
assertEquals(srcMsg.localDeploymentOwner(), resMsg.localDeploymentOwner());
58+
}
59+
60+
/**
61+
* @param srcMsg Message to marshal.
62+
* @param <T> Message type.
63+
* @return Unmarshalled message.
64+
*/
65+
private <T extends Message> T doMarshalUnmarshal(T srcMsg) {
66+
var buf = ByteBuffer.allocate(8 * 1024);
67+
68+
MessageSerializer serializer = msgFactory.serializer(srcMsg.directType());
69+
assertNotNull(serializer);
70+
71+
// Write phase
72+
var fullyWritten = loopBuffer(buf, 0, wBuf -> {
73+
var writer = new DirectMessageWriter(msgFactory);
74+
writer.setBuffer(wBuf);
75+
return serializer.writeTo(srcMsg, writer);
76+
});
77+
assertTrue("The message was not written completely.", fullyWritten);
78+
79+
// Read type code (little-endian) and create message
80+
buf.flip();
81+
82+
byte b0 = buf.get();
83+
byte b1 = buf.get();
84+
85+
var type = (short)((b1 & 0xFF) << 8 | b0 & 0xFF);
86+
assertEquals(srcMsg.directType(), type);
87+
88+
var resMsg = (T)msgFactory.create(type);
89+
90+
// Read phase
91+
var fullyRead = loopBuffer(buf, buf.position(), rBuf -> {
92+
var reader = new DirectMessageReader(msgFactory, null);
93+
reader.setBuffer(rBuf);
94+
return serializer.readFrom(resMsg, reader);
95+
});
96+
assertTrue("The message was not read completely.", fullyRead);
97+
98+
return resMsg;
99+
}
100+
101+
/**
102+
* @param buf Byte buffer.
103+
* @param start Start position.
104+
* @param func Function that is sequentially executed on a different-sized part of the buffer.
105+
* @return {@code True} if the function returns {@code True} at least once, {@code False} otherwise.
106+
*/
107+
private boolean loopBuffer(ByteBuffer buf, int start, Function<ByteBuffer, Boolean> func) {
108+
int pos = start;
109+
110+
do {
111+
buf.position(start);
112+
buf.limit(++pos);
113+
114+
if (func.apply(buf))
115+
return true;
116+
}
117+
while (pos < buf.capacity());
118+
119+
return false;
120+
}
121+
}

0 commit comments

Comments
 (0)