Skip to content

Commit 347ab41

Browse files
authored
IGNITE-27476 Restore the use of Set instead of Collection in messages (#12618)
1 parent d385d78 commit 347ab41

15 files changed

+44
-47
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -949,7 +949,7 @@ private long allocateForTree() throws IgniteCheckedException {
949949

950950
Set<Integer> missing = new HashSet<>();
951951

952-
for (Integer p : parts.full()) {
952+
for (Integer p : parts.fullSet()) {
953953
GridCloseableIterator<CacheDataRow> partIter = reservedIterator(p, topVer);
954954

955955
if (partIter == null) {

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ else if (!oldFut.isDone())
407407
metrics.clearRebalanceCounters();
408408

409409
for (GridDhtPartitionDemandMessage msg : assignments.values()) {
410-
for (Integer partId : msg.partitions().full())
410+
for (Integer partId : msg.partitions().fullSet())
411411
metrics.onRebalancingKeysCountEstimateReceived(grp.topology().globalPartSizes().get(partId));
412412

413413
CachePartitionPartialCountersMap histMap = msg.partitions().historicalMap();
@@ -1063,13 +1063,13 @@ public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
10631063
HashSet<Integer> parts = new HashSet<>(v.partitions().size());
10641064

10651065
parts.addAll(v.partitions().historicalSet());
1066-
parts.addAll(v.partitions().full());
1066+
parts.addAll(v.partitions().fullSet());
10671067

10681068
rebalancingParts.put(k.id(), parts);
10691069

10701070
historical.addAll(v.partitions().historicalSet());
10711071

1072-
Stream.concat(v.partitions().historicalSet().stream(), v.partitions().full().stream())
1072+
Stream.concat(v.partitions().historicalSet().stream(), v.partitions().fullSet().stream())
10731073
.forEach(
10741074
p -> {
10751075
queued.put(p, new LongAdder());
@@ -1182,11 +1182,11 @@ public void requestPartitions() {
11821182
// Make sure partitions scheduled for full rebalancing are cleared first.
11831183
// Clearing attempt is also required for in-memory caches because some partitions can be switched
11841184
// from RENTING to MOVING state in the middle of clearing.
1185-
final int fullSetSize = d.partitions().full().size();
1185+
final int fullSetSize = d.partitions().fullSet().size();
11861186

11871187
AtomicInteger waitCnt = new AtomicInteger(fullSetSize);
11881188

1189-
for (Integer partId : d.partitions().full()) {
1189+
for (Integer partId : d.partitions().fullSet()) {
11901190
GridDhtLocalPartition part = grp.topology().localPartition(partId);
11911191

11921192
// Due to rebalance cancellation it's possible for a group to be already partially rebalanced,
@@ -1214,7 +1214,7 @@ public void requestPartitions() {
12141214
}
12151215

12161216
// The special case for historical only rebalancing.
1217-
if (d.partitions().full().isEmpty() && !d.partitions().historicalSet().isEmpty())
1217+
if (d.partitions().fullSet().isEmpty() && !d.partitions().historicalSet().isEmpty())
12181218
ctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> requestPartitions0(node, parts, d));
12191219
}
12201220
}
@@ -1238,7 +1238,7 @@ private void requestPartitions0(
12381238
log.info("Starting rebalance routine [" + grp.cacheOrGroupName() +
12391239
", topVer=" + topVer +
12401240
", supplier=" + supplierNode.id() +
1241-
", fullPartitions=" + S.toStringSortedDistinct(parts.full()) +
1241+
", fullPartitions=" + S.toStringSortedDistinct(parts.fullSet()) +
12421242
", histPartitions=" + S.toStringSortedDistinct(parts.historicalSet()) +
12431243
", rebalanceId=" + rebalanceId + ']');
12441244

@@ -1677,7 +1677,7 @@ public boolean compatibleWith(GridDhtPreloaderAssignments newAssignments) {
16771677
p0.addAll(partitions);
16781678

16791679
for (GridDhtPartitionDemandMessage msg : newAssignments.values()) {
1680-
p1.addAll(msg.partitions().full());
1680+
p1.addAll(msg.partitions().fullSet());
16811681
p1.addAll(msg.partitions().historicalSet());
16821682
}
16831683

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,15 +268,15 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand
268268
if (sctx == null) {
269269
if (log.isDebugEnabled())
270270
log.debug("Starting supplying rebalancing [" + supplyRoutineInfo(topicId, nodeId, demandMsg) +
271-
", fullPartitions=" + S.toStringSortedDistinct(demandMsg.partitions().full()) +
271+
", fullPartitions=" + S.toStringSortedDistinct(demandMsg.partitions().fullSet()) +
272272
", histPartitions=" + S.toStringSortedDistinct(demandMsg.partitions().historicalSet()) + "]");
273273
}
274274
else
275275
maxBatchesCnt = 1;
276276

277277
if (sctx == null || sctx.iterator == null) {
278278

279-
remainingParts = new HashSet<>(demandMsg.partitions().full());
279+
remainingParts = new HashSet<>(demandMsg.partitions().fullSet());
280280

281281
CachePartitionPartialCountersMap histMap = demandMsg.partitions().historicalMap();
282282

@@ -457,7 +457,7 @@ else if (iter.isPartitionMissing(p)) {
457457

458458
// Mark all remaining partitions as missed to trigger full rebalance.
459459
if (iter == null && F.isEmpty(remainingParts)) {
460-
remainingParts = new HashSet<>(demandMsg.partitions().full());
460+
remainingParts = new HashSet<>(demandMsg.partitions().fullSet());
461461
remainingParts.addAll(demandMsg.partitions().historicalSet());
462462
}
463463

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ public void partitionHistorySuppliers(IgniteDhtPartitionHistorySuppliersMap part
371371
/**
372372
*
373373
*/
374-
public Collection<Integer> partsToReload(UUID nodeId, int grpId) {
374+
public Set<Integer> partsToReload(UUID nodeId, int grpId) {
375375
if (partsToReload == null)
376376
return Collections.emptySet();
377377

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
1919

20-
import java.util.Collection;
21-
import java.util.HashSet;
2220
import java.util.Iterator;
2321
import java.util.Set;
2422
import java.util.concurrent.ConcurrentHashMap;
@@ -150,22 +148,22 @@ public void retainMoving(GridDhtPartitionTopology top) {
150148
}
151149
}
152150

153-
Collection<Integer> curFull = cntrMap.full();
151+
Set<Integer> curFullSet = cntrMap.fullSet();
154152
Set<Integer> newFullSet = null;
155153

156-
if (!curFull.isEmpty()) {
154+
if (!curFullSet.isEmpty()) {
157155
int moving = 0;
158156

159157
// Fast-path check.
160-
for (Integer partId : curFull) {
158+
for (Integer partId : curFullSet) {
161159
if (top.localPartition(partId).state() == MOVING)
162160
moving++;
163161
}
164162

165-
if (moving != curFull.size()) {
163+
if (moving != curFullSet.size()) {
166164
newFullSet = U.newHashSet(moving);
167165

168-
for (Integer partId : curFull) {
166+
for (Integer partId : curFullSet) {
169167
if (top.localPartition(partId).state() == MOVING)
170168
newFullSet.add(partId);
171169
}
@@ -177,7 +175,7 @@ public void retainMoving(GridDhtPartitionTopology top) {
177175
newHistMap = curHistMap;
178176

179177
if (newFullSet == null)
180-
newFullSet = new HashSet<>(curFull);
178+
newFullSet = curFullSet;
181179

182180
IgniteDhtDemandedPartitionsMap newMap = new IgniteDhtDemandedPartitionsMap(newHistMap, newFullSet);
183181

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ public class IgniteDhtDemandedPartitionsMap implements Serializable, Message {
4545
private CachePartitionPartialCountersMap historical;
4646

4747
/** Set of partitions that require full rebalancing. */
48-
@Order(1)
48+
@Order(value = 1, method = "fullSet")
4949
@GridToStringInclude
50-
private Collection<Integer> full;
50+
private Set<Integer> full;
5151

5252
/**
5353
* @param historical Historical partition set.
@@ -165,15 +165,15 @@ public void historicalMap(CachePartitionPartialCountersMap historical) {
165165
}
166166

167167
/** */
168-
public Collection<Integer> full() {
168+
public Set<Integer> fullSet() {
169169
if (full == null)
170170
return Collections.emptySet();
171171

172-
return Collections.unmodifiableCollection(full);
172+
return Collections.unmodifiableSet(full);
173173
}
174174

175175
/** */
176-
public void full(Collection<Integer> full) {
176+
public void fullSet(Set<Integer> full) {
177177
this.full = full;
178178
}
179179

@@ -195,7 +195,7 @@ public Set<Integer> historicalSet() {
195195

196196
/** */
197197
public Collection<Integer> all() {
198-
return F.concat(false, full(), historicalSet());
198+
return F.concat(false, fullSet(), historicalSet());
199199
}
200200

201201

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818

1919
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
2020

21-
import java.util.Collection;
2221
import java.util.Collections;
2322
import java.util.HashMap;
2423
import java.util.Map;
24+
import java.util.Set;
2525
import java.util.UUID;
2626
import org.apache.ignite.internal.Order;
2727
import org.apache.ignite.internal.util.typedef.F;
@@ -42,9 +42,9 @@ public class IgniteDhtPartitionsToReloadMap implements Message {
4242
/**
4343
* @param nodeId Node ID.
4444
* @param cacheId Cache ID.
45-
* @return Collection of partitions to reload.
45+
* @return Set of partitions to reload.
4646
*/
47-
public synchronized Collection<Integer> get(UUID nodeId, int cacheId) {
47+
public synchronized Set<Integer> get(UUID nodeId, int cacheId) {
4848
if (map == null)
4949
return Collections.emptySet();
5050

@@ -58,7 +58,7 @@ public synchronized Collection<Integer> get(UUID nodeId, int cacheId) {
5858
if (partsToReload == null)
5959
return Collections.emptySet();
6060

61-
return F.emptyIfNull(partsToReload.partitions());
61+
return (Set<Integer>)F.emptyIfNull(partsToReload.partitions());
6262
}
6363

6464
/**

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
1919

20-
import java.util.Collection;
2120
import java.util.HashSet;
21+
import java.util.Set;
2222
import org.apache.ignite.internal.Order;
2323
import org.apache.ignite.plugin.extensions.communication.Message;
2424

@@ -27,21 +27,21 @@ public class PartitionsToReload implements Message {
2727
/** Type code. */
2828
public static final short TYPE_CODE = 511;
2929

30-
/** Collection of partitions to reload. */
30+
/** Set of partitions to reload. */
3131
@Order(value = 0, method = "partitions")
32-
private Collection<Integer> parts;
32+
private Set<Integer> parts;
3333

3434
/**
35-
* @return Collection of partitions to reload.
35+
* @return Set of partitions to reload.
3636
*/
37-
public Collection<Integer> partitions() {
37+
public Set<Integer> partitions() {
3838
return parts;
3939
}
4040

4141
/**
42-
* @param parts Collection of partitions to reload.
42+
* @param parts Set of partitions to reload.
4343
*/
44-
public void partitions(Collection<Integer> parts) {
44+
public void partitions(Set<Integer> parts) {
4545
this.parts = parts;
4646
}
4747

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,7 @@ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridD
747747
@Nullable AffinityTopologyVersion exchangeVer,
748748
GridDhtPartitionFullMap partMap,
749749
@Nullable CachePartitionFullCountersMap cntrMap,
750-
Collection<Integer> partsToReload,
750+
Set<Integer> partsToReload,
751751
@Nullable Map<Integer, Long> partSizes,
752752
@Nullable AffinityTopologyVersion msgTopVer,
753753
@Nullable GridDhtPartitionsExchangeFuture exchFut,

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
1919

20-
import java.util.Collection;
2120
import java.util.List;
2221
import java.util.Map;
2322
import java.util.Set;
@@ -298,7 +297,7 @@ public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, G
298297
* means full map received is not related to exchange
299298
* @param partMap Update partition map.
300299
* @param cntrMap Partition update counters.
301-
* @param partsToReload Collection of partitions that need to be reloaded.
300+
* @param partsToReload Set of partitions that need to be reloaded.
302301
* @param partSizes Global partition sizes.
303302
* @param msgTopVer Topology version from incoming message. This value is not null only for case message is not
304303
* related to exchange. Value should be not less than previous 'Topology version from exchange'.
@@ -310,7 +309,7 @@ public boolean update(
310309
@Nullable AffinityTopologyVersion exchangeResVer,
311310
GridDhtPartitionFullMap partMap,
312311
@Nullable CachePartitionFullCountersMap cntrMap,
313-
Collection<Integer> partsToReload,
312+
Set<Integer> partsToReload,
314313
@Nullable Map<Integer, Long> partSizes,
315314
@Nullable AffinityTopologyVersion msgTopVer,
316315
@Nullable GridDhtPartitionsExchangeFuture exchFut,

0 commit comments

Comments
 (0)