Skip to content

Commit b2fdcea

Browse files
authored
IGNITE-26866 Add Message interface to IgniteDhtPartitionsToReloadMap (#12487)
1 parent e27482a commit b2fdcea

File tree

12 files changed

+292
-89
lines changed

12 files changed

+292
-89
lines changed

modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2146,4 +2146,11 @@ public static int compareArrays(double[] a1, double[] a2) {
21462146
public static <T> Collection<T> emptyIfNull(@Nullable Collection<T> col) {
21472147
return col == null ? Collections.emptySet() : col;
21482148
}
2149+
2150+
/**
2151+
* @param map Map.
2152+
*/
2153+
public static <K, V> Map<K, V> emptyIfNull(@Nullable Map<K, V> map) {
2154+
return map == null ? Collections.emptyMap() : map;
2155+
}
21492156
}

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.ignite.internal.codegen.CacheInvokeDirectResultSerializer;
3636
import org.apache.ignite.internal.codegen.CachePartitionFullCountersMapSerializer;
3737
import org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer;
38+
import org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer;
3839
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
3940
import org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer;
4041
import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
@@ -110,6 +111,7 @@
110111
import org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer;
111112
import org.apache.ignite.internal.codegen.IgniteDhtPartitionCountersMapSerializer;
112113
import org.apache.ignite.internal.codegen.IgniteDhtPartitionHistorySuppliersMapSerializer;
114+
import org.apache.ignite.internal.codegen.IgniteDhtPartitionsToReloadMapSerializer;
113115
import org.apache.ignite.internal.codegen.IgniteTxKeySerializer;
114116
import org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer;
115117
import org.apache.ignite.internal.codegen.JobStealingRequestSerializer;
@@ -121,6 +123,8 @@
121123
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
122124
import org.apache.ignite.internal.codegen.NodeIdMessageSerializer;
123125
import org.apache.ignite.internal.codegen.PartitionReservationsMapSerializer;
126+
import org.apache.ignite.internal.codegen.PartitionSizesMapSerializer;
127+
import org.apache.ignite.internal.codegen.PartitionsToReloadSerializer;
124128
import org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
125129
import org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
126130
import org.apache.ignite.internal.codegen.ServiceDeploymentProcessIdSerializer;
@@ -201,6 +205,7 @@
201205
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
202206
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
203207
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
208+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionsToReloadMap;
204209
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
205210
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
206211
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -213,7 +218,10 @@
213218
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
214219
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap;
215220
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
221+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
216222
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionReservationsMap;
223+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap;
224+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsToReload;
217225
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
218226
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
219227
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -466,6 +474,11 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
466474
factory.register(PartitionReservationsMap.TYPE_CODE, PartitionReservationsMap::new, new PartitionReservationsMapSerializer());
467475
factory.register(IgniteDhtPartitionHistorySuppliersMap.TYPE_CODE, IgniteDhtPartitionHistorySuppliersMap::new,
468476
new IgniteDhtPartitionHistorySuppliersMapSerializer());
477+
factory.register(PartitionsToReload.TYPE_CODE, PartitionsToReload::new, new PartitionsToReloadSerializer());
478+
factory.register(CachePartitionsToReloadMap.TYPE_CODE, CachePartitionsToReloadMap::new, new CachePartitionsToReloadMapSerializer());
479+
factory.register(IgniteDhtPartitionsToReloadMap.TYPE_CODE, IgniteDhtPartitionsToReloadMap::new,
480+
new IgniteDhtPartitionsToReloadMapSerializer());
481+
factory.register(PartitionSizesMap.TYPE_CODE, PartitionSizesMap::new, new PartitionSizesMapSerializer());
469482

470483
// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
471484
// [120..123] - DR

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
9191
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
9292
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
93+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap;
9394
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
9495
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
9596
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
@@ -1440,7 +1441,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
14401441
}
14411442

14421443
if (!partsSizes.isEmpty())
1443-
m.partitionSizes(cctx, partsSizes);
1444+
m.partitionSizes(F.viewReadOnly(partsSizes, PartitionSizesMap::new));
14441445

14451446
return m;
14461447
}
@@ -1771,7 +1772,7 @@ public void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMe
17711772

17721773
boolean updated = false;
17731774

