Skip to content

Commit 4a1c139

Browse files
committed
IGNITE-28046 Use MessageSerializer for MetadataUpdateProposedMessage
1 parent 283b6ed commit 4a1c139

File tree

4 files changed

+76
-28
lines changed

4 files changed

+76
-28
lines changed

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessageSerializer;
4242
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
4343
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessageSerializer;
44+
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
45+
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessageSerializer;
4446
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
4547
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessageSerializer;
4648
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
@@ -168,5 +170,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
168170
new TxTimeoutOnPartitionMapExchangeChangeMessageSerializer());
169171
factory.register((short)510, UserAcceptedMessage::new, new UserAcceptedMessageSerializer());
170172
factory.register((short)511, UserProposedMessage::new, new UserProposedMessageSerializer());
173+
factory.register((short)512, MetadataUpdateProposedMessage::new, new MetadataUpdateProposedMessageSerializer());
171174
}
172175
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene
550550
//coordinator receives update request
551551
if (metaVerInfo != null) {
552552
if (metaVerInfo.removing()) {
553-
msg.markRejected(new BinaryObjectException("The type is removing now [typeId=" + typeId + ']'));
553+
msg.markRejected("The type is removing now [typeId=" + typeId + ']');
554554

555555
pendingVer = REMOVED_VERSION;
556556
acceptedVer = REMOVED_VERSION;
@@ -589,7 +589,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene
589589
catch (BinaryObjectException err) {
590590
log.warning("Exception with merging metadata for typeId: " + typeId, err);
591591

592-
msg.markRejected(err);
592+
msg.markRejected(err.getMessage());
593593
}
594594
}
595595
}
@@ -602,7 +602,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene
602602
MetadataUpdateResultFuture fut = unlabeledFutures.poll();
603603

