diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index bfcdd05db3f2f..923eb153bef7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -949,7 +949,7 @@ private long allocateForTree() throws IgniteCheckedException { Set missing = new HashSet<>(); - for (Integer p : parts.full()) { + for (Integer p : parts.fullSet()) { GridCloseableIterator partIter = reservedIterator(p, topVer); if (partIter == null) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 87dd49a93221e..5ee97669c93c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -407,7 +407,7 @@ else if (!oldFut.isDone()) metrics.clearRebalanceCounters(); for (GridDhtPartitionDemandMessage msg : assignments.values()) { - for (Integer partId : msg.partitions().full()) + for (Integer partId : msg.partitions().fullSet()) metrics.onRebalancingKeysCountEstimateReceived(grp.topology().globalPartSizes().get(partId)); CachePartitionPartialCountersMap histMap = msg.partitions().historicalMap(); @@ -1063,13 +1063,13 @@ public static class RebalanceFuture extends GridFutureAdapter { HashSet parts = new HashSet<>(v.partitions().size()); parts.addAll(v.partitions().historicalSet()); - parts.addAll(v.partitions().full()); + parts.addAll(v.partitions().fullSet()); rebalancingParts.put(k.id(), parts); historical.addAll(v.partitions().historicalSet()); - Stream.concat(v.partitions().historicalSet().stream(), v.partitions().full().stream()) + Stream.concat(v.partitions().historicalSet().stream(), v.partitions().fullSet().stream()) .forEach( p -> { queued.put(p, new LongAdder()); @@ -1182,11 +1182,11 @@ public void requestPartitions() { // Make sure partitions scheduled for full rebalancing are cleared first. // Clearing attempt is also required for in-memory caches because some partitions can be switched // from RENTING to MOVING state in the middle of clearing. - final int fullSetSize = d.partitions().full().size(); + final int fullSetSize = d.partitions().fullSet().size(); AtomicInteger waitCnt = new AtomicInteger(fullSetSize); - for (Integer partId : d.partitions().full()) { + for (Integer partId : d.partitions().fullSet()) { GridDhtLocalPartition part = grp.topology().localPartition(partId); // Due to rebalance cancellation it's possible for a group to be already partially rebalanced, @@ -1214,7 +1214,7 @@ public void requestPartitions() { } // The special case for historical only rebalancing. - if (d.partitions().full().isEmpty() && !d.partitions().historicalSet().isEmpty()) + if (d.partitions().fullSet().isEmpty() && !d.partitions().historicalSet().isEmpty()) ctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> requestPartitions0(node, parts, d)); } } @@ -1238,7 +1238,7 @@ private void requestPartitions0( log.info("Starting rebalance routine [" + grp.cacheOrGroupName() + ", topVer=" + topVer + ", supplier=" + supplierNode.id() + - ", fullPartitions=" + S.toStringSortedDistinct(parts.full()) + + ", fullPartitions=" + S.toStringSortedDistinct(parts.fullSet()) + ", histPartitions=" + S.toStringSortedDistinct(parts.historicalSet()) + ", rebalanceId=" + rebalanceId + ']'); @@ -1677,7 +1677,7 @@ public boolean compatibleWith(GridDhtPreloaderAssignments newAssignments) { p0.addAll(partitions); for (GridDhtPartitionDemandMessage msg : newAssignments.values()) { - p1.addAll(msg.partitions().full()); + p1.addAll(msg.partitions().fullSet()); p1.addAll(msg.partitions().historicalSet()); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 85180dff2ef03..3c4a09e671830 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -268,7 +268,7 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand if (sctx == null) { if (log.isDebugEnabled()) log.debug("Starting supplying rebalancing [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + - ", fullPartitions=" + S.toStringSortedDistinct(demandMsg.partitions().full()) + + ", fullPartitions=" + S.toStringSortedDistinct(demandMsg.partitions().fullSet()) + ", histPartitions=" + S.toStringSortedDistinct(demandMsg.partitions().historicalSet()) + "]"); } else @@ -276,7 +276,7 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand if (sctx == null || sctx.iterator == null) { - remainingParts = new HashSet<>(demandMsg.partitions().full()); + remainingParts = new HashSet<>(demandMsg.partitions().fullSet()); CachePartitionPartialCountersMap histMap = demandMsg.partitions().historicalMap(); @@ -457,7 +457,7 @@ else if (iter.isPartitionMissing(p)) { // Mark all remaining partitions as missed to trigger full rebalance. if (iter == null && F.isEmpty(remainingParts)) { - remainingParts = new HashSet<>(demandMsg.partitions().full()); + remainingParts = new HashSet<>(demandMsg.partitions().fullSet()); remainingParts.addAll(demandMsg.partitions().historicalSet()); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 5655ba4a028d6..ead64e48bcff9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -371,7 +371,7 @@ public void partitionHistorySuppliers(IgniteDhtPartitionHistorySuppliersMap part /** * */ - public Collection partsToReload(UUID nodeId, int grpId) { + public Set partsToReload(UUID nodeId, int grpId) { if (partsToReload == null) return Collections.emptySet(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java index d21f9ffa1b902..73fbce85deeda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; -import java.util.Collection; -import java.util.HashSet; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -150,22 +148,22 @@ public void retainMoving(GridDhtPartitionTopology top) { } } - Collection curFull = cntrMap.full(); + Set curFullSet = cntrMap.fullSet(); Set newFullSet = null; - if (!curFull.isEmpty()) { + if (!curFullSet.isEmpty()) { int moving = 0; // Fast-path check. - for (Integer partId : curFull) { + for (Integer partId : curFullSet) { if (top.localPartition(partId).state() == MOVING) moving++; } - if (moving != curFull.size()) { + if (moving != curFullSet.size()) { newFullSet = U.newHashSet(moving); - for (Integer partId : curFull) { + for (Integer partId : curFullSet) { if (top.localPartition(partId).state() == MOVING) newFullSet.add(partId); } @@ -177,7 +175,7 @@ public void retainMoving(GridDhtPartitionTopology top) { newHistMap = curHistMap; if (newFullSet == null) - newFullSet = new HashSet<>(curFull); + newFullSet = curFullSet; IgniteDhtDemandedPartitionsMap newMap = new IgniteDhtDemandedPartitionsMap(newHistMap, newFullSet); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java index 91004bb83be79..fe88fce8389b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java @@ -45,9 +45,9 @@ public class IgniteDhtDemandedPartitionsMap implements Serializable, Message { private CachePartitionPartialCountersMap historical; /** Set of partitions that require full rebalancing. */ - @Order(1) + @Order(value = 1, method = "fullSet") @GridToStringInclude - private Collection full; + private Set full; /** * @param historical Historical partition set. @@ -165,15 +165,15 @@ public void historicalMap(CachePartitionPartialCountersMap historical) { } /** */ - public Collection full() { + public Set fullSet() { if (full == null) return Collections.emptySet(); - return Collections.unmodifiableCollection(full); + return Collections.unmodifiableSet(full); } /** */ - public void full(Collection full) { + public void fullSet(Set full) { this.full = full; } @@ -195,7 +195,7 @@ public Set historicalSet() { /** */ public Collection all() { - return F.concat(false, full(), historicalSet()); + return F.concat(false, fullSet(), historicalSet()); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java index bc965a49926d3..7ebc88c6b4786 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java @@ -18,10 +18,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.F; @@ -42,9 +42,9 @@ public class IgniteDhtPartitionsToReloadMap implements Message { /** * @param nodeId Node ID. * @param cacheId Cache ID. - * @return Collection of partitions to reload. + * @return Set of partitions to reload. */ - public synchronized Collection get(UUID nodeId, int cacheId) { + public synchronized Set get(UUID nodeId, int cacheId) { if (map == null) return Collections.emptySet(); @@ -58,7 +58,7 @@ public synchronized Collection get(UUID nodeId, int cacheId) { if (partsToReload == null) return Collections.emptySet(); - return F.emptyIfNull(partsToReload.partitions()); + return (Set)F.emptyIfNull(partsToReload.partitions()); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java index 023b28ad03f1a..c5a61f730b365 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java @@ -17,8 +17,8 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; -import java.util.Collection; import java.util.HashSet; +import java.util.Set; import org.apache.ignite.internal.Order; import org.apache.ignite.plugin.extensions.communication.Message; @@ -27,21 +27,21 @@ public class PartitionsToReload implements Message { /** Type code. */ public static final short TYPE_CODE = 511; - /** Collection of partitions to reload. */ + /** Set of partitions to reload. */ @Order(value = 0, method = "partitions") - private Collection parts; + private Set parts; /** - * @return Collection of partitions to reload. + * @return Set of partitions to reload. */ - public Collection partitions() { + public Set partitions() { return parts; } /** - * @param parts Collection of partitions to reload. + * @param parts Set of partitions to reload. */ - public void partitions(Collection parts) { + public void partitions(Set parts) { this.parts = parts; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java index 675f891be690c..9ca26f636ae3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java @@ -747,7 +747,7 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap cntrMap, - Collection partsToReload, + Set partsToReload, @Nullable Map partSizes, @Nullable AffinityTopologyVersion msgTopVer, @Nullable GridDhtPartitionsExchangeFuture exchFut, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java index 7d6a079f0449b..0c2f415a7a1c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.topology; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -298,7 +297,7 @@ public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, G * means full map received is not related to exchange * @param partMap Update partition map. * @param cntrMap Partition update counters. - * @param partsToReload Collection of partitions that need to be reloaded. + * @param partsToReload Set of partitions that need to be reloaded. * @param partSizes Global partition sizes. * @param msgTopVer Topology version from incoming message. This value is not null only for case message is not * related to exchange. Value should be not less than previous 'Topology version from exchange'. @@ -310,7 +309,7 @@ public boolean update( @Nullable AffinityTopologyVersion exchangeResVer, GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap cntrMap, - Collection partsToReload, + Set partsToReload, @Nullable Map partSizes, @Nullable AffinityTopologyVersion msgTopVer, @Nullable GridDhtPartitionsExchangeFuture exchFut, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java index 8227e16172814..b004053ca6f35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java @@ -1440,7 +1440,7 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap incomeCntrMap, - Collection partsToReload, + Set partsToReload, @Nullable Map partSizes, @Nullable AffinityTopologyVersion msgTopVer, @Nullable GridDhtPartitionsExchangeFuture exchFut, diff --git a/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java index f8fc31de61acb..587ac90acdc50 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java @@ -169,7 +169,7 @@ public void testCircledNodesRestart(int backups, int nodes) throws Exception { if (msg instanceof GridDhtPartitionDemandMessage) { GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg; - hasFullRebalance.compareAndSet(false, !F.isEmpty(demandMsg.partitions().full())); + hasFullRebalance.compareAndSet(false, !F.isEmpty(demandMsg.partitions().fullSet())); return true; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java index f95b8bb025b0f..7c84d32cacf2f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java @@ -246,7 +246,7 @@ public static void cleanup() { } } - if (!map.full().isEmpty()) { + if (!map.fullSet().isEmpty()) { synchronized (mux) { topVersForFull = true; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/HistoricalReservationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/HistoricalReservationTest.java index 0c4c80211a36a..5478ec6e085dc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/HistoricalReservationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/HistoricalReservationTest.java @@ -194,7 +194,7 @@ public void historicalRebalance(IgniteRunnable someAction) throws Exception { if (msg instanceof GridDhtPartitionDemandMessage) { GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg; - if (!F.isEmpty(demandMsg.partitions().full())) + if (!F.isEmpty(demandMsg.partitions().fullSet())) hasFullRebalance.compareAndSet(false, true); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java index 6b4d5e2552b26..19f6e9f67bd97 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java @@ -268,7 +268,7 @@ private void restartRebalance(RebalanceRetrigger retrigger, boolean retriggerAsH if (rebTopVer == null || rebTopVer.before(demandMsg.topologyVersion())) rebTopVer = demandMsg.topologyVersion(); - if (!F.isEmpty(demandMsg.partitions().full())) + if (!F.isEmpty(demandMsg.partitions().fullSet())) hasFullRebalance.compareAndSet(false, true); }