3838import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
3939import com .datastax .oss .driver .api .core .config .DriverConfig ;
4040import com .datastax .oss .driver .api .core .config .DriverExecutionProfile ;
41+ import com .datastax .oss .driver .api .core .ssl .SslEngineFactory ;
4142import com .datastax .oss .driver .internal .core .addresstranslation .PassThroughAddressTranslator ;
4243import com .datastax .oss .driver .internal .core .adminrequest .AdminResult ;
4344import com .datastax .oss .driver .internal .core .adminrequest .AdminRow ;
5051import com .datastax .oss .driver .shaded .guava .common .collect .ImmutableMap ;
5152import com .datastax .oss .driver .shaded .guava .common .collect .ImmutableSet ;
5253import com .datastax .oss .driver .shaded .guava .common .collect .Iterators ;
54+ import com .datastax .oss .driver .shaded .guava .common .collect .Maps ;
5355import com .datastax .oss .protocol .internal .Message ;
5456import com .datastax .oss .protocol .internal .ProtocolConstants ;
5557import com .datastax .oss .protocol .internal .response .Error ;
58+ import com .google .common .collect .Streams ;
5659import com .tngtech .java .junit .dataprovider .DataProvider ;
5760import com .tngtech .java .junit .dataprovider .DataProviderRunner ;
5861import com .tngtech .java .junit .dataprovider .UseDataProvider ;
@@ -95,6 +98,8 @@ public class DefaultTopologyMonitorTest {
9598 @ Mock private Appender <ILoggingEvent > appender ;
9699 @ Captor private ArgumentCaptor <ILoggingEvent > loggingEventCaptor ;
97100
101+ @ Mock private SslEngineFactory sslEngineFactory ;
102+
98103 private DefaultNode node1 ;
99104 private DefaultNode node2 ;
100105
@@ -414,18 +419,6 @@ public void should_skip_invalid_peers_row_v2(String columnToCheck) {
414419 + "This is likely a gossip or snitch issue, this node will be ignored." );
415420 }
416421
417- @ DataProvider
418- public static Object [][] columnsToCheckV1 () {
419- return new Object [][] {{"rpc_address" }, {"host_id" }, {"data_center" }, {"rack" }, {"tokens" }};
420- }
421-
422- @ DataProvider
423- public static Object [][] columnsToCheckV2 () {
424- return new Object [][] {
425- {"native_address" }, {"native_port" }, {"host_id" }, {"data_center" }, {"rack" }, {"tokens" }
426- };
427- }
428-
429422 @ Test
430423 public void should_stop_executing_queries_once_closed () {
431424 // Given
@@ -443,9 +436,9 @@ public void should_stop_executing_queries_once_closed() {
443436 public void should_warn_when_control_host_found_in_system_peers () {
444437 // Given
445438 AdminRow local = mockLocalRow (1 , node1 .getHostId ());
446- AdminRow peer3 = mockPeersRow (3 , UUID .randomUUID ());
447- AdminRow peer2 = mockPeersRow (2 , node2 .getHostId ());
448439 AdminRow peer1 = mockPeersRow (1 , node2 .getHostId ()); // invalid
440+ AdminRow peer2 = mockPeersRow (2 , node2 .getHostId ());
441+ AdminRow peer3 = mockPeersRow (3 , UUID .randomUUID ());
449442 topologyMonitor .stubQueries (
450443 new StubbedQuery ("SELECT * FROM system.local WHERE key='local'" , mockResult (local )),
451444 new StubbedQuery ("SELECT * FROM system.peers_v2" , Collections .emptyMap (), null , true ),
@@ -462,7 +455,7 @@ public void should_warn_when_control_host_found_in_system_peers() {
462455 .hasSize (3 )
463456 .extractingResultOf ("getEndPoint" )
464457 .containsOnlyOnce (node1 .getEndPoint ()));
465- assertLog (
458+ assertLogContains (
466459 Level .WARN ,
467460 "[null] Control node /127.0.0.1:9042 has an entry for itself in system.peers: "
468461 + "this entry will be ignored. This is likely due to a misconfiguration; "
@@ -492,14 +485,124 @@ public void should_warn_when_control_host_found_in_system_peers_v2() {
492485 .hasSize (3 )
493486 .extractingResultOf ("getEndPoint" )
494487 .containsOnlyOnce (node1 .getEndPoint ()));
495- assertLog (
488+ assertLogContains (
496489 Level .WARN ,
497490 "[null] Control node /127.0.0.1:9042 has an entry for itself in system.peers_v2: "
498491 + "this entry will be ignored. This is likely due to a misconfiguration; "
499492 + "please verify your rpc_address configuration in cassandra.yaml on "
500493 + "all nodes in your cluster." );
501494 }
502495
496+ // Confirm the base case of extracting peer info from peers_v2, no SSL involved
497+ @ Test
498+ public void should_get_peer_address_info_peers_v2 () {
499+ // Given
500+ AdminRow local = mockLocalRow (1 , node1 .getHostId ());
501+ AdminRow peer2 = mockPeersV2Row (3 , node2 .getHostId ());
502+ AdminRow peer1 = mockPeersV2Row (2 , node1 .getHostId ());
503+ topologyMonitor .isSchemaV2 = true ;
504+ topologyMonitor .stubQueries (
505+ new StubbedQuery ("SELECT * FROM system.local WHERE key='local'" , mockResult (local )),
506+ new StubbedQuery ("SELECT * FROM system.peers_v2" , mockResult (peer2 , peer1 )));
507+ when (context .getSslEngineFactory ()).thenReturn (Optional .empty ());
508+
509+ // When
510+ CompletionStage <Iterable <NodeInfo >> futureInfos = topologyMonitor .refreshNodeList ();
511+
512+ // Then
513+ assertThatStage (futureInfos )
514+ .isSuccess (
515+ infos -> {
516+ Iterator <NodeInfo > iterator = infos .iterator ();
517+ // First NodeInfo is for local, skip past that
518+ iterator .next ();
519+ NodeInfo peer2nodeInfo = iterator .next ();
520+ assertThat (peer2nodeInfo .getEndPoint ().resolve ())
521+ .isEqualTo (new InetSocketAddress ("127.0.0.3" , 9042 ));
522+ NodeInfo peer1nodeInfo = iterator .next ();
523+ assertThat (peer1nodeInfo .getEndPoint ().resolve ())
524+ .isEqualTo (new InetSocketAddress ("127.0.0.2" , 9042 ));
525+ });
526+ }
527+
528+ // Confirm the base case of extracting peer info from DSE peers table, no SSL involved
529+ @ Test
530+ public void should_get_peer_address_info_peers_dse () {
531+ // Given
532+ AdminRow local = mockLocalRow (1 , node1 .getHostId ());
533+ AdminRow peer2 = mockPeersRowDse (3 , node2 .getHostId ());
534+ AdminRow peer1 = mockPeersRowDse (2 , node1 .getHostId ());
535+ topologyMonitor .isSchemaV2 = true ;
536+ topologyMonitor .stubQueries (
537+ new StubbedQuery ("SELECT * FROM system.local WHERE key='local'" , mockResult (local )),
538+ new StubbedQuery ("SELECT * FROM system.peers_v2" , Maps .newHashMap (), null , true ),
539+ new StubbedQuery ("SELECT * FROM system.peers" , mockResult (peer2 , peer1 )));
540+ when (context .getSslEngineFactory ()).thenReturn (Optional .empty ());
541+
542+ // When
543+ CompletionStage <Iterable <NodeInfo >> futureInfos = topologyMonitor .refreshNodeList ();
544+
545+ // Then
546+ assertThatStage (futureInfos )
547+ .isSuccess (
548+ infos -> {
549+ Iterator <NodeInfo > iterator = infos .iterator ();
550+ // First NodeInfo is for local, skip past that
551+ iterator .next ();
552+ NodeInfo peer2nodeInfo = iterator .next ();
553+ assertThat (peer2nodeInfo .getEndPoint ().resolve ())
554+ .isEqualTo (new InetSocketAddress ("127.0.0.3" , 9042 ));
555+ NodeInfo peer1nodeInfo = iterator .next ();
556+ assertThat (peer1nodeInfo .getEndPoint ().resolve ())
557+ .isEqualTo (new InetSocketAddress ("127.0.0.2" , 9042 ));
558+ });
559+ }
560+
561+ // Confirm the base case of extracting peer info from DSE peers table, this time with SSL
562+ @ Test
563+ public void should_get_peer_address_info_peers_dse_with_ssl () {
564+ // Given
565+ AdminRow local = mockLocalRow (1 , node1 .getHostId ());
566+ AdminRow peer2 = mockPeersRowDseWithSsl (3 , node2 .getHostId ());
567+ AdminRow peer1 = mockPeersRowDseWithSsl (2 , node1 .getHostId ());
568+ topologyMonitor .isSchemaV2 = true ;
569+ topologyMonitor .stubQueries (
570+ new StubbedQuery ("SELECT * FROM system.local WHERE key='local'" , mockResult (local )),
571+ new StubbedQuery ("SELECT * FROM system.peers_v2" , Maps .newHashMap (), null , true ),
572+ new StubbedQuery ("SELECT * FROM system.peers" , mockResult (peer2 , peer1 )));
573+ when (context .getSslEngineFactory ()).thenReturn (Optional .of (sslEngineFactory ));
574+
575+ // When
576+ CompletionStage <Iterable <NodeInfo >> futureInfos = topologyMonitor .refreshNodeList ();
577+
578+ // Then
579+ assertThatStage (futureInfos )
580+ .isSuccess (
581+ infos -> {
582+ Iterator <NodeInfo > iterator = infos .iterator ();
583+ // First NodeInfo is for local, skip past that
584+ iterator .next ();
585+ NodeInfo peer2nodeInfo = iterator .next ();
586+ assertThat (peer2nodeInfo .getEndPoint ().resolve ())
587+ .isEqualTo (new InetSocketAddress ("127.0.0.3" , 9043 ));
588+ NodeInfo peer1nodeInfo = iterator .next ();
589+ assertThat (peer1nodeInfo .getEndPoint ().resolve ())
590+ .isEqualTo (new InetSocketAddress ("127.0.0.2" , 9043 ));
591+ });
592+ }
593+
594+ @ DataProvider
595+ public static Object [][] columnsToCheckV1 () {
596+ return new Object [][] {{"rpc_address" }, {"host_id" }, {"data_center" }, {"rack" }, {"tokens" }};
597+ }
598+
599+ @ DataProvider
600+ public static Object [][] columnsToCheckV2 () {
601+ return new Object [][] {
602+ {"native_address" }, {"native_port" }, {"host_id" }, {"data_center" }, {"rack" }, {"tokens" }
603+ };
604+ }
605+
503606 /** Mocks the query execution logic. */
504607 private static class TestTopologyMonitor extends DefaultTopologyMonitor {
505608
@@ -641,6 +744,43 @@ private AdminRow mockPeersV2Row(int i, UUID hostId) {
641744 }
642745 }
643746
747+ // Mock row for DSE ~6.8
748+ private AdminRow mockPeersRowDse (int i , UUID hostId ) {
749+ try {
750+ AdminRow row = mock (AdminRow .class );
751+ when (row .contains ("peer" )).thenReturn (true );
752+ when (row .isNull ("data_center" )).thenReturn (false );
753+ when (row .getString ("data_center" )).thenReturn ("dc" + i );
754+ when (row .getString ("dse_version" )).thenReturn ("6.8.30" );
755+ when (row .contains ("graph" )).thenReturn (true );
756+ when (row .isNull ("host_id" )).thenReturn (hostId == null );
757+ when (row .getUuid ("host_id" )).thenReturn (hostId );
758+ when (row .getInetAddress ("peer" )).thenReturn (InetAddress .getByName ("127.0.0." + i ));
759+ when (row .isNull ("rack" )).thenReturn (false );
760+ when (row .getString ("rack" )).thenReturn ("rack" + i );
761+ when (row .isNull ("native_transport_address" )).thenReturn (false );
762+ when (row .getInetAddress ("native_transport_address" ))
763+ .thenReturn (InetAddress .getByName ("127.0.0." + i ));
764+ when (row .isNull ("native_transport_port" )).thenReturn (false );
765+ when (row .getInteger ("native_transport_port" )).thenReturn (9042 );
766+ when (row .isNull ("tokens" )).thenReturn (false );
767+ when (row .getSetOfString ("tokens" )).thenReturn (ImmutableSet .of ("token" + i ));
768+ when (row .isNull ("rpc_address" )).thenReturn (false );
769+
770+ return row ;
771+ } catch (UnknownHostException e ) {
772+ fail ("unexpected" , e );
773+ return null ;
774+ }
775+ }
776+
777+ private AdminRow mockPeersRowDseWithSsl (int i , UUID hostId ) {
778+ AdminRow row = mockPeersRowDse (i , hostId );
779+ when (row .isNull ("native_transport_port_ssl" )).thenReturn (false );
780+ when (row .getInteger ("native_transport_port_ssl" )).thenReturn (9043 );
781+ return row ;
782+ }
783+
644784 private AdminResult mockResult (AdminRow ... rows ) {
645785 AdminResult result = mock (AdminResult .class );
646786 when (result .iterator ()).thenReturn (Iterators .forArray (rows ));
@@ -654,4 +794,12 @@ private void assertLog(Level level, String message) {
654794 assertThat (logs ).hasSize (1 );
655795 assertThat (logs .iterator ().next ().getFormattedMessage ()).contains (message );
656796 }
797+
798+ private void assertLogContains (Level level , String message ) {
799+ verify (appender , atLeast (1 )).doAppend (loggingEventCaptor .capture ());
800+ Iterable <ILoggingEvent > logs =
801+ filter (loggingEventCaptor .getAllValues ()).with ("level" , level ).get ();
802+ assertThat (
803+ Streams .stream (logs ).map (ILoggingEvent ::getFormattedMessage ).anyMatch (message ::contains ));
804+ }
657805}
0 commit comments