Skip to content

Commit 999dc1f

Browse files
authored
IGNITE-26889 Use MessageSerializer for GridDhtPartitionsFullMessage (#12508)
1 parent 6576891 commit 999dc1f

File tree

6 files changed

+161
-272
lines changed

6 files changed

+161
-272
lines changed

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.ignite.internal.codegen.GridDhtPartitionDemandMessageSerializer;
6666
import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer;
6767
import org.apache.ignite.internal.codegen.GridDhtPartitionSupplyMessageSerializer;
68+
import org.apache.ignite.internal.codegen.GridDhtPartitionsFullMessageSerializer;
6869
import org.apache.ignite.internal.codegen.GridDhtPartitionsSingleRequestSerializer;
6970
import org.apache.ignite.internal.codegen.GridDhtTxFinishRequestSerializer;
7071
import org.apache.ignite.internal.codegen.GridDhtTxFinishResponseSerializer;
@@ -362,7 +363,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
362363
factory.register((short)42, GridDhtForceKeysRequest::new, new GridDhtForceKeysRequestSerializer());
363364
factory.register((short)43, GridDhtForceKeysResponse::new, new GridDhtForceKeysResponseSerializer());
364365
factory.register((short)45, GridDhtPartitionDemandMessage::new, new GridDhtPartitionDemandMessageSerializer());
365-
factory.register((short)46, GridDhtPartitionsFullMessage::new);
366+
factory.register((short)46, GridDhtPartitionsFullMessage::new, new GridDhtPartitionsFullMessageSerializer());
366367
factory.register((short)47, GridDhtPartitionsSingleMessage::new);
367368
factory.register((short)48, GridDhtPartitionsSingleRequest::new, new GridDhtPartitionsSingleRequestSerializer());
368369
factory.register((short)49, GridNearGetRequest::new, new GridNearGetRequestSerializer());

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeFailureMessage.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class ExchangeFailureMessage implements DiscoveryCustomMessage {
5353

5454
/** */
5555
@GridToStringInclude
56-
private final Map<UUID, Exception> exchangeErrors;
56+
private final Map<UUID, Throwable> exchangeErrors;
5757

5858
/** Actions to be done to rollback changes done before the exchange failure. */
5959
private transient ExchangeActions exchangeRollbackActions;
@@ -68,7 +68,7 @@ public class ExchangeFailureMessage implements DiscoveryCustomMessage {
6868
public ExchangeFailureMessage(
6969
ClusterNode locNode,
7070
GridDhtPartitionExchangeId exchId,
71-
Map<UUID, Exception> exchangeErrors,
71+
Map<UUID, Throwable> exchangeErrors,
7272
Collection<String> cacheNames
7373
) {
7474
assert exchId != null;
@@ -94,7 +94,7 @@ public Collection<String> cacheNames() {
9494
}
9595

9696
/** */
97-
public Map<UUID, Exception> exchangeErrors() {
97+
public Map<UUID, Throwable> exchangeErrors() {
9898
return exchangeErrors;
9999
}
100100

@@ -123,7 +123,7 @@ public void exchangeRollbackActions(ExchangeActions exchangeRollbackActions) {
123123
public IgniteCheckedException createFailureCompoundException() {
124124
IgniteCheckedException ex = new IgniteCheckedException("Failed to complete exchange process.");
125125

126-
for (Map.Entry<UUID, Exception> entry : exchangeErrors.entrySet())
126+
for (Map.Entry<UUID, Throwable> entry : exchangeErrors.entrySet())
127127
U.addSuppressed(ex, entry.getValue());
128128

129129
return ex;

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
306306
private volatile Exception exchangeLocE;
307307

308308
/** Exchange exceptions from all participating nodes. */
309-
private final Map<UUID, Exception> exchangeGlobalExceptions = new ConcurrentHashMap<>();
309+
private final Map<UUID, Throwable> exchangeGlobalExceptions = new ConcurrentHashMap<>();
310310

311311
/** Used to track the fact that {@link ExchangeFailureMessage} was sent. */
312312
private volatile boolean isExchangeFailureMsgSent;
@@ -2214,7 +2214,13 @@ private void sendAllPartitions(
22142214
// Prepare full message for newly joined nodes with affinity request.
22152215
final GridDhtPartitionsFullMessage fullMsgWithAff = singleMsgWithAffReq
22162216
.filter(singleMessage -> affinityForJoinedNodes != null)
2217-
.map(singleMessage -> fullMsg.copy().joinedNodeAffinity(affinityForJoinedNodes))
2217+
.map(singleMessage -> {
2218+
GridDhtPartitionsFullMessage copy = fullMsg.copy();
2219+
2220+
copy.joinedNodeAffinity(affinityForJoinedNodes);
2221+
2222+
return copy;
2223+
})
22182224
.orElse(null);
22192225

22202226
// Prepare and send full messages for given nodes.
@@ -4252,7 +4258,7 @@ private void sendAllPartitionsToNode(FinishState finishState, GridDhtPartitionsS
42524258
}
42534259
catch (IllegalStateException e) {
42544260
// Cannot create affinity message.
4255-
Map<UUID, Exception> errs = Collections.singletonMap(
4261+
Map<UUID, Throwable> errs = Collections.singletonMap(
42564262
nodeId,
42574263
node.isClient() ? new IgniteNeedReconnectException(node, e) : new IgniteCheckedException(e));
42584264

@@ -4517,7 +4523,7 @@ private void processFullMessage(boolean checkCrd, ClusterNode node, GridDhtParti
45174523
}
45184524
else {
45194525
if (!F.isEmpty(msg.getErrorsMap())) {
4520-
Exception e = msg.getErrorsMap().get(cctx.localNodeId());
4526+
Throwable e = msg.getErrorsMap().get(cctx.localNodeId());
45214527

45224528
if (e instanceof IgniteNeedReconnectException) {
45234529
onDone(e);
@@ -4654,7 +4660,8 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa
46544660

46554661
assert partHistSuppliers.isEmpty();
46564662

4657-
partHistSuppliers.putAll(msg.partitionHistorySuppliers());
4663+
partHistSuppliers.putAll(msg.partitionHistorySuppliers() != null ? msg.partitionHistorySuppliers() :
4664+
IgniteDhtPartitionHistorySuppliersMap.empty());
46584665

46594666
// Reserve at least 2 threads for system operations.
46604667
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);

0 commit comments

Comments
 (0)