604604
if (msg.rejected())
605-
fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionError()));
605+
fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionErrorMessage()));
606606
else {
607607
if (clientNode) {
608608
boolean success = casBinaryMetadata(typeId, new BinaryMetadataVersionInfo(msg.metadata(), pendingVer, acceptedVer));

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java

Lines changed: 70 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,23 @@
1717
package org.apache.ignite.internal.processors.cache.binary;
1818

1919
import java.util.UUID;
20-
import org.apache.ignite.binary.BinaryObjectException;
20+
import org.apache.ignite.IgniteCheckedException;
21+
import org.apache.ignite.IgniteException;
22+
import org.apache.ignite.internal.Order;
2123
import org.apache.ignite.internal.binary.BinaryMetadata;
2224
import org.apache.ignite.internal.binary.BinaryMetadataHandler;
2325
import org.apache.ignite.internal.managers.discovery.DiscoCache;
2426
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2527
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
2628
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
2729
import org.apache.ignite.internal.util.typedef.internal.S;
30+
import org.apache.ignite.internal.util.typedef.internal.U;
2831
import org.apache.ignite.lang.IgniteUuid;
32+
import org.apache.ignite.plugin.extensions.communication.Message;
2933
import org.jetbrains.annotations.Nullable;
3034

35+
import static org.apache.ignite.marshaller.Marshallers.jdk;
36+
3137
/**
3238
* <b>MetadataUpdateProposedMessage</b> and {@link MetadataUpdateAcceptedMessage} messages make a basis for
3339
* discovery-based protocol for exchanging {@link BinaryMetadata metadata} describing objects in binary format stored in Ignite caches.
@@ -70,33 +76,50 @@
7076
* it gets blocked until {@link MetadataUpdateAcceptedMessage} arrives with <b>accepted version</b>
7177
* equals to <b>pending version</b> of this metadata to the moment when is was initially read by the thread.
7278
*/
73-
public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage {
79+
public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage, Message {
7480
/** */
7581
private static final long serialVersionUID = 0L;
7682

7783
/** */
78-
private final IgniteUuid id = IgniteUuid.randomUuid();
84+
@Order(0)
85+
IgniteUuid id;
7986

8087
/** Node UUID which initiated metadata update. */
81-
private final UUID origNodeId;
88+
@Order(1)
89+
UUID origNodeId;
8290

8391
/** */
8492
private BinaryMetadata metadata;
8593

94+
/** Serialized {@link #metadata}. */
95+
@Order(value = 2, method = "metadataBytes")
96+
@SuppressWarnings("unused")
97+
byte[] metadataBytesHolder;
98+
8699
/** Metadata type id. */
87-
private final int typeId;
100+
@Order(3)
101+
int typeId;
88102

89103
/** Metadata version which is pending for update. */
90-
private int pendingVer;
104+
@Order(4)
105+
int pendingVer;
91106

92107
/** Metadata version which is already accepted by entire cluster. */
93-
private int acceptedVer;
108+
@Order(5)
109+
int acceptedVer;
94110

95111
/** Message acceptance status. */
96-
private ProposalStatus status = ProposalStatus.SUCCESSFUL;
112+
@Order(6)
113+
boolean rejected;
97114

98115
/** */
99-
private BinaryObjectException err;
116+
@Order(7)
117+
String errMsg;
118+
119+
/** Constructor. */
120+
public MetadataUpdateProposedMessage() {
121+
// No-op.
122+
}
100123

101124
/**
102125
* @param metadata {@link BinaryMetadata} requested to be updated.
@@ -106,6 +129,7 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) {
106129
assert origNodeId != null;
107130
assert metadata != null;
108131

132+
id = IgniteUuid.randomUuid();
109133
this.origNodeId = origNodeId;
110134

111135
this.metadata = metadata;
@@ -123,7 +147,7 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) {
123147
* {@inheritDoc}
124148
*/
125149
@Nullable @Override public DiscoveryCustomMessage ackMessage() {
126-
return (status == ProposalStatus.SUCCESSFUL) ? new MetadataUpdateAcceptedMessage(typeId, pendingVer) : null;
150+
return !rejected ? new MetadataUpdateAcceptedMessage(typeId, pendingVer) : null;
127151
}
128152

129153
/**
@@ -140,25 +164,25 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) {
140164
}
141165

142166
/**
143-
* @param err Error caused this update to be rejected.
167+
* @param errMsg Error message caused this update to be rejected.
144168
*/
145-
void markRejected(BinaryObjectException err) {
146-
status = ProposalStatus.REJECTED;
147-
this.err = err;
169+
void markRejected(String errMsg) {
170+
rejected = true;
171+
this.errMsg = errMsg;
148172
}
149173

150174
/**
151175
*
152176
*/
153177
boolean rejected() {
154-
return status == ProposalStatus.REJECTED;
178+
return rejected;
155179
}
156180

157181
/**
158182
*
159183
*/
160-
BinaryObjectException rejectionError() {
161-
return err;
184+
String rejectionErrorMessage() {
185+
return errMsg;
162186
}
163187

164188
/**
@@ -210,20 +234,42 @@ public void metadata(BinaryMetadata metadata) {
210234
this.metadata = metadata;
211235
}
212236

237+
/**
238+
* @return Serialized binary metadata.
239+
*/
240+
public byte[] metadataBytes() {
241+
try {
242+
return U.marshal(jdk(), metadata);
243+
}
244+
catch (IgniteCheckedException e) {
245+
throw new IgniteException("Failed to marshal binary metadata", e);
246+
}
247+
}
248+
249+
/**
250+
* @param metadataBytes Serialized binary metadata.
251+
*/
252+
public void metadataBytes(byte[] metadataBytes) {
253+
if (metadataBytes != null && metadata == null) {
254+
try {
255+
metadata = U.unmarshal(jdk(), metadataBytes, U.gridClassLoader());
256+
}
257+
catch (IgniteCheckedException e) {
258+
throw new IgniteException("Failed to unmarshal binary metadata", e);
259+
}
260+
}
261+
}
262+
213263
/**
214264
*
215265
*/
216266
public int typeId() {
217267
return typeId;
218268
}
219269

220-
/** Message acceptance status. */
221-
private enum ProposalStatus {
222-
/** */
223-
SUCCESSFUL,
224-
225-
/** */
226-
REJECTED
270+
/** {@inheritDoc} */
271+
@Override public short directType() {
272+
return 512;
227273
}
228274

229275
/** {@inheritDoc} */

modules/core/src/main/resources/META-INF/classnames.properties

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1032,7 +1032,6 @@ org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage
10321032
org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage
10331033
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage
10341034
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage
1035-
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage$ProposalStatus
10361035
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateResult$ResultType
10371036
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$BlockSetCallable
10381037
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$QueueHeaderPredicate

0 commit comments

Comments
 (0)