From 77ea3a6a61598245b466f9c5f01a9bb2c268b994 Mon Sep 17 00:00:00 2001 From: Bouncheck <36934780+Bouncheck@users.noreply.github.com> Date: Thu, 26 Jun 2025 22:49:36 +0200 Subject: [PATCH 1/2] Adjust SessionLeakTest This brings two adjustments: First, it changes the expected number of connections for Scylla clusters since with multiple shards driver will usually open more connections. Second, the checks for number of open channels now ignore the target port on the cluster. Any channel matchin the address without port will now count. This helps to correctly count connections to ccm made Scylla clusters which can use different ports depending on circumstances. For example with advanced shard awareness driver will target different port than in non-shard aware case. --- .../datastax/driver/core/SessionLeakTest.java | 19 +++++++++++--- .../core/utils/SocketChannelMonitor.java | 26 +++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/driver-core/src/test/java/com/datastax/driver/core/SessionLeakTest.java b/driver-core/src/test/java/com/datastax/driver/core/SessionLeakTest.java index aa0f5737954..c9f9fa569f2 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/SessionLeakTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/SessionLeakTest.java @@ -50,7 +50,12 @@ public void connectionLeakTest() throws Exception { assertOpenConnections(1, cluster); // ensure sessions.size() returns with 1 control connection + core pool size. - int corePoolSize = TestUtils.numberOfLocalCoreConnections(cluster); + int corePoolSize; + if (ccm().getScyllaVersion() != null) { + corePoolSize = TestUtils.numberOfLocalCoreConnectionsSharded(cluster); + } else { + corePoolSize = TestUtils.numberOfLocalCoreConnections(cluster); + } Session session = cluster.connect(); assertThat(cluster.manager.sessions.size()).isEqualTo(1); @@ -114,13 +119,21 @@ public void should_not_leak_session_when_wrong_keyspace() throws Exception { // Ensure no channels remain open. channelMonitor.stop(); channelMonitor.report(); - assertThat(channelMonitor.openChannels(ccm().addressOfNode(1), ccm().addressOfNode(2)).size()) + assertThat( + channelMonitor + .openChannelsPortAgnostic( + ccm().addressOfNode(1).getAddress(), ccm().addressOfNode(2).getAddress()) + .size()) .isEqualTo(0); } private void assertOpenConnections(int expected, Cluster cluster) { assertThat(cluster.getMetrics().getOpenConnections().getValue()).isEqualTo(expected); - assertThat(channelMonitor.openChannels(ccm().addressOfNode(1), ccm().addressOfNode(2)).size()) + assertThat( + channelMonitor + .openChannelsPortAgnostic( + ccm().addressOfNode(1).getAddress(), ccm().addressOfNode(2).getAddress()) + .size()) .isEqualTo(expected); } } diff --git a/driver-core/src/test/java/com/datastax/driver/core/utils/SocketChannelMonitor.java b/driver-core/src/test/java/com/datastax/driver/core/utils/SocketChannelMonitor.java index 07977f58d6b..f777dd43e89 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/utils/SocketChannelMonitor.java +++ b/driver-core/src/test/java/com/datastax/driver/core/utils/SocketChannelMonitor.java @@ -26,6 +26,7 @@ import io.netty.channel.socket.SocketChannel; import java.io.Closeable; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; @@ -192,6 +193,31 @@ public boolean apply(SocketChannel input) { return channels; } + public Collection openChannelsPortAgnostic(InetAddress... addresses) { + return openChannelsPortAgnostic(Arrays.asList(addresses)); + } + + /** + * @param addresses The InetAddresses to include. The port is ignored in this case. + * @return Open channels matching the given InetAddresses. + */ + public Collection openChannelsPortAgnostic( + final Collection addresses) { + List channels = + Lists.newArrayList( + matchingChannels( + new Predicate() { + @Override + public boolean apply(SocketChannel input) { + return input.isOpen() + && input.remoteAddress() != null + && addresses.contains(input.remoteAddress().getAddress()); + } + })); + Collections.sort(channels, BY_REMOTE_ADDRESS); + return channels; + } + /** * @param channelFilter {@link Predicate} to use to determine whether or not a socket shall be * considered. From 2df48bc76be38a8cf84466b2d0c7308867970ebb Mon Sep 17 00:00:00 2001 From: Bouncheck <36934780+Bouncheck@users.noreply.github.com> Date: Thu, 26 Jun 2025 23:08:07 +0200 Subject: [PATCH 2/2] Increase numberOfNodes in SchemaChangesCCTest When running this test with newer Scylla, the drop keyspace query times out. This is because the cluster lacks raft quorum, although the returned `OperationTimedOutException` does not inform about that. To observe this you need to enable debug logging and then you may find message like this one before the exception: ``` [cluster2-nio-worker-3] DEBUG com.datastax.driver.core.Connection - Connection[/127.0.1.2:40269-1, inFlight=0, closed=false] Response received on stream 832 but no handler set anymore (either the request has timed out or it was closed due to another error). Received message is ERROR SERVER_ERROR: group [e4fdf001-521a-11f0-b1fb-7d275bcabf53] raft operation [read_barrier] timed out, there is no raft quorum, total voters count 2, alive voters count 1, dead voters [127.0.1.1] ``` Increasing the number of nodes to 3 allows the cluster to maintain the raft quorum and does not impact the test itself. --- .../test/java/com/datastax/driver/core/SchemaChangesCCTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-core/src/test/java/com/datastax/driver/core/SchemaChangesCCTest.java b/driver-core/src/test/java/com/datastax/driver/core/SchemaChangesCCTest.java index 0538cf87e07..d57301bb7ee 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/SchemaChangesCCTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/SchemaChangesCCTest.java @@ -33,7 +33,7 @@ import org.mockito.ArgumentCaptor; import org.testng.annotations.Test; -@CCMConfig(numberOfNodes = 2, dirtiesContext = true, createCluster = false) +@CCMConfig(numberOfNodes = 3, dirtiesContext = true, createCluster = false) public class SchemaChangesCCTest extends CCMTestsSupport { private static final int NOTIF_TIMEOUT_MS = 5000;