Skip to content

Commit d01b81c

Browse files
committed
Add MockResolverIT#cannot_reconnect_with_resolved_socket()
Adds a useful method for testing the issues that surface after cluster replacements. Due to its variable, sometimes long, runtime, it is not added to any of the test groups.
1 parent 3446aae commit d01b81c

File tree

1 file changed

+199
-0
lines changed

1 file changed

+199
-0
lines changed

integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
package com.datastax.oss.driver.core.resolver;
2525

2626
import static org.assertj.core.api.Assertions.assertThat;
27+
import static org.junit.Assert.assertFalse;
2728
import static org.junit.Assert.assertTrue;
2829
import static org.junit.Assert.fail;
2930

@@ -33,6 +34,8 @@
3334
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
3435
import com.datastax.oss.driver.api.core.cql.ResultSet;
3536
import com.datastax.oss.driver.api.core.cql.Row;
37+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
38+
import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
3639
import com.datastax.oss.driver.api.core.metadata.Node;
3740
import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge;
3841
import com.datastax.oss.driver.categories.IsolatedTests;
@@ -43,6 +46,7 @@
4346
import java.net.InetAddress;
4447
import java.net.InetSocketAddress;
4548
import java.net.UnknownHostException;
49+
import java.time.Duration;
4650
import java.util.Collection;
4751
import java.util.Collections;
4852
import java.util.Iterator;
@@ -251,6 +255,201 @@ public void run_replace_test_20_times() {
251255
}
252256
}
253257

