Skip to content

Commit 7e2c657

Browse files
committed
CASSANDRA-19352: Support native_transport_(address|port) + native_transport_port_ssl for DSE 6.8 (4.x edition)
patch by absurdfarce; reviewed by absurdfarce and adutra for CASSANDRA-19352
1 parent ea2e475 commit 7e2c657

File tree

2 files changed

+223
-33
lines changed

2 files changed

+223
-33
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
3535
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
3636
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
37+
import com.datastax.oss.driver.shaded.guava.common.collect.Iterators;
3738
import com.datastax.oss.protocol.internal.ProtocolConstants;
3839
import com.datastax.oss.protocol.internal.response.Error;
3940
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -69,6 +70,10 @@ public class DefaultTopologyMonitor implements TopologyMonitor {
6970
// Assume topology queries never need paging
7071
private static final int INFINITE_PAGE_SIZE = -1;
7172

73+
// A few system.peers columns which get special handling below
74+
private static final String NATIVE_PORT = "native_port";
75+
private static final String NATIVE_TRANSPORT_PORT = "native_transport_port";
76+
7277
private final String logPrefix;
7378
private final InternalDriverContext context;
7479
private final ControlConnection controlConnection;
@@ -494,28 +499,65 @@ private void savePort(DriverChannel channel) {
494499
@Nullable
495500
protected InetSocketAddress getBroadcastRpcAddress(
496501
@NonNull AdminRow row, @NonNull EndPoint localEndPoint) {
497-
// in system.peers or system.local
498-
InetAddress broadcastRpcInetAddress = row.getInetAddress("rpc_address");
502+
503+
InetAddress broadcastRpcInetAddress = null;
504+
Iterator<String> addrCandidates =
505+
Iterators.forArray(
506+
// in system.peers_v2 (Cassandra >= 4.0)
507+
"native_address",
508+
// DSE 6.8 introduced native_transport_address and native_transport_port for the
509+
// listen address.
510+
"native_transport_address",
511+
// in system.peers or system.local
512+
"rpc_address");
513+
514+
while (broadcastRpcInetAddress == null && addrCandidates.hasNext())
515+
broadcastRpcInetAddress = row.getInetAddress(addrCandidates.next());
516+
// This could only happen if system tables are corrupted, but handle gracefully
499517
if (broadcastRpcInetAddress == null) {
500-
// in system.peers_v2 (Cassandra >= 4.0)
501-
broadcastRpcInetAddress = row.getInetAddress("native_address");
502-
if (broadcastRpcInetAddress == null) {
503-
// This could only happen if system tables are corrupted, but handle gracefully
504-
return null;
518+
LOG.warn(
519+
"[{}] Unable to determine broadcast RPC IP address, returning null. "
520+
+ "This is likely due to a misconfiguration or invalid system tables. "
521+
+ "Please validate the contents of system.local and/or {}.",
522+
logPrefix,
523+
getPeerTableName());
524+
return null;
525+
}
526+
527+
Integer broadcastRpcPort = null;
528+
Iterator<String> portCandidates =
529+
Iterators.forArray(
530+
// in system.peers_v2 (Cassandra >= 4.0)
531+
NATIVE_PORT,
532+
// DSE 6.8 introduced native_transport_address and native_transport_port for the
533+
// listen address.
534+
NATIVE_TRANSPORT_PORT,
535+
// system.local for Cassandra >= 4.0
536+
"rpc_port");
537+
538+
while ((broadcastRpcPort == null || broadcastRpcPort == 0) && portCandidates.hasNext()) {
539+
540+
String colName = portCandidates.next();
541+
broadcastRpcPort = row.getInteger(colName);
542+
// Support override for SSL port (if enabled) in DSE
543+
if (NATIVE_TRANSPORT_PORT.equals(colName) && context.getSslEngineFactory().isPresent()) {
544+
545+
String sslColName = colName + "_ssl";
546+
broadcastRpcPort = row.getInteger(sslColName);
505547
}
506548
}
507-
// system.local for Cassandra >= 4.0
508-
Integer broadcastRpcPort = row.getInteger("rpc_port");
549+
// use the default port if no port information was found in the row;
550+
// note that in rare situations, the default port might not be known, in which case we
551+
// report zero, as advertised in the javadocs of Node and NodeInfo.
509552
if (broadcastRpcPort == null || broadcastRpcPort == 0) {
510-
// system.peers_v2
511-
broadcastRpcPort = row.getInteger("native_port");
512-
if (broadcastRpcPort == null || broadcastRpcPort == 0) {
513-
// use the default port if no port information was found in the row;
514-
// note that in rare situations, the default port might not be known, in which case we
515-
// report zero, as advertised in the javadocs of Node and NodeInfo.
516-
broadcastRpcPort = port == -1 ? 0 : port;
517-
}
553+
554+
LOG.warn(
555+
"[{}] Unable to determine broadcast RPC port. "
556+
+ "Trying to fall back to port used by the control connection.",
557+
logPrefix);
558+
broadcastRpcPort = port == -1 ? 0 : port;
518559
}
560+
519561
InetSocketAddress broadcastRpcAddress =
520562
new InetSocketAddress(broadcastRpcInetAddress, broadcastRpcPort);
521563
if (row.contains("peer") && broadcastRpcAddress.equals(localEndPoint.resolve())) {

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java

Lines changed: 164 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
3939
import com.datastax.oss.driver.api.core.config.DriverConfig;
4040
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
41+
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
4142
import com.datastax.oss.driver.internal.core.addresstranslation.PassThroughAddressTranslator;
4243
import com.datastax.oss.driver.internal.core.adminrequest.AdminResult;
4344
import com.datastax.oss.driver.internal.core.adminrequest.AdminRow;
@@ -50,9 +51,11 @@
5051
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
5152
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
5253
import com.datastax.oss.driver.shaded.guava.common.collect.Iterators;
54+
import com.datastax.oss.driver.shaded.guava.common.collect.Maps;
5355
import com.datastax.oss.protocol.internal.Message;
5456
import com.datastax.oss.protocol.internal.ProtocolConstants;
5557
import com.datastax.oss.protocol.internal.response.Error;
58+
import com.google.common.collect.Streams;
5659
import com.tngtech.java.junit.dataprovider.DataProvider;
5760
import com.tngtech.java.junit.dataprovider.DataProviderRunner;
5861
import com.tngtech.java.junit.dataprovider.UseDataProvider;
@@ -95,6 +98,8 @@ public class DefaultTopologyMonitorTest {
9598
@Mock private Appender<ILoggingEvent> appender;
9699
@Captor private ArgumentCaptor<ILoggingEvent> loggingEventCaptor;
97100

101+
@Mock private SslEngineFactory sslEngineFactory;
102+
98103
private DefaultNode node1;
99104
private DefaultNode node2;
100105

@@ -414,18 +419,6 @@ public void should_skip_invalid_peers_row_v2(String columnToCheck) {
414419
+ "This is likely a gossip or snitch issue, this node will be ignored.");
415420
}
416421

417-
@DataProvider
418-
public static Object[][] columnsToCheckV1() {
419-
return new Object[][] {{"rpc_address"}, {"host_id"}, {"data_center"}, {"rack"}, {"tokens"}};
420-
}
421-
422-
@DataProvider
423-
public static Object[][] columnsToCheckV2() {
424-
return new Object[][] {
425-
{"native_address"}, {"native_port"}, {"host_id"}, {"data_center"}, {"rack"}, {"tokens"}
426-
};
427-
}
428-
429422
@Test
430423
public void should_stop_executing_queries_once_closed() {
431424
// Given
@@ -443,9 +436,9 @@ public void should_stop_executing_queries_once_closed() {
443436
public void should_warn_when_control_host_found_in_system_peers() {
444437
// Given
445438
AdminRow local = mockLocalRow(1, node1.getHostId());
446-
AdminRow peer3 = mockPeersRow(3, UUID.randomUUID());
447-
AdminRow peer2 = mockPeersRow(2, node2.getHostId());
448439
AdminRow peer1 = mockPeersRow(1, node2.getHostId()); // invalid
440+
AdminRow peer2 = mockPeersRow(2, node2.getHostId());
441+
AdminRow peer3 = mockPeersRow(3, UUID.randomUUID());
449442
topologyMonitor.stubQueries(
450443
new StubbedQuery("SELECT * FROM system.local", mockResult(local)),
451444
new StubbedQuery("SELECT * FROM system.peers_v2", Collections.emptyMap(), null, true),
@@ -462,7 +455,7 @@ public void should_warn_when_control_host_found_in_system_peers() {
462455
.hasSize(3)
463456
.extractingResultOf("getEndPoint")
464457
.containsOnlyOnce(node1.getEndPoint()));
465-
assertLog(
458+
assertLogContains(
466459
Level.WARN,
467460
"[null] Control node /127.0.0.1:9042 has an entry for itself in system.peers: "
468461
+ "this entry will be ignored. This is likely due to a misconfiguration; "
@@ -492,14 +485,124 @@ public void should_warn_when_control_host_found_in_system_peers_v2() {
492485
.hasSize(3)
493486
.extractingResultOf("getEndPoint")
494487
.containsOnlyOnce(node1.getEndPoint()));
495-
assertLog(
488+
assertLogContains(
496489
Level.WARN,
497490
"[null] Control node /127.0.0.1:9042 has an entry for itself in system.peers_v2: "
498491
+ "this entry will be ignored. This is likely due to a misconfiguration; "
499492
+ "please verify your rpc_address configuration in cassandra.yaml on "
500493
+ "all nodes in your cluster.");
501494
}
502495

496+
// Confirm the base case of extracting peer info from peers_v2, no SSL involved
497+
@Test
498+
public void should_get_peer_address_info_peers_v2() {
499+
// Given
500+
AdminRow local = mockLocalRow(1, node1.getHostId());
501+
AdminRow peer2 = mockPeersV2Row(3, node2.getHostId());
502+
AdminRow peer1 = mockPeersV2Row(2, node1.getHostId());
503+
topologyMonitor.isSchemaV2 = true;
504+
topologyMonitor.stubQueries(
505+
new StubbedQuery("SELECT * FROM system.local", mockResult(local)),
506+
new StubbedQuery("SELECT * FROM system.peers_v2", mockResult(peer2, peer1)));
507+
when(context.getSslEngineFactory()).thenReturn(Optional.empty());
508+
509+
// When
510+
CompletionStage<Iterable<NodeInfo>> futureInfos = topologyMonitor.refreshNodeList();
511+
512+
// Then
513+
assertThatStage(futureInfos)
514+
.isSuccess(
515+
infos -> {
516+
Iterator<NodeInfo> iterator = infos.iterator();
517+
// First NodeInfo is for local, skip past that
518+
iterator.next();
519+
NodeInfo peer2nodeInfo = iterator.next();
520+
assertThat(peer2nodeInfo.getEndPoint().resolve())
521+
.isEqualTo(new InetSocketAddress("127.0.0.3", 9042));
522+
NodeInfo peer1nodeInfo = iterator.next();
523+
assertThat(peer1nodeInfo.getEndPoint().resolve())
524+
.isEqualTo(new InetSocketAddress("127.0.0.2", 9042));
525+
});
526+
}
527+
528+
// Confirm the base case of extracting peer info from DSE peers table, no SSL involved
529+
@Test
530+
public void should_get_peer_address_info_peers_dse() {
531+
// Given
532+
AdminRow local = mockLocalRow(1, node1.getHostId());
533+
AdminRow peer2 = mockPeersRowDse(3, node2.getHostId());
534+
AdminRow peer1 = mockPeersRowDse(2, node1.getHostId());
535+
topologyMonitor.isSchemaV2 = true;
536+
topologyMonitor.stubQueries(
537+
new StubbedQuery("SELECT * FROM system.local", mockResult(local)),
538+
new StubbedQuery("SELECT * FROM system.peers_v2", Maps.newHashMap(), null, true),
539+
new StubbedQuery("SELECT * FROM system.peers", mockResult(peer2, peer1)));
540+
when(context.getSslEngineFactory()).thenReturn(Optional.empty());
541+
542+
// When
543+
CompletionStage<Iterable<NodeInfo>> futureInfos = topologyMonitor.refreshNodeList();
544+
545+
// Then
546+
assertThatStage(futureInfos)
547+
.isSuccess(
548+
infos -> {
549+
Iterator<NodeInfo> iterator = infos.iterator();
550+
// First NodeInfo is for local, skip past that
551+
iterator.next();
552+
NodeInfo peer2nodeInfo = iterator.next();
553+
assertThat(peer2nodeInfo.getEndPoint().resolve())
554+
.isEqualTo(new InetSocketAddress("127.0.0.3", 9042));
555+
NodeInfo peer1nodeInfo = iterator.next();
556+
assertThat(peer1nodeInfo.getEndPoint().resolve())
557+
.isEqualTo(new InetSocketAddress("127.0.0.2", 9042));
558+
});
559+
}
560+
561+
// Confirm the base case of extracting peer info from DSE peers table, this time with SSL
562+
@Test
563+
public void should_get_peer_address_info_peers_dse_with_ssl() {
564+
// Given
565+
AdminRow local = mockLocalRow(1, node1.getHostId());
566+
AdminRow peer2 = mockPeersRowDseWithSsl(3, node2.getHostId());
567+
AdminRow peer1 = mockPeersRowDseWithSsl(2, node1.getHostId());
568+
topologyMonitor.isSchemaV2 = true;
569+
topologyMonitor.stubQueries(
570+
new StubbedQuery("SELECT * FROM system.local", mockResult(local)),
571+
new StubbedQuery("SELECT * FROM system.peers_v2", Maps.newHashMap(), null, true),
572+
new StubbedQuery("SELECT * FROM system.peers", mockResult(peer2, peer1)));
573+
when(context.getSslEngineFactory()).thenReturn(Optional.of(sslEngineFactory));
574+
575+
// When
576+
CompletionStage<Iterable<NodeInfo>> futureInfos = topologyMonitor.refreshNodeList();
577+
578+
// Then
579+
assertThatStage(futureInfos)
580+
.isSuccess(
581+
infos -> {
582+
Iterator<NodeInfo> iterator = infos.iterator();
583+
// First NodeInfo is for local, skip past that
584+
iterator.next();
585+
NodeInfo peer2nodeInfo = iterator.next();
586+
assertThat(peer2nodeInfo.getEndPoint().resolve())
587+
.isEqualTo(new InetSocketAddress("127.0.0.3", 9043));
588+
NodeInfo peer1nodeInfo = iterator.next();
589+
assertThat(peer1nodeInfo.getEndPoint().resolve())
590+
.isEqualTo(new InetSocketAddress("127.0.0.2", 9043));
591+
});
592+
}
593+
594+
@DataProvider
595+
public static Object[][] columnsToCheckV1() {
596+
return new Object[][] {{"rpc_address"}, {"host_id"}, {"data_center"}, {"rack"}, {"tokens"}};
597+
}
598+
599+
@DataProvider
600+
public static Object[][] columnsToCheckV2() {
601+
return new Object[][] {
602+
{"native_address"}, {"native_port"}, {"host_id"}, {"data_center"}, {"rack"}, {"tokens"}
603+
};
604+
}
605+
503606
/** Mocks the query execution logic. */
504607
private static class TestTopologyMonitor extends DefaultTopologyMonitor {
505608

@@ -641,6 +744,43 @@ private AdminRow mockPeersV2Row(int i, UUID hostId) {
641744
}
642745
}
643746

747+
// Mock row for DSE ~6.8
748+
private AdminRow mockPeersRowDse(int i, UUID hostId) {
749+
try {
750+
AdminRow row = mock(AdminRow.class);
751+
when(row.contains("peer")).thenReturn(true);
752+
when(row.isNull("data_center")).thenReturn(false);
753+
when(row.getString("data_center")).thenReturn("dc" + i);
754+
when(row.getString("dse_version")).thenReturn("6.8.30");
755+
when(row.contains("graph")).thenReturn(true);
756+
when(row.isNull("host_id")).thenReturn(hostId == null);
757+
when(row.getUuid("host_id")).thenReturn(hostId);
758+
when(row.getInetAddress("peer")).thenReturn(InetAddress.getByName("127.0.0." + i));
759+
when(row.isNull("rack")).thenReturn(false);
760+
when(row.getString("rack")).thenReturn("rack" + i);
761+
when(row.isNull("native_transport_address")).thenReturn(false);
762+
when(row.getInetAddress("native_transport_address"))
763+
.thenReturn(InetAddress.getByName("127.0.0." + i));
764+
when(row.isNull("native_transport_port")).thenReturn(false);
765+
when(row.getInteger("native_transport_port")).thenReturn(9042);
766+
when(row.isNull("tokens")).thenReturn(false);
767+
when(row.getSetOfString("tokens")).thenReturn(ImmutableSet.of("token" + i));
768+
when(row.isNull("rpc_address")).thenReturn(false);
769+
770+
return row;
771+
} catch (UnknownHostException e) {
772+
fail("unexpected", e);
773+
return null;
774+
}
775+
}
776+
777+
private AdminRow mockPeersRowDseWithSsl(int i, UUID hostId) {
778+
AdminRow row = mockPeersRowDse(i, hostId);
779+
when(row.isNull("native_transport_port_ssl")).thenReturn(false);
780+
when(row.getInteger("native_transport_port_ssl")).thenReturn(9043);
781+
return row;
782+
}
783+
644784
private AdminResult mockResult(AdminRow... rows) {
645785
AdminResult result = mock(AdminResult.class);
646786
when(result.iterator()).thenReturn(Iterators.forArray(rows));
@@ -654,4 +794,12 @@ private void assertLog(Level level, String message) {
654794
assertThat(logs).hasSize(1);
655795
assertThat(logs.iterator().next().getFormattedMessage()).contains(message);
656796
}
797+
798+
private void assertLogContains(Level level, String message) {
799+
verify(appender, atLeast(1)).doAppend(loggingEventCaptor.capture());
800+
Iterable<ILoggingEvent> logs =
801+
filter(loggingEventCaptor.getAllValues()).with("level", level).get();
802+
assertThat(
803+
Streams.stream(logs).map(ILoggingEvent::getFormattedMessage).anyMatch(message::contains));
804+
}
657805
}

0 commit comments

Comments
 (0)