diff --git a/driver-core/src/test/java/com/datastax/driver/core/SchemaChangesCCTest.java b/driver-core/src/test/java/com/datastax/driver/core/SchemaChangesCCTest.java index 0538cf87e07..d57301bb7ee 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/SchemaChangesCCTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/SchemaChangesCCTest.java @@ -33,7 +33,7 @@ import org.mockito.ArgumentCaptor; import org.testng.annotations.Test; -@CCMConfig(numberOfNodes = 2, dirtiesContext = true, createCluster = false) +@CCMConfig(numberOfNodes = 3, dirtiesContext = true, createCluster = false) public class SchemaChangesCCTest extends CCMTestsSupport { private static final int NOTIF_TIMEOUT_MS = 5000; diff --git a/driver-core/src/test/java/com/datastax/driver/core/SessionLeakTest.java b/driver-core/src/test/java/com/datastax/driver/core/SessionLeakTest.java index aa0f5737954..c9f9fa569f2 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/SessionLeakTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/SessionLeakTest.java @@ -50,7 +50,12 @@ public void connectionLeakTest() throws Exception { assertOpenConnections(1, cluster); // ensure sessions.size() returns with 1 control connection + core pool size. - int corePoolSize = TestUtils.numberOfLocalCoreConnections(cluster); + int corePoolSize; + if (ccm().getScyllaVersion() != null) { + corePoolSize = TestUtils.numberOfLocalCoreConnectionsSharded(cluster); + } else { + corePoolSize = TestUtils.numberOfLocalCoreConnections(cluster); + } Session session = cluster.connect(); assertThat(cluster.manager.sessions.size()).isEqualTo(1); @@ -114,13 +119,21 @@ public void should_not_leak_session_when_wrong_keyspace() throws Exception { // Ensure no channels remain open. channelMonitor.stop(); channelMonitor.report(); - assertThat(channelMonitor.openChannels(ccm().addressOfNode(1), ccm().addressOfNode(2)).size()) + assertThat( + channelMonitor + .openChannelsPortAgnostic( + ccm().addressOfNode(1).getAddress(), ccm().addressOfNode(2).getAddress()) + .size()) .isEqualTo(0); } private void assertOpenConnections(int expected, Cluster cluster) { assertThat(cluster.getMetrics().getOpenConnections().getValue()).isEqualTo(expected); - assertThat(channelMonitor.openChannels(ccm().addressOfNode(1), ccm().addressOfNode(2)).size()) + assertThat( + channelMonitor + .openChannelsPortAgnostic( + ccm().addressOfNode(1).getAddress(), ccm().addressOfNode(2).getAddress()) + .size()) .isEqualTo(expected); } } diff --git a/driver-core/src/test/java/com/datastax/driver/core/utils/SocketChannelMonitor.java b/driver-core/src/test/java/com/datastax/driver/core/utils/SocketChannelMonitor.java index 07977f58d6b..f777dd43e89 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/utils/SocketChannelMonitor.java +++ b/driver-core/src/test/java/com/datastax/driver/core/utils/SocketChannelMonitor.java @@ -26,6 +26,7 @@ import io.netty.channel.socket.SocketChannel; import java.io.Closeable; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; @@ -192,6 +193,31 @@ public boolean apply(SocketChannel input) { return channels; } + public Collection openChannelsPortAgnostic(InetAddress... addresses) { + return openChannelsPortAgnostic(Arrays.asList(addresses)); + } + + /** + * @param addresses The InetAddresses to include. The port is ignored in this case. + * @return Open channels matching the given InetAddresses. + */ + public Collection openChannelsPortAgnostic( + final Collection addresses) { + List channels = + Lists.newArrayList( + matchingChannels( + new Predicate() { + @Override + public boolean apply(SocketChannel input) { + return input.isOpen() + && input.remoteAddress() != null + && addresses.contains(input.remoteAddress().getAddress()); + } + })); + Collections.sort(channels, BY_REMOTE_ADDRESS); + return channels; + } + /** * @param channelFilter {@link Predicate} to use to determine whether or not a socket shall be * considered.