1774-
Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
1775+
Map<Integer, PartitionSizesMap> partsSizes = F.emptyIfNull(msg.partitionSizes());
17751776

17761777
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
17771778
Integer grpId = entry.getKey();
@@ -1781,11 +1782,13 @@ public void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMe
17811782
GridDhtPartitionTopology top = grp == null ? clientTops.get(grpId) : grp.topology();
17821783

17831784
if (top != null) {
1785+
PartitionSizesMap sizesMap = partsSizes.get(grpId);
1786+
17841787
updated |= top.update(null,
17851788
entry.getValue(),
17861789
null,
17871790
msg.partsToReload(cctx.localNodeId(), grpId),
1788-
partsSizes.getOrDefault(grpId, Collections.emptyMap()),
1791+
sizesMap != null ? F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(),
17891792
msg.topologyVersion(),
17901793
null,
17911794
null);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
19+
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
import org.apache.ignite.internal.Order;
23+
import org.apache.ignite.plugin.extensions.communication.Message;
24+
import org.jetbrains.annotations.Nullable;
25+
26+
/** Partition reload map for cache. */
27+
public class CachePartitionsToReloadMap implements Message {
28+
/** Type code. */
29+
public static final short TYPE_CODE = 512;
30+
31+
/** Partition reload map for cache. */
32+
@Order(value = 0, method = "cachePartitions")
33+
private Map<Integer, PartitionsToReload> map;
34+
35+
/**
36+
* @return Partition reload map for cache.
37+
*/
38+
public Map<Integer, PartitionsToReload> cachePartitions() {
39+
return map;
40+
}
41+
42+
/**
43+
* @param map Partition reload map for cache.
44+
*/
45+
public void cachePartitions(Map<Integer, PartitionsToReload> map) {
46+
this.map = map;
47+
}
48+
49+
/**
50+
* @param cacheId Cache id.
51+
* @return Partitions to reload for this cache.
52+
*/
53+
public @Nullable PartitionsToReload get(int cacheId) {
54+
if (map == null)
55+
return null;
56+
57+
return map.get(cacheId);
58+
}
59+
60+
/**
61+
* @param cacheId Cache id.
62+
* @param parts Partitions to reload.
63+
*/
64+
public void put(int cacheId, PartitionsToReload parts) {
65+
if (map == null)
66+
map = new HashMap<>();
67+
68+
map.put(cacheId, parts);
69+
}
70+
71+
/** {@inheritDoc} */
72+
@Override public short directType() {
73+
return TYPE_CODE;
74+
}
75+
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3311,12 +3311,18 @@ private void assignPartitionSizes(GridDhtPartitionTopology top) {
33113311
if (partMap == null)
33123312
continue;
33133313

3314+
Map<Integer, Long> grpPartSizes = singleMsg.partitionSizes(top.groupId());
3315+
33143316
for (Map.Entry<Integer, GridDhtPartitionState> e0 : partMap.entrySet()) {
33153317
int p = e0.getKey();
33163318
GridDhtPartitionState state = e0.getValue();
33173319

3318-
if (state == GridDhtPartitionState.OWNING)
3319-
partSizes.put(p, singleMsg.partitionSizes(top.groupId()).get(p));
3320+
if (state == GridDhtPartitionState.OWNING) {
3321+
Long size = grpPartSizes.get(p);
3322+
3323+
if (size != null)
3324+
partSizes.put(p, size);
3325+
}
33203326
}
33213327
}
33223328

@@ -4654,7 +4660,7 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa
46544660
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
46554661

46564662
try {
4657-
Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
4663+
Map<Integer, PartitionSizesMap> partsSizes = F.emptyIfNull(msg.partitionSizes());
46584664

46594665
doInParallel(
46604666
parallelismLvl,
@@ -4665,11 +4671,13 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa
46654671
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
46664672

46674673
if (grp != null) {
4674+
PartitionSizesMap sizesMap = partsSizes.get(grpId);
4675+
46684676
grp.topology().update(resTopVer,
46694677
msg.partitions().get(grpId),
46704678
cntrMap,
46714679
msg.partsToReload(cctx.localNodeId(), grpId),
4672-
partsSizes.getOrDefault(grpId, Collections.emptyMap()),
4680+
sizesMap != null ? F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(),
46734681
null,
46744682
this,
46754683
msg.lostPartitions(grpId));

0 commit comments

Comments
 (0)