Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ private long allocateForTree() throws IgniteCheckedException {

Set<Integer> missing = new HashSet<>();

for (Integer p : parts.full()) {
for (Integer p : parts.fullSet()) {
GridCloseableIterator<CacheDataRow> partIter = reservedIterator(p, topVer);

if (partIter == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ else if (!oldFut.isDone())
metrics.clearRebalanceCounters();

for (GridDhtPartitionDemandMessage msg : assignments.values()) {
for (Integer partId : msg.partitions().full())
for (Integer partId : msg.partitions().fullSet())
metrics.onRebalancingKeysCountEstimateReceived(grp.topology().globalPartSizes().get(partId));

CachePartitionPartialCountersMap histMap = msg.partitions().historicalMap();
Expand Down Expand Up @@ -1063,13 +1063,13 @@ public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
HashSet<Integer> parts = new HashSet<>(v.partitions().size());

parts.addAll(v.partitions().historicalSet());
parts.addAll(v.partitions().full());
parts.addAll(v.partitions().fullSet());

rebalancingParts.put(k.id(), parts);

historical.addAll(v.partitions().historicalSet());

Stream.concat(v.partitions().historicalSet().stream(), v.partitions().full().stream())
Stream.concat(v.partitions().historicalSet().stream(), v.partitions().fullSet().stream())
.forEach(
p -> {
queued.put(p, new LongAdder());
Expand Down Expand Up @@ -1182,11 +1182,11 @@ public void requestPartitions() {
// Make sure partitions scheduled for full rebalancing are cleared first.
// Clearing attempt is also required for in-memory caches because some partitions can be switched
// from RENTING to MOVING state in the middle of clearing.
final int fullSetSize = d.partitions().full().size();
final int fullSetSize = d.partitions().fullSet().size();

AtomicInteger waitCnt = new AtomicInteger(fullSetSize);

for (Integer partId : d.partitions().full()) {
for (Integer partId : d.partitions().fullSet()) {
GridDhtLocalPartition part = grp.topology().localPartition(partId);

// Due to rebalance cancellation it's possible for a group to be already partially rebalanced,
Expand Down Expand Up @@ -1214,7 +1214,7 @@ public void requestPartitions() {
}

// The special case for historical only rebalancing.
if (d.partitions().full().isEmpty() && !d.partitions().historicalSet().isEmpty())
if (d.partitions().fullSet().isEmpty() && !d.partitions().historicalSet().isEmpty())
ctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> requestPartitions0(node, parts, d));
}
}
Expand All @@ -1238,7 +1238,7 @@ private void requestPartitions0(
log.info("Starting rebalance routine [" + grp.cacheOrGroupName() +
", topVer=" + topVer +
", supplier=" + supplierNode.id() +
", fullPartitions=" + S.toStringSortedDistinct(parts.full()) +
", fullPartitions=" + S.toStringSortedDistinct(parts.fullSet()) +
", histPartitions=" + S.toStringSortedDistinct(parts.historicalSet()) +
", rebalanceId=" + rebalanceId + ']');

Expand Down Expand Up @@ -1677,7 +1677,7 @@ public boolean compatibleWith(GridDhtPreloaderAssignments newAssignments) {
p0.addAll(partitions);

for (GridDhtPartitionDemandMessage msg : newAssignments.values()) {
p1.addAll(msg.partitions().full());
p1.addAll(msg.partitions().fullSet());
p1.addAll(msg.partitions().historicalSet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,15 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand
if (sctx == null) {
if (log.isDebugEnabled())
log.debug("Starting supplying rebalancing [" + supplyRoutineInfo(topicId, nodeId, demandMsg) +
", fullPartitions=" + S.toStringSortedDistinct(demandMsg.partitions().full()) +
", fullPartitions=" + S.toStringSortedDistinct(demandMsg.partitions().fullSet()) +
", histPartitions=" + S.toStringSortedDistinct(demandMsg.partitions().historicalSet()) + "]");
}
else
maxBatchesCnt = 1;

if (sctx == null || sctx.iterator == null) {

remainingParts = new HashSet<>(demandMsg.partitions().full());
remainingParts = new HashSet<>(demandMsg.partitions().fullSet());

CachePartitionPartialCountersMap histMap = demandMsg.partitions().historicalMap();

Expand Down Expand Up @@ -457,7 +457,7 @@ else if (iter.isPartitionMissing(p)) {

// Mark all remaining partitions as missed to trigger full rebalance.
if (iter == null && F.isEmpty(remainingParts)) {
remainingParts = new HashSet<>(demandMsg.partitions().full());
remainingParts = new HashSet<>(demandMsg.partitions().fullSet());
remainingParts.addAll(demandMsg.partitions().historicalSet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public void partitionHistorySuppliers(IgniteDhtPartitionHistorySuppliersMap part
/**
*
*/
public Collection<Integer> partsToReload(UUID nodeId, int grpId) {
public Set<Integer> partsToReload(UUID nodeId, int grpId) {
if (partsToReload == null)
return Collections.emptySet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -150,22 +148,22 @@ public void retainMoving(GridDhtPartitionTopology top) {
}
}

Collection<Integer> curFull = cntrMap.full();
Set<Integer> curFullSet = cntrMap.fullSet();
Set<Integer> newFullSet = null;

if (!curFull.isEmpty()) {
if (!curFullSet.isEmpty()) {
int moving = 0;

// Fast-path check.
for (Integer partId : curFull) {
for (Integer partId : curFullSet) {
if (top.localPartition(partId).state() == MOVING)
moving++;
}

if (moving != curFull.size()) {
if (moving != curFullSet.size()) {
newFullSet = U.newHashSet(moving);

for (Integer partId : curFull) {
for (Integer partId : curFullSet) {
if (top.localPartition(partId).state() == MOVING)
newFullSet.add(partId);
}
Expand All @@ -177,7 +175,7 @@ public void retainMoving(GridDhtPartitionTopology top) {
newHistMap = curHistMap;

if (newFullSet == null)
newFullSet = new HashSet<>(curFull);
newFullSet = curFullSet;

IgniteDhtDemandedPartitionsMap newMap = new IgniteDhtDemandedPartitionsMap(newHistMap, newFullSet);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public class IgniteDhtDemandedPartitionsMap implements Serializable, Message {
private CachePartitionPartialCountersMap historical;

/** Set of partitions that require full rebalancing. */
@Order(1)
@Order(value = 1, method = "fullSet")
@GridToStringInclude
private Collection<Integer> full;
private Set<Integer> full;

/**
* @param historical Historical partition set.
Expand Down Expand Up @@ -165,15 +165,15 @@ public void historicalMap(CachePartitionPartialCountersMap historical) {
}

/** */
public Collection<Integer> full() {
public Set<Integer> fullSet() {
if (full == null)
return Collections.emptySet();

return Collections.unmodifiableCollection(full);
return Collections.unmodifiableSet(full);
}

/** */
public void full(Collection<Integer> full) {
public void fullSet(Set<Integer> full) {
this.full = full;
}

Expand All @@ -195,7 +195,7 @@ public Set<Integer> historicalSet() {

/** */
public Collection<Integer> all() {
return F.concat(false, full(), historicalSet());
return F.concat(false, fullSet(), historicalSet());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.F;
Expand All @@ -42,9 +42,9 @@ public class IgniteDhtPartitionsToReloadMap implements Message {
/**
* @param nodeId Node ID.
* @param cacheId Cache ID.
* @return Collection of partitions to reload.
* @return Set of partitions to reload.
*/
public synchronized Collection<Integer> get(UUID nodeId, int cacheId) {
public synchronized Set<Integer> get(UUID nodeId, int cacheId) {
if (map == null)
return Collections.emptySet();

Expand All @@ -58,7 +58,7 @@ public synchronized Collection<Integer> get(UUID nodeId, int cacheId) {
if (partsToReload == null)
return Collections.emptySet();

return F.emptyIfNull(partsToReload.partitions());
return (Set<Integer>)F.emptyIfNull(partsToReload.partitions());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.ignite.internal.Order;
import org.apache.ignite.plugin.extensions.communication.Message;

Expand All @@ -27,21 +27,21 @@ public class PartitionsToReload implements Message {
/** Type code. */
public static final short TYPE_CODE = 511;

/** Collection of partitions to reload. */
/** Set of partitions to reload. */
@Order(value = 0, method = "partitions")
private Collection<Integer> parts;
private Set<Integer> parts;

/**
* @return Collection of partitions to reload.
* @return Set of partitions to reload.
*/
public Collection<Integer> partitions() {
public Set<Integer> partitions() {
return parts;
}

/**
* @param parts Collection of partitions to reload.
* @param parts Set of partitions to reload.
*/
public void partitions(Collection<Integer> parts) {
public void partitions(Set<Integer> parts) {
this.parts = parts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD
@Nullable AffinityTopologyVersion exchangeVer,
GridDhtPartitionFullMap partMap,
@Nullable CachePartitionFullCountersMap cntrMap,
Collection<Integer> partsToReload,
Set<Integer> partsToReload,
@Nullable Map<Integer, Long> partSizes,
@Nullable AffinityTopologyVersion msgTopVer,
@Nullable GridDhtPartitionsExchangeFuture exchFut,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.ignite.internal.processors.cache.distributed.dht.topology;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -298,7 +297,7 @@ public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, G
* means full map received is not related to exchange
* @param partMap Update partition map.
* @param cntrMap Partition update counters.
* @param partsToReload Collection of partitions that need to be reloaded.
* @param partsToReload Set of partitions that need to be reloaded.
* @param partSizes Global partition sizes.
* @param msgTopVer Topology version from incoming message. This value is not null only for case message is not
* related to exchange. Value should be not less than previous 'Topology version from exchange'.
Expand All @@ -310,7 +309,7 @@ public boolean update(
@Nullable AffinityTopologyVersion exchangeResVer,
GridDhtPartitionFullMap partMap,
@Nullable CachePartitionFullCountersMap cntrMap,
Collection<Integer> partsToReload,
Set<Integer> partsToReload,
@Nullable Map<Integer, Long> partSizes,
@Nullable AffinityTopologyVersion msgTopVer,
@Nullable GridDhtPartitionsExchangeFuture exchFut,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,7 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD
@Nullable AffinityTopologyVersion exchangeVer,
GridDhtPartitionFullMap partMap,
@Nullable CachePartitionFullCountersMap incomeCntrMap,
Collection<Integer> partsToReload,
Set<Integer> partsToReload,
@Nullable Map<Integer, Long> partSizes,
@Nullable AffinityTopologyVersion msgTopVer,
@Nullable GridDhtPartitionsExchangeFuture exchFut,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void testCircledNodesRestart(int backups, int nodes) throws Exception {
if (msg instanceof GridDhtPartitionDemandMessage) {
GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg;

hasFullRebalance.compareAndSet(false, !F.isEmpty(demandMsg.partitions().full()));
hasFullRebalance.compareAndSet(false, !F.isEmpty(demandMsg.partitions().fullSet()));

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public static void cleanup() {
}
}

if (!map.full().isEmpty()) {
if (!map.fullSet().isEmpty()) {
synchronized (mux) {
topVersForFull = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void historicalRebalance(IgniteRunnable someAction) throws Exception {
if (msg instanceof GridDhtPartitionDemandMessage) {
GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg;

if (!F.isEmpty(demandMsg.partitions().full()))
if (!F.isEmpty(demandMsg.partitions().fullSet()))
hasFullRebalance.compareAndSet(false, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ private void restartRebalance(RebalanceRetrigger retrigger, boolean retriggerAsH
if (rebTopVer == null || rebTopVer.before(demandMsg.topologyVersion()))
rebTopVer = demandMsg.topologyVersion();

if (!F.isEmpty(demandMsg.partitions().full()))
if (!F.isEmpty(demandMsg.partitions().fullSet()))
hasFullRebalance.compareAndSet(false, true);
}

Expand Down
Loading