Skip to content

Commit 09b96a4

Browse files
committed
IGNITE-27968 Use message serializer for ExchangeFailureMessage
1 parent 9eb8603 commit 09b96a4

File tree

2 files changed

+27
-8
lines changed

2 files changed

+27
-8
lines changed

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessageMarshallableSerializer;
4444
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
4545
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatchMarshallableSerializer;
46+
import org.apache.ignite.internal.processors.cache.ExchangeFailureMessage;
47+
import org.apache.ignite.internal.processors.cache.ExchangeFailureMessageSerializer;
4648
import org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessage;
4749
import org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessageSerializer;
4850
import org.apache.ignite.internal.processors.cache.WalStateFinishMessage;
@@ -55,6 +57,8 @@
5557
import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessageSerializer;
5658
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
5759
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessageSerializer;
60+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
61+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeIdSerializer;
5862
import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResult;
5963
import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResultSerializer;
6064
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerifyResult;
@@ -306,6 +310,7 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
306310
factory.register(38, ChangeCacheEncryptionRequest::new, new ChangeCacheEncryptionRequestSerializer());
307311

308312
factory.register(86, GridCacheVersion::new, new GridCacheVersionSerializer());
313+
factory.register(87, GridDhtPartitionExchangeId::new, new GridDhtPartitionExchangeIdSerializer());
309314

310315
factory.register(167, ServiceDeploymentProcessId::new, new ServiceDeploymentProcessIdSerializer());
311316
factory.register(169, ServiceSingleNodeDeploymentResult::new, new ServiceSingleNodeDeploymentResultSerializer());
@@ -361,5 +366,6 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
361366
factory.register(537, ServiceDeploymentRequest::new,
362367
new ServiceDeploymentRequestMarshallableSerializer(marsh, clsLdr));
363368
factory.register(538, ServiceUndeploymentRequest::new, new ServiceUndeploymentRequestSerializer());
369+
factory.register(539, ExchangeFailureMessage::new, new ExchangeFailureMessageSerializer());
364370
}
365371
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeFailureMessage.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.UUID;
2323
import org.apache.ignite.IgniteCheckedException;
2424
import org.apache.ignite.cluster.ClusterNode;
25+
import org.apache.ignite.internal.Order;
26+
import org.apache.ignite.internal.managers.communication.ErrorMessage;
2527
import org.apache.ignite.internal.managers.discovery.DiscoCache;
2628
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2729
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -32,32 +34,43 @@
3234
import org.apache.ignite.internal.util.typedef.internal.S;
3335
import org.apache.ignite.internal.util.typedef.internal.U;
3436
import org.apache.ignite.lang.IgniteUuid;
37+
import org.apache.ignite.plugin.extensions.communication.Message;
38+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
3539
import org.jetbrains.annotations.Nullable;
3640

3741
/**
3842
* This class represents discovery message that is used to provide information about dynamic cache start failure.
3943
*/
40-
public class ExchangeFailureMessage implements DiscoveryCustomMessage {
44+
public class ExchangeFailureMessage implements DiscoveryCustomMessage, Message {
4145
/** */
4246
private static final long serialVersionUID = 0L;
4347

4448
/** Cache names. */
4549
@GridToStringInclude
46-
private final Collection<String> cacheNames;
50+
@Order(0)
51+
Collection<String> cacheNames;
4752

4853
/** Custom message ID. */
49-
private final IgniteUuid id;
54+
@Order(1)
55+
IgniteUuid id;
5056

5157
/** */
52-
private final GridDhtPartitionExchangeId exchId;
58+
@Order(2)
59+
GridDhtPartitionExchangeId exchId;
5360

5461
/** */
5562
@GridToStringInclude
56-
private final Map<UUID, Throwable> exchangeErrors;
63+
@Order(3)
64+
Map<UUID, ErrorMessage> exchangeErrors;
5765

5866
/** Actions to be done to rollback changes done before the exchange failure. */
5967
private transient ExchangeActions exchangeRollbackActions;
6068

69+
/** Default constructor for {@link MessageFactory}. */
70+
public ExchangeFailureMessage() {
71+
// No-op.
72+
}
73+
6174
/**
6275
* Creates new DynamicCacheChangeFailureMessage instance.
6376
*
@@ -78,7 +91,7 @@ public ExchangeFailureMessage(
7891
this.id = IgniteUuid.fromUuid(locNode.id());
7992
this.exchId = exchId;
8093
this.cacheNames = cacheNames;
81-
this.exchangeErrors = exchangeErrors;
94+
this.exchangeErrors = F.viewReadOnly(exchangeErrors, ErrorMessage::new);
8295
}
8396

8497
/** {@inheritDoc} */
@@ -95,7 +108,7 @@ public Collection<String> cacheNames() {
95108

96109
/** */
97110
public Map<UUID, Throwable> exchangeErrors() {
98-
return exchangeErrors;
111+
return F.viewReadOnly(exchangeErrors, e -> ErrorMessage.error(e));
99112
}
100113

101114
/**
@@ -123,7 +136,7 @@ public void exchangeRollbackActions(ExchangeActions exchangeRollbackActions) {
123136
public IgniteCheckedException createFailureCompoundException() {
124137
IgniteCheckedException ex = new IgniteCheckedException("Failed to complete exchange process.");
125138

126-
for (Map.Entry<UUID, Throwable> entry : exchangeErrors.entrySet())
139+
for (Map.Entry<UUID, Throwable> entry : exchangeErrors().entrySet())
127140
U.addSuppressed(ex, entry.getValue());
128141

129142
return ex;

0 commit comments

Comments
 (0)