Skip to content

Commit 6f9f7d5

Browse files
committed
Merge commit '6c48329199862215abc22170769fd1a165e80a15' of https://github.com/apache/cassandra-java-driver into pull-upstream-4.18.1-v3
Applied auto-formatter
2 parents 724a4e0 + 6c48329 commit 6f9f7d5

File tree

2 files changed

+56
-24
lines changed

2 files changed

+56
-24
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -447,30 +447,35 @@ private void startSchemaRequest(CompletableFuture<RefreshSchemaResult> refreshFu
447447
if (agreementError != null) {
448448
refreshFuture.completeExceptionally(agreementError);
449449
} else {
450-
schemaQueriesFactory
451-
.newInstance()
452-
.execute()
453-
.thenApplyAsync(this::parseAndApplySchemaRows, adminExecutor)
454-
.whenComplete(
455-
(newMetadata, metadataError) -> {
456-
if (metadataError != null) {
457-
refreshFuture.completeExceptionally(metadataError);
458-
} else {
459-
refreshFuture.complete(
460-
new RefreshSchemaResult(newMetadata, schemaInAgreement));
461-
}
462-
463-
firstSchemaRefreshFuture.complete(null);
464-
465-
currentSchemaRefresh = null;
466-
// If another refresh was enqueued during this one, run it now
467-
if (queuedSchemaRefresh != null) {
468-
CompletableFuture<RefreshSchemaResult> tmp =
469-
this.queuedSchemaRefresh;
470-
this.queuedSchemaRefresh = null;
471-
startSchemaRequest(tmp);
472-
}
473-
});
450+
try {
451+
schemaQueriesFactory
452+
.newInstance()
453+
.execute()
454+
.thenApplyAsync(this::parseAndApplySchemaRows, adminExecutor)
455+
.whenComplete(
456+
(newMetadata, metadataError) -> {
457+
if (metadataError != null) {
458+
refreshFuture.completeExceptionally(metadataError);
459+
} else {
460+
refreshFuture.complete(
461+
new RefreshSchemaResult(newMetadata, schemaInAgreement));
462+
}
463+
464+
firstSchemaRefreshFuture.complete(null);
465+
466+
currentSchemaRefresh = null;
467+
// If another refresh was enqueued during this one, run it now
468+
if (queuedSchemaRefresh != null) {
469+
CompletableFuture<RefreshSchemaResult> tmp =
470+
this.queuedSchemaRefresh;
471+
this.queuedSchemaRefresh = null;
472+
startSchemaRequest(tmp);
473+
}
474+
});
475+
} catch (Throwable t) {
476+
LOG.debug("[{}] Exception getting new metadata", logPrefix, t);
477+
refreshFuture.completeExceptionally(t);
478+
}
474479
}
475480
});
476481
} else if (queuedSchemaRefresh == null) {

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.datastax.oss.driver.Assertions.assertThat;
2121
import static com.datastax.oss.driver.Assertions.assertThatStage;
2222
import static org.awaitility.Awaitility.await;
23+
import static org.mockito.ArgumentMatchers.anyBoolean;
2324
import static org.mockito.Mockito.mock;
2425
import static org.mockito.Mockito.timeout;
2526
import static org.mockito.Mockito.verify;
@@ -33,6 +34,7 @@
3334
import com.datastax.oss.driver.internal.core.context.EventBus;
3435
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
3536
import com.datastax.oss.driver.internal.core.context.NettyOptions;
37+
import com.datastax.oss.driver.internal.core.control.ControlConnection;
3638
import com.datastax.oss.driver.internal.core.metadata.schema.parsing.SchemaParserFactory;
3739
import com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaQueriesFactory;
3840
import com.datastax.oss.driver.internal.core.metrics.MetricsFactory;
@@ -64,6 +66,7 @@ public class MetadataManagerTest {
6466

6567
@Mock private InternalDriverContext context;
6668
@Mock private NettyOptions nettyOptions;
69+
@Mock private ControlConnection controlConnection;
6770
@Mock private TopologyMonitor topologyMonitor;
6871
@Mock private DriverConfig config;
6972
@Mock private DriverExecutionProfile defaultProfile;
@@ -85,6 +88,7 @@ public void setup() {
8588
when(context.getNettyOptions()).thenReturn(nettyOptions);
8689

8790
when(context.getTopologyMonitor()).thenReturn(topologyMonitor);
91+
when(context.getControlConnection()).thenReturn(controlConnection);
8892

8993
when(defaultProfile.getDuration(DefaultDriverOption.METADATA_SCHEMA_WINDOW))
9094
.thenReturn(Duration.ZERO);
@@ -286,6 +290,29 @@ public void should_remove_node() {
286290
assertThat(refresh.broadcastRpcAddressToRemove).isEqualTo(broadcastRpcAddress2);
287291
}
288292

293+
@Test
294+
public void refreshSchema_should_work() {
295+
// Given
296+
IllegalStateException expectedException = new IllegalStateException("Error we're testing");
297+
when(schemaQueriesFactory.newInstance()).thenThrow(expectedException);
298+
when(topologyMonitor.refreshNodeList())
299+
.thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mock(NodeInfo.class))));
300+
when(topologyMonitor.checkSchemaAgreement())
301+
.thenReturn(CompletableFuture.completedFuture(Boolean.TRUE));
302+
when(controlConnection.init(anyBoolean(), anyBoolean(), anyBoolean()))
303+
.thenReturn(CompletableFuture.completedFuture(null));
304+
metadataManager.refreshNodes(); // required internal state setup for this
305+
waitForPendingAdminTasks(() -> metadataManager.refreshes.size() == 1); // sanity check
306+
307+
// When
308+
CompletionStage<MetadataManager.RefreshSchemaResult> result =
309+
metadataManager.refreshSchema("foo", true, true);
310+
311+
// Then
312+
waitForPendingAdminTasks(() -> result.toCompletableFuture().isDone());
313+
assertThatStage(result).isFailed(t -> assertThat(t).isEqualTo(expectedException));
314+
}
315+
289316
private static class TestMetadataManager extends MetadataManager {
290317

291318
private List<MetadataRefresh> refreshes = new CopyOnWriteArrayList<>();

0 commit comments

Comments
 (0)