258+
// This is too long to run during CI, but is useful for manual investigations.
259+
@SuppressWarnings("unused")
260+
public void cannot_reconnect_with_resolved_socket() {
261+
DriverConfigLoader loader =
262+
new DefaultProgrammaticDriverConfigLoaderBuilder()
263+
.withBoolean(TypedDriverOption.RESOLVE_CONTACT_POINTS.getRawOption(), false)
264+
.withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true)
265+
.withStringList(
266+
TypedDriverOption.CONTACT_POINTS.getRawOption(),
267+
Collections.singletonList("test.cluster.fake:9042"))
268+
.build();
269+
270+
CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader);
271+
CqlSession session;
272+
Collection<Node> nodes;
273+
Set<Node> filteredNodes;
274+
try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.0.1.").build()) {
275+
RESOLVER_FACTORY.updateResponse(
276+
"test.cluster.fake",
277+
new ValidResponse(
278+
new InetAddress[] {
279+
getNodeInetAddress(ccmBridge, 1),
280+
getNodeInetAddress(ccmBridge, 2),
281+
getNodeInetAddress(ccmBridge, 3)
282+
}));
283+
ccmBridge.create();
284+
ccmBridge.start();
285+
session = builder.build();
286+
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
287+
while (System.currentTimeMillis() < endTime) {
288+
try {
289+
nodes = session.getMetadata().getNodes().values();
290+
int upNodes = 0;
291+
for (Node node : nodes) {
292+
if (node.getUpSinceMillis() > 0) {
293+
upNodes++;
294+
}
295+
}
296+
if (upNodes == 3) {
297+
break;
298+
}
299+
// session.refreshSchema();
300+
SimpleStatement statement =
301+
new SimpleStatementBuilder("SELECT * FROM system.local")
302+
.setTimeout(Duration.ofSeconds(3))
303+
.build();
304+
session.executeAsync(statement);
305+
Thread.sleep(3000);
306+
} catch (InterruptedException e) {
307+
break;
308+
}
309+
}
310+
ResultSet rs = session.execute("SELECT * FROM system.local");
311+
assertThat(rs).isNotNull();
312+
Row row = rs.one();
313+
assertThat(row).isNotNull();
314+
nodes = session.getMetadata().getNodes().values();
315+
assertThat(nodes).hasSize(3);
316+
Iterator<Node> iterator = nodes.iterator();
317+
while (iterator.hasNext()) {
318+
LOG.trace("Metadata node: " + iterator.next().toString());
319+
}
320+
filteredNodes =
321+
nodes.stream()
322+
.filter(x -> x.toString().contains("test.cluster.fake"))
323+
.collect(Collectors.toSet());
324+
assertThat(filteredNodes).hasSize(1);
325+
}
326+
int counter = 0;
327+
while (filteredNodes.size() == 1) {
328+
counter++;
329+
if (counter == 255) {
330+
LOG.error("Completed 254 runs. Breaking.");
331+
break;
332+
}
333+
LOG.warn(
334+
"Launching another cluster until we lose resolved socket from metadata (run {}).",
335+
counter);
336+
try (CcmBridge ccmBridge =
337+
CcmBridge.builder().withNodes(3).withIpPrefix("127.0." + counter + ".").build()) {
338+
RESOLVER_FACTORY.updateResponse(
339+
"test.cluster.fake",
340+
new ValidResponse(
341+
new InetAddress[] {
342+
getNodeInetAddress(ccmBridge, 1),
343+
getNodeInetAddress(ccmBridge, 2),
344+
getNodeInetAddress(ccmBridge, 3)
345+
}));
346+
ccmBridge.create();
347+
ccmBridge.start();
348+
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
349+
while (System.currentTimeMillis() < endTime) {
350+
try {
351+
nodes = session.getMetadata().getNodes().values();
352+
int upNodes = 0;
353+
for (Node node : nodes) {
354+
if (node.getUpSinceMillis() > 0) {
355+
upNodes++;
356+
}
357+
}
358+
if (upNodes == 3) {
359+
break;
360+
}
361+
// session.refreshSchema();
362+
SimpleStatement statement =
363+
new SimpleStatementBuilder("SELECT * FROM system.local")
364+
.setTimeout(Duration.ofSeconds(3))
365+
.build();
366+
session.executeAsync(statement);
367+
Thread.sleep(3000);
368+
} catch (InterruptedException e) {
369+
break;
370+
}
371+
}
372+
/*
373+
ResultSet rs = session.execute("SELECT * FROM system.local");
374+
assertThat(rs).isNotNull();
375+
Row row = rs.one();
376+
assertThat(row).isNotNull();
377+
*/
378+
nodes = session.getMetadata().getNodes().values();
379+
assertThat(nodes).hasSize(3);
380+
Iterator<Node> iterator = nodes.iterator();
381+
while (iterator.hasNext()) {
382+
LOG.trace("Metadata node: " + iterator.next().toString());
383+
}
384+
filteredNodes =
385+
nodes.stream()
386+
.filter(x -> x.toString().contains("test.cluster.fake"))
387+
.collect(Collectors.toSet());
388+
if (filteredNodes.size() > 1) {
389+
fail(
390+
"Somehow there is more than 1 node in metadata with unresolved hostname. This should not ever happen.");
391+
}
392+
}
393+
}
394+
Iterator<Node> iterator = nodes.iterator();
395+
while (iterator.hasNext()) {
396+
InetSocketAddress address = (InetSocketAddress) iterator.next().getEndPoint().resolve();
397+
assertFalse(address.isUnresolved());
398+
}
399+
try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.1.1.").build()) {
400+
RESOLVER_FACTORY.updateResponse(
401+
"test.cluster.fake",
402+
new ValidResponse(
403+
new InetAddress[] {
404+
getNodeInetAddress(ccmBridge, 1),
405+
getNodeInetAddress(ccmBridge, 2),
406+
getNodeInetAddress(ccmBridge, 3)
407+
}));
408+
// Now the driver should fail to reconnect since unresolved hostname is gone.
409+
ccmBridge.create();
410+
ccmBridge.start();
411+
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
412+
while (System.currentTimeMillis() < endTime) {
413+
try {
414+
nodes = session.getMetadata().getNodes().values();
415+
int upNodes = 0;
416+
for (Node node : nodes) {
417+
if (node.getUpSinceMillis() > 0) {
418+
upNodes++;
419+
}
420+
}
421+
if (upNodes == 3) {
422+
break;
423+
}
424+
// session.refreshSchema();
425+
SimpleStatement statement =
426+
new SimpleStatementBuilder("SELECT * FROM system.local")
427+
.setTimeout(Duration.ofSeconds(3))
428+
.build();
429+
session.executeAsync(statement);
430+
Thread.sleep(3000);
431+
} catch (InterruptedException e) {
432+
break;
433+
}
434+
}
435+
/*
436+
for (int i = 0; i < 15; i++) {
437+
try {
438+
nodes = session.getMetadata().getNodes().values();
439+
if (nodes.size() == 3) {
440+
break;
441+
}
442+
Thread.sleep(1000);
443+
} catch (InterruptedException e) {
444+
break;
445+
}
446+
}
447+
*/
448+
session.execute("SELECT * FROM system.local");
449+
}
450+
session.close();
451+
}
452+
254453
private static InetAddress getNodeInetAddress(CcmBridge ccmBridge, int nodeid) {
255454
try {
256455
return InetAddress.getByName(ccmBridge.getNodeIpAddress(nodeid));

0 commit comments

Comments
 (0)