File tree Expand file tree Collapse file tree 2 files changed +42
-3
lines changed
driver-core/src/test/java/com/datastax/driver/core Expand file tree Collapse file tree 2 files changed +42
-3
lines changed Original file line number Diff line number Diff line change @@ -50,7 +50,12 @@ public void connectionLeakTest() throws Exception {
50
50
assertOpenConnections (1 , cluster );
51
51
52
52
// ensure sessions.size() returns with 1 control connection + core pool size.
53
- int corePoolSize = TestUtils .numberOfLocalCoreConnections (cluster );
53
+ int corePoolSize ;
54
+ if (ccm ().getScyllaVersion () != null ) {
55
+ corePoolSize = TestUtils .numberOfLocalCoreConnectionsSharded (cluster );
56
+ } else {
57
+ corePoolSize = TestUtils .numberOfLocalCoreConnections (cluster );
58
+ }
54
59
Session session = cluster .connect ();
55
60
56
61
assertThat (cluster .manager .sessions .size ()).isEqualTo (1 );
@@ -114,13 +119,21 @@ public void should_not_leak_session_when_wrong_keyspace() throws Exception {
114
119
// Ensure no channels remain open.
115
120
channelMonitor .stop ();
116
121
channelMonitor .report ();
117
- assertThat (channelMonitor .openChannels (ccm ().addressOfNode (1 ), ccm ().addressOfNode (2 )).size ())
122
+ assertThat (
123
+ channelMonitor
124
+ .openChannelsPortAgnostic (
125
+ ccm ().addressOfNode (1 ).getAddress (), ccm ().addressOfNode (2 ).getAddress ())
126
+ .size ())
118
127
.isEqualTo (0 );
119
128
}
120
129
121
130
private void assertOpenConnections (int expected , Cluster cluster ) {
122
131
assertThat (cluster .getMetrics ().getOpenConnections ().getValue ()).isEqualTo (expected );
123
- assertThat (channelMonitor .openChannels (ccm ().addressOfNode (1 ), ccm ().addressOfNode (2 )).size ())
132
+ assertThat (
133
+ channelMonitor
134
+ .openChannelsPortAgnostic (
135
+ ccm ().addressOfNode (1 ).getAddress (), ccm ().addressOfNode (2 ).getAddress ())
136
+ .size ())
124
137
.isEqualTo (expected );
125
138
}
126
139
}
Original file line number Diff line number Diff line change 26
26
import io .netty .channel .socket .SocketChannel ;
27
27
import java .io .Closeable ;
28
28
import java .io .IOException ;
29
+ import java .net .InetAddress ;
29
30
import java .net .InetSocketAddress ;
30
31
import java .util .Arrays ;
31
32
import java .util .Collection ;
@@ -192,6 +193,31 @@ public boolean apply(SocketChannel input) {
192
193
return channels ;
193
194
}
194
195
196
+ public Collection <SocketChannel > openChannelsPortAgnostic (InetAddress ... addresses ) {
197
+ return openChannelsPortAgnostic (Arrays .asList (addresses ));
198
+ }
199
+
200
+ /**
201
+ * @param addresses The InetAddresses to include. The port is ignored in this case.
202
+ * @return Open channels matching the given InetAddresses.
203
+ */
204
+ public Collection <SocketChannel > openChannelsPortAgnostic (
205
+ final Collection <InetAddress > addresses ) {
206
+ List <SocketChannel > channels =
207
+ Lists .newArrayList (
208
+ matchingChannels (
209
+ new Predicate <SocketChannel >() {
210
+ @ Override
211
+ public boolean apply (SocketChannel input ) {
212
+ return input .isOpen ()
213
+ && input .remoteAddress () != null
214
+ && addresses .contains (input .remoteAddress ().getAddress ());
215
+ }
216
+ }));
217
+ Collections .sort (channels , BY_REMOTE_ADDRESS );
218
+ return channels ;
219
+ }
220
+
195
221
/**
196
222
* @param channelFilter {@link Predicate} to use to determine whether or not a socket shall be
197
223
* considered.
You can’t perform that action at this time.
0 commit comments