Skip to content

Commit 6c48329

Browse files
akhakuabsurdfarce
authored andcommitted
CASSANDRA-19468 Don't swallow exception during metadata refresh
If an exception was thrown while getting new metadata as part of schema refresh it died on the admin executor instead of being propagated to the CompletableFuture argument. Instead, catch those exceptions and hand them off to the CompletableFuture. patch by Ammar Khaku; reviewed by Chris Lohfink, Bret McGuire for CASSANDRA-19468
1 parent 9c41aab commit 6c48329

File tree

2 files changed

+52
-24
lines changed

2 files changed

+52
-24
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -437,30 +437,35 @@ private void startSchemaRequest(CompletableFuture<RefreshSchemaResult> refreshFu
437437
if (agreementError != null) {
438438
refreshFuture.completeExceptionally(agreementError);
439439
} else {
440-
schemaQueriesFactory
441-
.newInstance()
442-
.execute()
443-
.thenApplyAsync(this::parseAndApplySchemaRows, adminExecutor)
444-
.whenComplete(
445-
(newMetadata, metadataError) -> {
446-
if (metadataError != null) {
447-
refreshFuture.completeExceptionally(metadataError);
448-
} else {
449-
refreshFuture.complete(
450-
new RefreshSchemaResult(newMetadata, schemaInAgreement));
451-
}
452-
453-
firstSchemaRefreshFuture.complete(null);
454-
455-
currentSchemaRefresh = null;
456-
// If another refresh was enqueued during this one, run it now
457-
if (queuedSchemaRefresh != null) {
458-
CompletableFuture<RefreshSchemaResult> tmp =
459-
this.queuedSchemaRefresh;
460-
this.queuedSchemaRefresh = null;
461-
startSchemaRequest(tmp);
462-
}
463-
});
440+
try {
441+
schemaQueriesFactory
442+
.newInstance()
443+
.execute()
444+
.thenApplyAsync(this::parseAndApplySchemaRows, adminExecutor)
445+
.whenComplete(
446+
(newMetadata, metadataError) -> {
447+
if (metadataError != null) {
448+
refreshFuture.completeExceptionally(metadataError);
449+
} else {
450+
refreshFuture.complete(
451+
new RefreshSchemaResult(newMetadata, schemaInAgreement));
452+
}
453+
454+
firstSchemaRefreshFuture.complete(null);
455+
456+
currentSchemaRefresh = null;
457+
// If another refresh was enqueued during this one, run it now
458+
if (queuedSchemaRefresh != null) {
459+
CompletableFuture<RefreshSchemaResult> tmp =
460+
this.queuedSchemaRefresh;
461+
this.queuedSchemaRefresh = null;
462+
startSchemaRequest(tmp);
463+
}
464+
});
465+
} catch (Throwable t) {
466+
LOG.debug("[{}] Exception getting new metadata", logPrefix, t);
467+
refreshFuture.completeExceptionally(t);
468+
}
464469
}
465470
});
466471
} else if (queuedSchemaRefresh == null) {

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.datastax.oss.driver.Assertions.assertThat;
2121
import static com.datastax.oss.driver.Assertions.assertThatStage;
2222
import static org.awaitility.Awaitility.await;
23+
import static org.mockito.ArgumentMatchers.anyBoolean;
2324
import static org.mockito.Mockito.mock;
2425
import static org.mockito.Mockito.timeout;
2526
import static org.mockito.Mockito.verify;
@@ -33,6 +34,7 @@
3334
import com.datastax.oss.driver.internal.core.context.EventBus;
3435
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
3536
import com.datastax.oss.driver.internal.core.context.NettyOptions;
37+
import com.datastax.oss.driver.internal.core.control.ControlConnection;
3638
import com.datastax.oss.driver.internal.core.metadata.schema.parsing.SchemaParserFactory;
3739
import com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaQueriesFactory;
3840
import com.datastax.oss.driver.internal.core.metrics.MetricsFactory;
@@ -64,6 +66,7 @@ public class MetadataManagerTest {
6466

6567
@Mock private InternalDriverContext context;
6668
@Mock private NettyOptions nettyOptions;
69+
@Mock private ControlConnection controlConnection;
6770
@Mock private TopologyMonitor topologyMonitor;
6871
@Mock private DriverConfig config;
6972
@Mock private DriverExecutionProfile defaultProfile;
@@ -85,6 +88,7 @@ public void setup() {
8588
when(context.getNettyOptions()).thenReturn(nettyOptions);
8689

8790
when(context.getTopologyMonitor()).thenReturn(topologyMonitor);
91+
when(context.getControlConnection()).thenReturn(controlConnection);
8892

8993
when(defaultProfile.getDuration(DefaultDriverOption.METADATA_SCHEMA_WINDOW))
9094
.thenReturn(Duration.ZERO);
@@ -286,6 +290,25 @@ public void should_remove_node() {
286290
assertThat(refresh.broadcastRpcAddressToRemove).isEqualTo(broadcastRpcAddress2);
287291
}
288292

293+
@Test
294+
public void refreshSchema_should_work() {
295+
// Given
296+
IllegalStateException expectedException = new IllegalStateException("Error we're testing");
297+
when(schemaQueriesFactory.newInstance()).thenThrow(expectedException);
298+
when(topologyMonitor.refreshNodeList()).thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mock(NodeInfo.class))));
299+
when(topologyMonitor.checkSchemaAgreement()).thenReturn(CompletableFuture.completedFuture(Boolean.TRUE));
300+
when(controlConnection.init(anyBoolean(), anyBoolean(), anyBoolean())).thenReturn(CompletableFuture.completedFuture(null));
301+
metadataManager.refreshNodes(); // required internal state setup for this
302+
waitForPendingAdminTasks(() -> metadataManager.refreshes.size() == 1); // sanity check
303+
304+
// When
305+
CompletionStage<MetadataManager.RefreshSchemaResult> result = metadataManager.refreshSchema("foo", true, true);
306+
307+
// Then
308+
waitForPendingAdminTasks(() -> result.toCompletableFuture().isDone());
309+
assertThatStage(result).isFailed(t -> assertThat(t).isEqualTo(expectedException));
310+
}
311+
289312
private static class TestMetadataManager extends MetadataManager {
290313

291314
private List<MetadataRefresh> refreshes = new CopyOnWriteArrayList<>();

0 commit comments

Comments
 (0)