Skip to content

Commit ed020d7

Browse files
authored
IGNITE-27292 Use MessageSerializer for TcpDiscoveryConnectionCheckMessage (#12581)
1 parent 198d433 commit ed020d7

File tree

14 files changed

+168
-42
lines changed

14 files changed

+168
-42
lines changed

modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,11 @@ else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) {
388388

389389
imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType");
390390

391-
returnFalseIfWriteFailed(write, "writer.writeCollection", getExpr,
391+
String collectionWriter = assignableFrom(erasedType(type), type(Set.class.getName()))
392+
? "writer.writeSet"
393+
: "writer.writeCollection";
394+
395+
returnFalseIfWriteFailed(write, collectionWriter, getExpr,
392396
"MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)));
393397
}
394398

@@ -526,7 +530,11 @@ else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) {
526530

527531
assert typeArgs.size() == 1;
528532

529-
returnFalseIfReadFailed(name, "reader.readCollection",
533+
String collectionReader = assignableFrom(erasedType(type), type(Set.class.getName()))
534+
? "reader.readSet"
535+
: "reader.readCollection";
536+
537+
returnFalseIfReadFailed(name, collectionReader,
530538
"MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)));
531539
}
532540

modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticRequest.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
package org.apache.ignite.internal;
1919

2020
import java.util.ArrayList;
21-
import java.util.Collection;
2221
import java.util.LinkedHashMap;
2322
import java.util.LinkedHashSet;
2423
import java.util.List;
2524
import java.util.Map;
25+
import java.util.Set;
2626
import java.util.UUID;
2727
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
2828
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -43,7 +43,7 @@ public class IgniteDiagnosticRequest implements Message {
4343

4444
/** Infos to send to a remote node. */
4545
@Order(2)
46-
private @Nullable LinkedHashSet<DiagnosticBaseInfo> infos;
46+
private @Nullable Set<DiagnosticBaseInfo> infos;
4747

4848
/** Local message related to remote info. */
4949
private final Map<Object, List<String>> msgs = new LinkedHashMap<>();
@@ -71,12 +71,12 @@ public IgniteDiagnosticRequest() {
7171
* @param nodeId Node ID.
7272
* @param infos Diagnostic infos.
7373
*/
74-
public IgniteDiagnosticRequest(long futId, UUID nodeId, @Nullable Collection<DiagnosticBaseInfo> infos) {
74+
public IgniteDiagnosticRequest(long futId, UUID nodeId, @Nullable Set<DiagnosticBaseInfo> infos) {
7575
this(nodeId);
7676

7777
this.futId = futId;
7878

79-
infos(infos);
79+
this.infos = infos;
8080
}
8181

8282
/**
@@ -142,21 +142,13 @@ public void futureId(long futId) {
142142
}
143143

144144
/** @return Compound diagnostic infos. */
145-
public @Nullable Collection<DiagnosticBaseInfo> infos() {
145+
public @Nullable Set<DiagnosticBaseInfo> infos() {
146146
return infos;
147147
}
148148

149149
/** Sets compound diagnostic infos. */
150-
public void infos(@Nullable Collection<DiagnosticBaseInfo> infos) {
151-
// Deserialization supports only `Collection` interface in MessageReader#readCollection.
152-
this.infos = toLinkedHashSet(infos);
153-
}
154-
155-
/** */
156-
private static @Nullable LinkedHashSet<DiagnosticBaseInfo> toLinkedHashSet(@Nullable Collection<DiagnosticBaseInfo> infos) {
157-
return infos == null
158-
? null
159-
: infos instanceof LinkedHashSet ? (LinkedHashSet<DiagnosticBaseInfo>)infos : new LinkedHashSet<>(infos);
150+
public void infos(@Nullable Set<DiagnosticBaseInfo> infos) {
151+
this.infos = infos;
160152
}
161153

162154
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.BitSet;
2222
import java.util.Collection;
2323
import java.util.Map;
24+
import java.util.Set;
2425
import java.util.UUID;
2526
import org.apache.ignite.internal.direct.state.DirectMessageState;
2627
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
@@ -372,13 +373,24 @@ public ByteBuffer getBuffer() {
372373
@Override public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType) {
373374
DirectByteBufferStream stream = state.item().stream;
374375

375-
C col = stream.readCollection(itemType, this);
376+
C col = stream.readList(itemType, this);
376377

377378
lastRead = stream.lastFinished();
378379

379380
return col;
380381
}
381382

383+
/** {@inheritDoc} */
384+
@Override public <SET extends Set<?>> SET readSet(MessageCollectionItemType itemType) {
385+
DirectByteBufferStream stream = state.item().stream;
386+
387+
SET set = stream.readSet(itemType, this);
388+
389+
lastRead = stream.lastFinished();
390+
391+
return set;
392+
}
393+
382394
/** {@inheritDoc} */
383395
@Override public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType,
384396
MessageCollectionItemType valType, boolean linked) {

modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.BitSet;
2222
import java.util.Collection;
2323
import java.util.Map;
24+
import java.util.Set;
2425
import java.util.UUID;
2526
import org.apache.ignite.internal.direct.state.DirectMessageState;
2627
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
@@ -345,6 +346,11 @@ public ByteBuffer getBuffer() {
345346
return stream.lastFinished();
346347
}
347348

349+
/** {@inheritDoc} */
350+
@Override public <T> boolean writeSet(Set<T> set, MessageCollectionItemType itemType) {
351+
return writeCollection(set, itemType);
352+
}
353+
348354
/** {@inheritDoc} */
349355
@Override public <K, V> boolean writeMap(Map<K, V> map, MessageCollectionItemType keyType,
350356
MessageCollectionItemType valType) {

modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222
import java.util.ArrayList;
2323
import java.util.BitSet;
2424
import java.util.Collection;
25+
import java.util.HashSet;
2526
import java.util.Iterator;
2627
import java.util.List;
2728
import java.util.Map;
2829
import java.util.RandomAccess;
30+
import java.util.Set;
2931
import java.util.UUID;
3032
import org.apache.ignite.IgniteCheckedException;
3133
import org.apache.ignite.IgniteException;
@@ -1601,11 +1603,36 @@ public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> item
16011603
}
16021604

16031605
/**
1606+
* Reads collection as an {@link ArrayList}.
1607+
*
1608+
* @param itemType Item type.
1609+
* @param reader Reader.
1610+
* @return {@link ArrayList}.
1611+
*/
1612+
public <L extends List<?>> L readList(MessageCollectionItemType itemType, MessageReader reader) {
1613+
return readCollection(itemType, reader, false);
1614+
}
1615+
1616+
/**
1617+
* Reads collection as a {@link HashSet}.
1618+
*
1619+
* @param itemType Item type.
1620+
* @param reader Reader.
1621+
* @return {@link HashSet}.
1622+
*/
1623+
public <SET extends Set<?>> SET readSet(MessageCollectionItemType itemType, MessageReader reader) {
1624+
return readCollection(itemType, reader, true);
1625+
}
1626+
1627+
/**
1628+
* Reads collection eather as a {@link ArrayList} or a {@link HashSet}.
1629+
*
16041630
* @param itemType Item type.
16051631
* @param reader Reader.
1606-
* @return Collection.
1632+
* @param set Read-as-Set flag.
1633+
* @return {@link ArrayList} or a {@link HashSet}.
16071634
*/
1608-
public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, MessageReader reader) {
1635+
private <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, MessageReader reader, boolean set) {
16091636
if (readSize == -1) {
16101637
int size = readInt();
16111638

@@ -1617,7 +1644,7 @@ public <C extends Collection<?>> C readCollection(MessageCollectionItemType item
16171644

16181645
if (readSize >= 0) {
16191646
if (col == null)
1620-
col = new ArrayList<>(readSize);
1647+
col = set ? U.newHashSet(readSize) : new ArrayList<>(readSize);
16211648

16221649
for (int i = readItems; i < readSize; i++) {
16231650
Object item = read(itemType, reader);

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
2121
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
2222
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
23+
import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer;
2324
import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
2425
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
2526
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
@@ -28,6 +29,7 @@
2829
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
2930
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
3031
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
32+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
3133
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
3234
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
3335
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
@@ -42,5 +44,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
4244
factory.register((short)3, TcpDiscoveryClientPingRequest::new, new TcpDiscoveryClientPingRequestSerializer());
4345
factory.register((short)4, TcpDiscoveryClientPingResponse::new, new TcpDiscoveryClientPingResponseSerializer());
4446
factory.register((short)5, TcpDiscoveryLoopbackProblemMessage::new, new TcpDiscoveryLoopbackProblemMessageSerializer());
47+
factory.register((short)6, TcpDiscoveryConnectionCheckMessage::new, new TcpDiscoveryConnectionCheckMessageSerializer());
4548
}
4649
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Collection;
2222
import java.util.HashMap;
2323
import java.util.Map;
24+
import java.util.Set;
2425
import java.util.Timer;
2526
import java.util.UUID;
2627
import java.util.concurrent.ConcurrentHashMap;
@@ -876,7 +877,7 @@ public IgniteInternalFuture<String> requestDiagnosticInfo(final UUID nodeId, Ign
876877
*/
877878
private IgniteInternalFuture<String> sendDiagnosticMessage(
878879
UUID nodeId,
879-
@Nullable Collection<IgniteDiagnosticRequest.DiagnosticBaseInfo> infos
880+
@Nullable Set<IgniteDiagnosticRequest.DiagnosticBaseInfo> infos
880881
) {
881882
try {
882883
IgniteDiagnosticRequest msg = new IgniteDiagnosticRequest(diagFutId.getAndIncrement(), nodeId, infos);

modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Collection;
2323
import java.util.LinkedHashMap;
2424
import java.util.Map;
25+
import java.util.Set;
2526
import java.util.UUID;
2627
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
2728
import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -226,27 +227,36 @@ public default void setBuffer(ByteBuffer buf) {
226227
*
227228
* @param itemType Array component type.
228229
* @param itemCls Array component class.
229-
* @param <T> Type of the red object .
230+
* @param <T> Type of the read object.
230231
* @return Array of objects.
231232
*/
232233
public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls);
233234

234235
/**
235-
* Reads collection.
236+
* Reads any collection.
236237
*
237238
* @param itemType Collection item type.
238-
* @param <C> Type of the red collection.
239+
* @param <C> Type of the read collection.
239240
* @return Collection.
240241
*/
241242
public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType);
242243

244+
/**
245+
* Reads any collection and provides it as a set.
246+
*
247+
* @param itemType Set item type.
248+
* @param <S> Type of the read set.
249+
* @return Set.
250+
*/
251+
public <S extends Set<?>> S readSet(MessageCollectionItemType itemType);
252+
243253
/**
244254
* Reads map.
245255
*
246256
* @param keyType Map key type.
247257
* @param valType Map value type.
248258
* @param linked Whether {@link LinkedHashMap} should be created.
249-
* @param <M> Type of the red map.
259+
* @param <M> Type of the read map.
250260
* @return Map.
251261
*/
252262
// TODO: IGNITE-26329 — switch to the new readMap method without the flag parameter

modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.BitSet;
2222
import java.util.Collection;
2323
import java.util.Map;
24+
import java.util.Set;
2425
import java.util.UUID;
2526
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
2627
import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -283,7 +284,7 @@ public default void setBuffer(ByteBuffer buf) {
283284
public <T> boolean writeObjectArray(T[] arr, MessageCollectionItemType itemType);
284285

285286
/**
286-
* Writes collection.
287+
* Writes collection with its elements order.
287288
*
288289
* @param col Collection.
289290
* @param itemType Collection item type.
@@ -292,6 +293,16 @@ public default void setBuffer(ByteBuffer buf) {
292293
*/
293294
public <T> boolean writeCollection(Collection<T> col, MessageCollectionItemType itemType);
294295

296+
/**
297+
* Writes set with its elements order.
298+
*
299+
* @param set Set.
300+
* @param itemType Set item type.
301+
* @param <T> Type of the objects that set contains.
302+
* @return Whether value was fully written.
303+
*/
304+
public <T> boolean writeSet(Set<T> set, MessageCollectionItemType itemType);
305+
295306
/**
296307
* Writes map.
297308
*

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.io.Externalizable;
2121
import java.io.Serializable;
22-
import java.util.Collection;
2322
import java.util.HashSet;
2423
import java.util.Set;
2524
import java.util.UUID;
@@ -300,7 +299,7 @@ public void failedNodes(@Nullable Set<UUID> failedNodes) {
300299
/**
301300
* @return Failed nodes IDs.
302301
*/
303-
@Nullable public Collection<UUID> failedNodes() {
302+
@Nullable public Set<UUID> failedNodes() {
304303
return failedNodes;
305304
}
306305

0 commit comments

Comments
 (0)