|
16 | 16 |
|
17 | 17 | package com.couchbase.client.java; |
18 | 18 |
|
| 19 | +import com.couchbase.client.core.cnc.Event; |
19 | 20 | import com.couchbase.client.core.cnc.SimpleEventBus; |
20 | 21 | import com.couchbase.client.core.cnc.events.endpoint.EndpointDisconnectDelayedEvent; |
21 | 22 | import com.couchbase.client.core.cnc.events.endpoint.EndpointDisconnectResumedEvent; |
22 | | -import com.couchbase.client.core.cnc.events.endpoint.EndpointStateChangedEvent; |
23 | 23 | import com.couchbase.client.core.config.ProposedGlobalConfigContext; |
24 | | -import com.couchbase.client.core.service.ServiceType; |
25 | | -import com.couchbase.client.java.env.ClusterEnvironment; |
26 | | -import com.couchbase.client.java.query.QueryScanConsistency; |
| 24 | +import com.couchbase.client.core.error.TimeoutException; |
| 25 | +import com.couchbase.client.java.json.JsonArray; |
27 | 26 | import com.couchbase.client.java.util.JavaIntegrationTest; |
28 | 27 | import com.couchbase.client.test.Capabilities; |
29 | 28 | import com.couchbase.client.test.IgnoreWhen; |
30 | | -import org.junit.jupiter.api.AfterAll; |
31 | | -import org.junit.jupiter.api.AfterEach; |
32 | | -import org.junit.jupiter.api.BeforeAll; |
33 | | -import org.junit.jupiter.api.BeforeEach; |
| 29 | +import org.jspecify.annotations.Nullable; |
34 | 30 | import org.junit.jupiter.api.Test; |
| 31 | +import org.opentest4j.AssertionFailedError; |
| 32 | +import org.slf4j.Logger; |
| 33 | +import org.slf4j.LoggerFactory; |
35 | 34 |
|
36 | 35 | import java.time.Duration; |
37 | | -import java.util.Collections; |
38 | | -import java.util.LinkedList; |
39 | 36 | import java.util.List; |
40 | | -import java.util.concurrent.ExecutionException; |
41 | | -import java.util.concurrent.atomic.AtomicBoolean; |
| 37 | +import java.util.UUID; |
| 38 | +import java.util.concurrent.CompletableFuture; |
42 | 39 |
|
| 40 | +import static com.couchbase.client.core.util.CbCollections.listOf; |
| 41 | +import static com.couchbase.client.java.QueryIntegrationTest.verySlowQueryStatement; |
43 | 42 | import static com.couchbase.client.java.manager.query.QueryIndexManagerIntegrationTest.DISABLE_QUERY_TESTS_FOR_CLUSTER; |
44 | 43 | import static com.couchbase.client.java.query.QueryOptions.queryOptions; |
| 44 | +import static com.couchbase.client.test.Util.waitUntilCondition; |
| 45 | +import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| 46 | +import static java.util.stream.Collectors.toList; |
45 | 47 | import static org.junit.jupiter.api.Assertions.assertEquals; |
| 48 | +import static org.junit.jupiter.api.Assertions.assertInstanceOf; |
46 | 49 |
|
47 | | -/** |
48 | | - * Verifies the delayed disconnect of an in-progress query Must be in package com.couchbase.client.core to override |
49 | | - * Core.createConfigurationProvider() |
50 | | - * <p> |
51 | | - * Disabling against 5.5. See comment on QueryIndexManagerIntegrationTest for details. |
52 | | - */ |
53 | 50 | @IgnoreWhen(missesCapabilities = {Capabilities.QUERY, Capabilities.CLUSTER_LEVEL_QUERY}, |
54 | 51 | clusterVersionEquals = DISABLE_QUERY_TESTS_FOR_CLUSTER) |
55 | 52 | class QueryDelayDisconnectIntegrationTest extends JavaIntegrationTest { |
| 53 | + private static final Logger log = LoggerFactory.getLogger(QueryDelayDisconnectIntegrationTest.class); |
56 | 54 |
|
57 | | - private static ClusterEnvironment initEnv; |
58 | | - private static Cluster initCluster; |
59 | | - |
60 | | - private static ClusterEnvironment environment; |
61 | | - private static Cluster cluster; |
62 | | - |
63 | | - private static SimpleEventBus eventBus = new SimpleEventBus(true, Collections.singletonList(EndpointStateChangedEvent.class)); |
64 | | - |
65 | | - @BeforeAll |
66 | | - static void setup() throws ExecutionException, InterruptedException { |
67 | | - |
68 | | - // create a cluster for initialization. |
69 | | - // do not use the same cluster for testing as the initialization cluster is polluted by accessing the bucket. |
70 | | - |
71 | | - initEnv = ClusterEnvironment.builder().build(); |
72 | | - ClusterOptions initOpts = ClusterOptions.clusterOptions(authenticator()).environment(initEnv); |
73 | | - initCluster = Cluster.connect(connectionString(), initOpts);//createCluster(env -> ClusterEnvironment.builder().eventBus(eventBus).build()); |
74 | | - Bucket bucket = initCluster.bucket(config().bucketname()); |
75 | | - |
76 | | - bucket.waitUntilReady(WAIT_UNTIL_READY_DEFAULT); |
77 | | - waitForService(bucket, ServiceType.QUERY); |
78 | | - waitForQueryIndexerToHaveKeyspace(initCluster, config().bucketname()); |
79 | | - |
80 | | - createPrimaryIndex(initCluster, config().bucketname()); |
81 | | - |
82 | | - for (int i = 0; i < 100; i++) { |
83 | | - initCluster.bucket(config().bucketname()).defaultCollection().upsert("" + i, "{}"); |
| 55 | + @Test |
| 56 | + void inflightQueryIsAllowedToCompleteIfNodeLeavesCluster() throws Throwable { |
| 57 | + // This test relies on a query timeout expiring. Minimize typical execution time by |
| 58 | + // starting with short timeout and trying again with a longer timeout if necessary. |
| 59 | + try { |
| 60 | + testWithQueryTimeout(Duration.ofSeconds(4)); // long enough for laptop or GHA |
| 61 | + } catch (Throwable t) { |
| 62 | + log.warn("Test failed with short query timeout. Trying again with longer timeout.", t); |
| 63 | + testWithQueryTimeout(Duration.ofSeconds(40)); // long enough for a glacial CI environment |
84 | 64 | } |
85 | | - |
86 | | - // create a cluster for testing |
87 | | - |
88 | | - environment = ClusterEnvironment.builder().eventBus(eventBus).ioConfig(io -> io.configPollInterval(Duration.ofHours(24))).build(); |
89 | | - ClusterOptions opts = ClusterOptions.clusterOptions(authenticator()).environment(environment); |
90 | | - cluster = Cluster.connect(connectionString(), opts); |
91 | | - |
92 | 65 | } |
93 | 66 |
|
94 | | - @BeforeEach |
95 | | - void beforeEach() { |
| 67 | + static void testWithQueryTimeout(Duration queryTimeout) throws Exception { |
| 68 | + Duration gracePeriod = queryTimeout.dividedBy(3); |
| 69 | + log.info("Testing with queryTimeout={} gracePeriod={}", queryTimeout, gracePeriod); |
| 70 | + |
| 71 | + SimpleEventBus eventBus = new SimpleEventBus(true); |
| 72 | + |
| 73 | + try (Cluster cluster = createCluster(env -> env |
| 74 | + .eventBus(eventBus) |
| 75 | + .ioConfig(io -> io.configPollInterval(Duration.ofHours(24))) |
| 76 | + )) { |
| 77 | + String clientContextId = UUID.randomUUID().toString(); // so we can monitor query state |
| 78 | + CompletableFuture<Throwable> queryErrorFuture = new CompletableFuture<>(); |
| 79 | + |
| 80 | + log.info("Scheduling a query we expect to time out."); |
| 81 | + cluster.reactive() |
| 82 | + .query( |
| 83 | + verySlowQueryStatement(), |
| 84 | + queryOptions() |
| 85 | + .clientContextId(clientContextId) |
| 86 | + .timeout(queryTimeout) |
| 87 | + ) |
| 88 | + .subscribe( |
| 89 | + result -> queryErrorFuture.complete(new AssertionFailedError("Expected query to time out.")), |
| 90 | + queryErrorFuture::complete |
| 91 | + ); |
| 92 | + |
| 93 | + log.info("Waiting for query execution to start."); |
| 94 | + waitUntilCondition( |
| 95 | + () -> assertEquals("running", getQueryState(cluster, clientContextId), "query state"), |
| 96 | + gracePeriod |
| 97 | + ); |
| 98 | + |
| 99 | + log.info("Tricking the SDK into thinking all query nodes went away."); |
| 100 | + cluster.core().configurationProvider().proposeGlobalConfig( |
| 101 | + new ProposedGlobalConfigContext(dummyConfigWithNoQueryNodes, "127.0.0.1", true) |
| 102 | + ); |
| 103 | + |
| 104 | + log.info("Verifying network channel closure was deferred."); |
| 105 | + waitUntilEvents(eventBus, gracePeriod, listOf( |
| 106 | + EndpointDisconnectDelayedEvent.class |
| 107 | + )); |
| 108 | + |
| 109 | + log.info("Waiting for query timeout."); |
| 110 | + Duration pollTimeout = queryTimeout.plus(gracePeriod); |
| 111 | + Throwable t = queryErrorFuture.get(pollTimeout.toMillis(), MILLISECONDS); |
| 112 | + assertInstanceOf(TimeoutException.class, t); |
| 113 | + |
| 114 | + log.info("Verifying network channel was closed."); |
| 115 | + waitUntilEvents(eventBus, gracePeriod, listOf( |
| 116 | + EndpointDisconnectDelayedEvent.class, |
| 117 | + EndpointDisconnectResumedEvent.class |
| 118 | + )); |
| 119 | + } |
96 | 120 | } |
97 | 121 |
|
98 | | - @AfterEach |
99 | | - void afterEach() { |
100 | | - eventBus.clear(); |
| 122 | + private static void waitUntilEvents(SimpleEventBus eventBus, Duration gracePeriod, List<Class<?>> expectedEventClasses) { |
| 123 | + waitUntilCondition( |
| 124 | + () -> assertEquals(expectedEventClasses, getEventClasses(eventBus)), |
| 125 | + gracePeriod |
| 126 | + ); |
101 | 127 | } |
102 | 128 |
|
103 | | - @AfterAll |
104 | | - static void tearDown() { |
105 | | - cluster.disconnect(); |
106 | | - environment.shutdown(); |
107 | | - |
108 | | - for (int i = 0; i < 100; i++) { |
109 | | - initCluster.bucket(config().bucketname()).defaultCollection().remove("" + i); |
110 | | - } |
111 | | - initCluster.disconnect(); |
112 | | - initEnv.shutdown(); |
| 129 | + private static List<Class<?>> getEventClasses(SimpleEventBus eventBus) { |
| 130 | + return eventBus.publishedEvents().stream() |
| 131 | + .filter(e -> e instanceof EndpointDisconnectDelayedEvent || e instanceof EndpointDisconnectResumedEvent) |
| 132 | + .map(Event::getClass) |
| 133 | + .collect(toList()); |
113 | 134 | } |
114 | 135 |
|
115 | | - @Test |
116 | | - void simpleQueryClose() throws InterruptedException, ExecutionException { |
117 | | - |
118 | | - // Start a query. |
119 | | - // When the first row is retrieved, modify the configuration by removing the n1ql nodes |
120 | | - // The query endpoint will not be closed until the query completes |
121 | | - |
122 | | - AtomicBoolean first = new AtomicBoolean(true); |
123 | | - cluster.reactive().query( |
124 | | - "select * from `" + config().bucketname() + "` a UNNEST(SELECT b.* FROM `" + config().bucketname() |
125 | | - + "` b limit 100) AS c limit 10000", |
126 | | - queryOptions().metrics(true).scanConsistency(QueryScanConsistency.REQUEST_PLUS) |
127 | | - ).block() |
128 | | - .rowsAs(byte[].class).doOnNext(it -> { |
129 | | - if (first.compareAndSet(true, false)) { |
130 | | - cluster.reactive().core().configurationProvider().proposeGlobalConfig( |
131 | | - new ProposedGlobalConfigContext(dummyConfig, "localhost", true) |
132 | | - ); |
133 | | - } |
134 | | - }).blockLast(); |
135 | | - cluster.reactive().core().shutdown().block(); // flush out events |
136 | | - List<Class> events = new LinkedList<>(); |
137 | | - eventBus.publishedEvents().stream().filter(e -> e instanceof EndpointDisconnectDelayedEvent || e instanceof EndpointDisconnectResumedEvent).forEach(e -> events.add(e.getClass())); |
138 | | - assertEquals(2, events.size()); |
139 | | - assertEquals(events.get(0), EndpointDisconnectDelayedEvent.class); |
140 | | - assertEquals(events.get(1), EndpointDisconnectResumedEvent.class); |
141 | | - |
| 136 | + private static @Nullable String getQueryState(Cluster cluster, String clientContextId) { |
| 137 | + return cluster.query( |
| 138 | + "SELECT RAW state FROM system:active_requests WHERE clientContextID = ?", |
| 139 | + queryOptions() |
| 140 | + .parameters(JsonArray.from(clientContextId)) |
| 141 | + ) |
| 142 | + .rowsAs(String.class) |
| 143 | + .stream().findFirst().orElse(null); |
142 | 144 | } |
143 | 145 |
|
144 | | - String dummyConfig = "{\n" + |
| 146 | + private static final String dummyConfigWithNoQueryNodes = "{\n" + |
145 | 147 | " \"revEpoch\": 9999999999,\n" + |
146 | 148 | " \"rev\": 13205,\n" + |
147 | 149 | " \"nodesExt\": [\n" + |
|
0 commit comments