Skip to content

Commit 92074fd

Browse files
authored
IGNITE-26517 Use MessageSerializer for GridDhtPartitionsSingleMessage (#12523)
1 parent 08e5ab4 commit 92074fd

File tree

8 files changed

+175
-420
lines changed

8 files changed

+175
-420
lines changed

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer;
6767
import org.apache.ignite.internal.codegen.GridDhtPartitionSupplyMessageSerializer;
6868
import org.apache.ignite.internal.codegen.GridDhtPartitionsFullMessageSerializer;
69+
import org.apache.ignite.internal.codegen.GridDhtPartitionsSingleMessageSerializer;
6970
import org.apache.ignite.internal.codegen.GridDhtPartitionsSingleRequestSerializer;
7071
import org.apache.ignite.internal.codegen.GridDhtTxFinishRequestSerializer;
7172
import org.apache.ignite.internal.codegen.GridDhtTxFinishResponseSerializer;
@@ -118,6 +119,7 @@
118119
import org.apache.ignite.internal.codegen.IgniteDhtPartitionsToReloadMapSerializer;
119120
import org.apache.ignite.internal.codegen.IgniteTxKeySerializer;
120121
import org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer;
122+
import org.apache.ignite.internal.codegen.IntLongMapSerializer;
121123
import org.apache.ignite.internal.codegen.JobStealingRequestSerializer;
122124
import org.apache.ignite.internal.codegen.LatchAckMessageSerializer;
123125
import org.apache.ignite.internal.codegen.MetadataRequestMessageSerializer;
@@ -127,7 +129,6 @@
127129
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
128130
import org.apache.ignite.internal.codegen.NodeIdMessageSerializer;
129131
import org.apache.ignite.internal.codegen.PartitionReservationsMapSerializer;
130-
import org.apache.ignite.internal.codegen.PartitionSizesMapSerializer;
131132
import org.apache.ignite.internal.codegen.PartitionsToReloadSerializer;
132133
import org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
133134
import org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
@@ -223,8 +224,8 @@
223224
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap;
224225
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
225226
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
227+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IntLongMap;
226228
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionReservationsMap;
227-
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap;
228229
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsToReload;
229230
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
230231
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
@@ -364,7 +365,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
364365
factory.register((short)43, GridDhtForceKeysResponse::new, new GridDhtForceKeysResponseSerializer());
365366
factory.register((short)45, GridDhtPartitionDemandMessage::new, new GridDhtPartitionDemandMessageSerializer());
366367
factory.register((short)46, GridDhtPartitionsFullMessage::new, new GridDhtPartitionsFullMessageSerializer());
367-
factory.register((short)47, GridDhtPartitionsSingleMessage::new);
368+
factory.register((short)47, GridDhtPartitionsSingleMessage::new, new GridDhtPartitionsSingleMessageSerializer());
368369
factory.register((short)48, GridDhtPartitionsSingleRequest::new, new GridDhtPartitionsSingleRequestSerializer());
369370
factory.register((short)49, GridNearGetRequest::new, new GridNearGetRequestSerializer());
370371
factory.register((short)50, GridNearGetResponse::new, new GridNearGetResponseSerializer());
@@ -482,7 +483,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
482483
factory.register(CachePartitionsToReloadMap.TYPE_CODE, CachePartitionsToReloadMap::new, new CachePartitionsToReloadMapSerializer());
483484
factory.register(IgniteDhtPartitionsToReloadMap.TYPE_CODE, IgniteDhtPartitionsToReloadMap::new,
484485
new IgniteDhtPartitionsToReloadMapSerializer());
485-
factory.register(PartitionSizesMap.TYPE_CODE, PartitionSizesMap::new, new PartitionSizesMapSerializer());
486+
factory.register(IntLongMap.TYPE_CODE, IntLongMap::new, new IntLongMapSerializer());
486487
factory.register(DeploymentModeMessage.TYPE_CODE, DeploymentModeMessage::new, new DeploymentModeMessageSerializer());
487488

488489
// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@
9090
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
9191
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
9292
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
93-
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap;
93+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IntLongMap;
9494
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
9595
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
9696
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
@@ -1441,7 +1441,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
14411441
}
14421442

14431443
if (!partsSizes.isEmpty())
1444-
m.partitionSizes(F.viewReadOnly(partsSizes, PartitionSizesMap::new));
1444+
m.partitionSizes(F.viewReadOnly(partsSizes, IntLongMap::new));
14451445

14461446
return m;
14471447
}
@@ -1772,7 +1772,7 @@ public void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMe
17721772

17731773
boolean updated = false;
17741774

1775-
Map<Integer, PartitionSizesMap> partsSizes = F.emptyIfNull(msg.partitionSizes());
1775+
Map<Integer, IntLongMap> partsSizes = F.emptyIfNull(msg.partitionSizes());
17761776

17771777
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
17781778
Integer grpId = entry.getKey();
@@ -1782,13 +1782,13 @@ public void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMe
17821782
GridDhtPartitionTopology top = grp == null ? clientTops.get(grpId) : grp.topology();
17831783

