Skip to content

Commit 53c1cbb

Browse files
committed
fix performance degradation for BinaryMetadataUpdatesFlowTest#testConcurrentMetadataUpdates
1 parent 84a4947 commit 53c1cbb

File tree

4 files changed

+41
-46
lines changed

4 files changed

+41
-46
lines changed

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
5757
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessageSerializer;
5858
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
59-
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessageMarshallableSerializer;
59+
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessageSerializer;
6060
import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResult;
6161
import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResultSerializer;
6262
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerifyResult;
@@ -363,8 +363,6 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
363363
factory.register(537, ServiceDeploymentRequest::new,
364364
new ServiceDeploymentRequestMarshallableSerializer(marsh, clsLdr));
365365
factory.register(538, ServiceUndeploymentRequest::new, new ServiceUndeploymentRequestSerializer());
366-
factory.register(534, DynamicCacheChangeBatch::new,
367-
new DynamicCacheChangeBatchMarshallableSerializer(marsh, clsLdr));
368-
factory.register(535, MetadataUpdateProposedMessage::new, new MetadataUpdateProposedMessageMarshallableSerializer(marsh, clsLdr));
366+
factory.register(539, MetadataUpdateProposedMessage::new, new MetadataUpdateProposedMessageSerializer());
369367
}
370368
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.ignite.internal.util.typedef.internal.U;
2727
import org.apache.ignite.lang.IgniteUuid;
2828
import org.apache.ignite.marshaller.Marshaller;
29-
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
29+
import org.apache.ignite.plugin.extensions.communication.Message;
3030
import org.jetbrains.annotations.Nullable;
3131

3232
/**
@@ -70,8 +70,11 @@
7070
* (with <b>pending version</b> strictly greater than <b>accepted version</b>)
7171
* it gets blocked until {@link MetadataUpdateAcceptedMessage} arrives with <b>accepted version</b>
7272
* equals to <b>pending version</b> of this metadata to the moment when is was initially read by the thread.
73+
* <p>
74+
* We don't implement MarshallableMessage for this message because it leads to performance degradation when updating BinaryMetadata
75+
* (see test: BinaryMetadataUpdatesFlowTest#testConcurrentMetadataUpdates).
7376
*/
74-
public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage, MarshallableMessage {
77+
public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage, Message {
7578
/** */
7679
private static final long serialVersionUID = 0L;
7780

@@ -222,26 +225,28 @@ public void metadata(BinaryMetadata metadata) {
222225
this.metadata = metadata;
223226
}
224227

225-
/** {@inheritDoc} */
226-
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
227-
if (metadata != null)
228-
metadataBytes = U.marshal(marsh, metadata);
228+
/**
229+
*
230+
*/
231+
public int typeId() {
232+
return typeId;
229233
}
230234

231-
/** {@inheritDoc} */
232-
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException {
233-
if (metadataBytes != null) {
234-
metadata = U.unmarshal(marsh, metadataBytes, ldr);
235-
236-
metadataBytes = null;
237-
}
235+
/**
236+
* @param marsh Marshaller.
237+
*/
238+
public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
239+
if (metadata != null)
240+
metadataBytes = U.marshal(marsh, metadata);
238241
}
239242

240243
/**
241-
*
244+
* @param marsh Marshaller.
245+
* @param ldr Class loader.
242246
*/
243-
public int typeId() {
244-
return typeId;
247+
public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException {
248+
if (metadataBytes != null)
249+
metadata = U.unmarshal(marsh, metadataBytes, ldr);
245250
}
246251

247252
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.ignite.internal.Order;
2424
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
2525
import org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException;
26+
import org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper;
27+
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
2628
import org.apache.ignite.internal.util.typedef.internal.S;
2729
import org.apache.ignite.internal.util.typedef.internal.U;
2830
import org.apache.ignite.marshaller.Marshaller;
@@ -104,8 +106,15 @@ public DiscoverySpiCustomMessage message() {
104106
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
105107
super.prepareMarshal(marsh);
106108

107-
if (msg instanceof Message)
109+
if (msg instanceof Message) {
110+
if (msg instanceof MetadataUpdateProposedMessage)
111+
((MetadataUpdateProposedMessage)msg).prepareMarshal(marsh);
112+
else if (msg instanceof SecurityAwareCustomMessageWrapper &&
113+
((SecurityAwareCustomMessageWrapper)msg).delegate() instanceof MetadataUpdateProposedMessage)
114+
((MetadataUpdateProposedMessage)((SecurityAwareCustomMessageWrapper)msg).delegate()).prepareMarshal(marsh);
115+
108116
serMsg = (Message)msg;
117+
}
109118
else {
110119
if (msg != null)
111120
msgBytes = U.marshal(marsh, msg);
@@ -125,8 +134,15 @@ public DiscoverySpiCustomMessage message() {
125134
if (msg != null)
126135
return;
127136

128-
if (serMsg != null)
137+
if (serMsg != null) {
129138
msg = (DiscoverySpiCustomMessage)serMsg;
139+
140+
if (msg instanceof MetadataUpdateProposedMessage)
141+
((MetadataUpdateProposedMessage)msg).finishUnmarshal(marsh, ldr);
142+
else if (msg instanceof SecurityAwareCustomMessageWrapper &&
143+
((SecurityAwareCustomMessageWrapper)msg).delegate() instanceof MetadataUpdateProposedMessage)
144+
((MetadataUpdateProposedMessage)((SecurityAwareCustomMessageWrapper)msg).delegate()).finishUnmarshal(marsh, ldr);
145+
}
130146
else {
131147
try {
132148
if (msgBytes != null)

modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoveryMessageSerializationTest.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.ignite.internal.util.typedef.internal.U;
2222
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
2323

24-
import static org.apache.ignite.internal.util.IgniteUtils.toBytes;
2524
import static org.apache.ignite.marshaller.Marshallers.jdk;
2625

2726
/** Serialization test for discovery messages. */
@@ -30,27 +29,4 @@ public class IgniteDiscoveryMessageSerializationTest extends AbstractMessageSeri
3029
@Override protected MessageFactoryProvider messageFactory() {
3130
return new DiscoveryMessageFactory(jdk(), U.gridClassLoader());
3231
}
33-
34-
/** {@inheritDoc} */
35-
@Override protected AbstractTestMessageReader createMessageReader(int capacity) {
36-
return new TestIoMessageReader(capacity);
37-
}
38-
39-
/** */
40-
private static class TestIoMessageReader extends AbstractTestMessageReader {
41-
/** */
42-
private static final byte[] BYTE_ARR = toBytes(null);
43-
44-
/** */
45-
public TestIoMessageReader(int capacity) {
46-
super(capacity);
47-
}
48-
49-
/** {@inheritDoc} */
50-
@Override public byte[] readByteArray() {
51-
super.readByteArray();
52-
53-
return BYTE_ARR;
54-
}
55-
}
5632
}

0 commit comments

Comments
 (0)