Skip to content

Commit 174df6f

Browse files
Merge remote-tracking branch 'origin/master' into ignite-26584
2 parents 3adc19a + 098ddec commit 174df6f

File tree

22 files changed

+538
-19
lines changed

22 files changed

+538
-19
lines changed

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteCommunicationMessageSerializationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717

1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

20-
import org.apache.ignite.internal.managers.communication.AbstractCommunicationMessageSerializationTest;
20+
import org.apache.ignite.internal.managers.communication.AbstractMessageSerializationTest;
2121
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
2222

2323
/** */
24-
public class CalciteCommunicationMessageSerializationTest extends AbstractCommunicationMessageSerializationTest {
24+
public class CalciteCommunicationMessageSerializationTest extends AbstractMessageSerializationTest {
2525
/** {@inheritDoc} */
2626
@Override protected MessageFactoryProvider messageFactory() {
2727
return new CalciteMessageFactory();

modules/compress/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
</dependency>
6868

6969
<dependency>
70-
<groupId>org.lz4</groupId>
70+
<groupId>at.yawk.lz4</groupId>
7171
<artifactId>lz4-java</artifactId>
7272
<version>${lz4.version}</version>
7373
</dependency>

modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ public final class IgniteNodeAttributes {
150150
/** V2 security subject for authenticated node. */
151151
public static final String ATTR_SECURITY_SUBJECT_V2 = ATTR_PREFIX + ".security.subject.v2";
152152

153+
/** Node certificates the connection was established with. */
154+
public static final String ATTR_NODE_CERTIFICATES = ATTR_PREFIX + ".security.certificates";
155+
153156
/** Client mode flag. */
154157
public static final String ATTR_CLIENT_MODE = ATTR_PREFIX + ".cache.client";
155158

modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.apache.ignite.internal.processors.cache.CacheInvokeResult;
7272
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
7373
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
74+
import org.apache.ignite.internal.util.typedef.F;
7475
import org.apache.ignite.internal.util.typedef.T3;
7576
import org.apache.ignite.internal.util.typedef.X;
7677
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -1389,7 +1390,7 @@ private QueryCursor<Cache.Entry<K, V>> sqlQuery(SqlQuery qry) {
13891390
keepBinary,
13901391
marsh,
13911392
cacheId,
1392-
qry.getPartitions().length >= 1 ? qry.getPartitions()[0] : -1
1393+
!F.isEmpty(qry.getPartitions()) ? qry.getPartitions()[0] : -1
13931394
));
13941395
}
13951396

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818
package org.apache.ignite.internal.managers.discovery;
1919

2020
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
21+
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
22+
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
2123
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
2224
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
2325
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2426
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
2527
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
28+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
29+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
2630
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
2731
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
2832

@@ -33,5 +37,7 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
3337
factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());
3438
factory.register((short)1, TcpDiscoveryPingRequest::new, new TcpDiscoveryPingRequestSerializer());
3539
factory.register((short)2, TcpDiscoveryPingResponse::new, new TcpDiscoveryPingResponseSerializer());
40+
factory.register((short)3, TcpDiscoveryClientPingRequest::new, new TcpDiscoveryClientPingRequestSerializer());
41+
factory.register((short)4, TcpDiscoveryClientPingResponse::new, new TcpDiscoveryClientPingResponseSerializer());
3642
}
3743
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1577,7 +1577,7 @@ private void finishIncrementalSnapshotRestore(UUID reqId, Map<UUID, Boolean> res
15771577
* @param metas Map of snapshot metadata distribution across the cluster.
15781578
* @return Map of cache partitions per each node.
15791579
*/
1580-
private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
1580+
private Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
15811581
Map<UUID, List<SnapshotMetadata>> metas,
15821582
BiPredicate<Integer, Integer> filter
15831583
) {
@@ -1586,10 +1586,23 @@ private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
15861586
List<UUID> nodes = new ArrayList<>(metas.keySet());
15871587
Collections.shuffle(nodes);
15881588

1589-
Map<UUID, List<SnapshotMetadata>> shuffleMetas = new LinkedHashMap<>();
1590-
nodes.forEach(k -> shuffleMetas.put(k, metas.get(k)));
1589+
Map<UUID, List<SnapshotMetadata>> orderedMetas = new LinkedHashMap<>();
15911590

1592-
for (Map.Entry<UUID, List<SnapshotMetadata>> e : shuffleMetas.entrySet()) {
1591+
String locDc = ctx.discovery().localNode().dataCenterId();
1592+
1593+
if (locDc != null) {
1594+
List<UUID> sameDcNodes = nodes.stream()
1595+
.map(uuid -> ctx.discovery().node(uuid))
1596+
.filter(node -> Objects.equals(node.dataCenterId(), locDc))
1597+
.map(ClusterNode::id)
1598+
.collect(Collectors.toList());
1599+
1600+
sameDcNodes.forEach(k -> orderedMetas.put(k, metas.get(k))); // Getting same DC files first.
1601+
}
1602+
1603+
nodes.forEach(k -> orderedMetas.put(k, metas.get(k)));
1604+
1605+
for (Map.Entry<UUID, List<SnapshotMetadata>> e : orderedMetas.entrySet()) {
15931606
UUID nodeId = e.getKey();
15941607

15951608
for (SnapshotMetadata meta : ofNullable(e.getValue()).orElse(Collections.emptyList())) {

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2301,6 +2301,9 @@ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage ms
23012301
}
23022302

23032303
locNode.setAttributes(msg.clientNodeAttributes());
2304+
2305+
clearNodeSensitiveData(locNode);
2306+
23042307
locNode.visible(true);
23052308

23062309
long topVer = msg.topologyVersion();
@@ -2356,6 +2359,8 @@ else if (log.isDebugEnabled())
23562359
assert topVer > 0 : msg;
23572360

23582361
if (!node.visible()) {
2362+
clearNodeSensitiveData(node);
2363+
23592364
node.order(topVer);
23602365
node.visible(true);
23612366

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@
178178
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_COMPACT_FOOTER;
179179
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2;
180180
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID;
181+
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CERTIFICATES;
181182
import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.CONN_DISABLED_BY_ADMIN_ERR_MSG;
182183
import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.newConnectionEnabledProperty;
183184
import static org.apache.ignite.internal.processors.security.SecurityUtils.authenticateLocalNode;
@@ -1134,7 +1135,7 @@ private void joinTopology() throws IgniteSpiException {
11341135
leavingNodes.clear();
11351136
failedNodesMsgSent.clear();
11361137

1137-
locNode.attributes().remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
1138+
clearNodeSensitiveData(locNode);
11381139

11391140
locNode.order(1);
11401141
locNode.internalOrder(1);
@@ -2443,6 +2444,18 @@ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) {
24432444
}
24442445
}
24452446

2447+
/** */
2448+
private static void enrichNodeWithAttribute(TcpDiscoveryNode node, String attrName, @Nullable Object attrVal) {
2449+
if (attrVal == null)
2450+
return;
2451+
2452+
Map<String, Object> attrs = new HashMap<>(node.getAttributes());
2453+
2454+
attrs.put(attrName, attrVal);
2455+
2456+
node.setAttributes(attrs);
2457+
}
2458+
24462459
/** */
24472460
private static WorkersRegistry getWorkerRegistry(TcpDiscoverySpi spi) {
24482461
return spi.ignite() instanceof IgniteEx ? ((IgniteEx)spi.ignite()).context().workersRegistry() : null;
@@ -5292,6 +5305,8 @@ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage ms
52925305
if (msg.verified()) {
52935306
assert topVer > 0 : "Invalid topology version: " + msg;
52945307

5308+
clearNodeSensitiveData(node);
5309+
52955310
if (node.order() == 0)
52965311
node.order(topVer);
52975312

@@ -7066,6 +7081,11 @@ else if (e.hasCause(ObjectStreamException.class) ||
70667081
else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
70677082
TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
70687083

7084+
// Current node holds connection with the node that is joining the cluster. Therefore, it can
7085+
// save certificates with which the connection was established to joining node attributes.
7086+
if (spi.nodeAuth != null && nodeId.equals(req.node().id()))
7087+
enrichNodeWithAttribute(req.node(), ATTR_NODE_CERTIFICATES, ses.extractCertificates());
7088+
70697089
if (!req.responded()) {
70707090
boolean ok = processJoinRequestMessage(req, clientMsgWrk);
70717091

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.ArrayList;
2626
import java.util.Collection;
2727
import java.util.Collections;
28+
import java.util.HashMap;
2829
import java.util.List;
2930
import java.util.Map;
3031
import java.util.UUID;
@@ -53,6 +54,8 @@
5354

5455
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_METRICS_QNT_WARN;
5556
import static org.apache.ignite.IgniteSystemProperties.getInteger;
57+
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CERTIFICATES;
58+
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS;
5659
import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_DISCOVERY_METRICS_QNT_WARN;
5760

5861
/**
@@ -401,6 +404,16 @@ protected boolean checkAckTimeout(long ackTimeout) {
401404
return true;
402405
}
403406

407+
/** */
408+
protected void clearNodeSensitiveData(TcpDiscoveryNode node) {
409+
Map<String, Object> attrs = new HashMap<>(node.attributes());
410+
411+
attrs.remove(ATTR_NODE_CERTIFICATES);
412+
attrs.remove(ATTR_SECURITY_CREDENTIALS);
413+
414+
node.setAttributes(attrs);
415+
}
416+
404417
/** */
405418
public void processMsgCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) {
406419
for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : msg.metrics().entrySet()) {

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import java.io.StreamCorruptedException;
2828
import java.net.Socket;
2929
import java.nio.ByteBuffer;
30+
import java.security.cert.Certificate;
31+
import javax.net.ssl.SSLPeerUnverifiedException;
32+
import javax.net.ssl.SSLSocket;
3033
import org.apache.ignite.IgniteCheckedException;
3134
import org.apache.ignite.IgniteException;
3235
import org.apache.ignite.internal.direct.DirectMessageReader;
@@ -36,6 +39,7 @@
3639
import org.apache.ignite.plugin.extensions.communication.Message;
3740
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
3841
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
42+
import org.jetbrains.annotations.Nullable;
3943

4044
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType;
4145

@@ -206,6 +210,21 @@ <T> T readMessage() throws IgniteCheckedException, IOException {
206210
}
207211
}
208212

213+
/** @return SSL certificate this session is established with. {@code null} if SSL is disabled or certificate validation failed. */
214+
@Nullable Certificate[] extractCertificates() {
215+
if (!spi.isSslEnabled())
216+
return null;
217+
218+
try {
219+
return ((SSLSocket)sock).getSession().getPeerCertificates();
220+
}
221+
catch (SSLPeerUnverifiedException e) {
222+
U.error(spi.log, "Failed to extract discovery IO session certificates", e);
223+
224+
return null;
225+
}
226+
}
227+
209228
/**
210229
* Serializes a discovery message into a byte array.
211230
*

0 commit comments

Comments
 (0)