17841784
if (top != null) {
1785-
PartitionSizesMap sizesMap = partsSizes.get(grpId);
1785+
IntLongMap sizesMap = partsSizes.get(grpId);
17861786

17871787
updated |= top.update(null,
17881788
entry.getValue(),
17891789
null,
17901790
msg.partsToReload(cctx.localNodeId(), grpId),
1791-
sizesMap != null ? F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(),
1791+
sizesMap != null ? F.emptyIfNull(sizesMap.map()) : Collections.emptyMap(),
17921792
msg.topologyVersion(),
17931793
null,
17941794
null);

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java

Lines changed: 0 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,11 @@
1717

1818
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
1919

20-
import java.nio.ByteBuffer;
2120
import org.apache.ignite.internal.Order;
2221
import org.apache.ignite.internal.managers.communication.GridIoMessage;
2322
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
2423
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
2524
import org.apache.ignite.internal.util.typedef.internal.S;
26-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
27-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
2825
import org.jetbrains.annotations.Nullable;
2926

3027
/**
@@ -159,83 +156,6 @@ public boolean restoreState() {
159156
return (flags & RESTORE_STATE_FLAG_MASK) != 0;
160157
}
161158

162-
/** {@inheritDoc} */
163-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
164-
// TODO: Remove #writeTo() after all inheritors have migrated to the new ser/der scheme (IGNITE-25490).
165-
writer.setBuffer(buf);
166-
167-
if (!super.writeTo(buf, writer))
168-
return false;
169-
170-
if (!writer.isHeaderWritten()) {
171-
if (!writer.writeHeader(directType()))
172-
return false;
173-
174-
writer.onHeaderWritten();
175-
}
176-
177-
switch (writer.state()) {
178-
case 3:
179-
if (!writer.writeMessage(exchId))
180-
return false;
181-
182-
writer.incrementState();
183-
184-
case 4:
185-
if (!writer.writeByte(flags))
186-
return false;
187-
188-
writer.incrementState();
189-
190-
case 5:
191-
if (!writer.writeMessage(lastVer))
192-
return false;
193-
194-
writer.incrementState();
195-
196-
}
197-
198-
return true;
199-
}
200-
201-
/** {@inheritDoc} */
202-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
203-
// TODO: Remove #readFrom() after all inheritors have migrated to the new ser/der scheme (IGNITE-25490).
204-
reader.setBuffer(buf);
205-
206-
if (!super.readFrom(buf, reader))
207-
return false;
208-
209-
switch (reader.state()) {
210-
case 3:
211-
exchId = reader.readMessage();
212-
213-
if (!reader.isLastRead())
214-
return false;
215-
216-
reader.incrementState();
217-
218-
case 4:
219-
flags = reader.readByte();
220-
221-
if (!reader.isLastRead())
222-
return false;
223-
224-
reader.incrementState();
225-
226-
case 5:
227-
lastVer = reader.readMessage();
228-
229-
if (!reader.isLastRead())
230-
return false;
231-
232-
reader.incrementState();
233-
234-
}
235-
236-
return true;
237-
}
238-
239159
/** {@inheritDoc} */
240160
@Override public String toString() {
241161
return S.toString(GridDhtPartitionsAbstractMessage.class, this, super.toString());

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4004,7 +4004,8 @@ private void processSingleMessageOnCrdFinish(
40044004
GridDhtPartitionsSingleMessage msg,
40054005
Map<Integer, CacheGroupAffinityMessage> messageAccumulator
40064006
) {
4007-
msg.partitionUpdateCounters().forEach((grpId, updCntrs) -> partitionTopology(grpId).collectUpdateCounters(updCntrs));
4007+
F.emptyIfNull(msg.partitionUpdateCounters()).forEach((grpId, updCntrs) ->
4008+
partitionTopology(grpId).collectUpdateCounters(updCntrs));
40084009

40094010
Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
40104011

@@ -4667,7 +4668,7 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa
46674668
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
46684669

46694670
try {
4670-
Map<Integer, PartitionSizesMap> partsSizes = F.emptyIfNull(msg.partitionSizes());
4671+
Map<Integer, IntLongMap> partsSizes = F.emptyIfNull(msg.partitionSizes());
46714672

46724673
doInParallel(
46734674
parallelismLvl,
@@ -4678,13 +4679,13 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa
46784679
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
46794680

46804681
if (grp != null) {
4681-
PartitionSizesMap sizesMap = partsSizes.get(grpId);
4682+
IntLongMap sizesMap = partsSizes.get(grpId);
46824683

46834684
grp.topology().update(resTopVer,
46844685
msg.partitions().get(grpId),
46854686
cntrMap,
46864687
msg.partsToReload(cctx.localNodeId(), grpId),
4687-
sizesMap != null ? F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(),
4688+
sizesMap != null ? F.emptyIfNull(sizesMap.map()) : Collections.emptyMap(),
46884689
null,
46894690
this,
46904691
msg.lostPartitions(grpId));

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
8989

9090
/** Partition sizes. */
9191
@Order(value = 11, method = "partitionSizes")
92-
private Map<Integer, PartitionSizesMap> partsSizes;
92+
private Map<Integer, IntLongMap> partsSizes;
9393

9494
/** Topology version. */
9595
@Order(value = 12, method = "topologyVersion")
@@ -383,14 +383,14 @@ public Collection<Integer> partsToReload(UUID nodeId, int grpId) {
383383
*
384384
* @param partsSizes Partitions sizes map.
385385
*/
386-
public void partitionSizes(Map<Integer, PartitionSizesMap> partsSizes) {
386+
public void partitionSizes(Map<Integer, IntLongMap> partsSizes) {
387387
this.partsSizes = partsSizes;
388388
}
389389

390390
/**
391391
* @return Partition sizes map (grpId, (partId, partSize)).
392392
*/
393-
public Map<Integer, PartitionSizesMap> partitionSizes() {
393+
public Map<Integer, IntLongMap> partitionSizes() {
394394
return partsSizes;
395395
}
396396

0 commit comments

Comments
 (0)