diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java index 2c2dfa7d83f4..c342a5852c99 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundConnection.java @@ -94,7 +94,7 @@ @SuppressWarnings({ "WeakerAccess", "FieldMayBeFinal", "NonAtomicOperationOnVolatileField", "SameParameterValue" }) public class OutboundConnection { - static final Logger logger = LoggerFactory.getLogger(OutboundConnection.class); + private static final Logger logger = LoggerFactory.getLogger(OutboundConnection.class); private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 30L, TimeUnit.SECONDS); private static final AtomicLongFieldUpdater submittedUpdater = AtomicLongFieldUpdater.newUpdater(OutboundConnection.class, "submittedCount"); @@ -485,7 +485,7 @@ private boolean onExpired(Message message) */ private void onFailedSerialize(Message message, int messagingVersion, int bytesWrittenToNetwork, Throwable t) { - logger.warn("{} dropping message of type {} due to error", id(), message.verb(), t); + noSpamLogger.warn("{} dropping message of type {} due to error", id(), message.verb(), t); JVMStabilityInspector.inspectThrowable(t); releaseCapacity(1, canonicalSize(message)); errorCount += 1; @@ -949,7 +949,7 @@ public void run() try { priorThreadName = Thread.currentThread().getName(); - threadName = "Messaging-OUT-" + template.from() + "->" + template.to + '-' + type; + threadName = "Messaging-OUT-" + template.from + "->" + template.to + '-' + type; Thread.currentThread().setName(threadName); super.run(); @@ -1089,14 +1089,15 @@ class Initiate void onFailure(Throwable cause) { if (cause instanceof ConnectException) - noSpamLogger.info("{} failed to connect", id(), cause); + logger.info("{} failed to connect", id(), cause); else - noSpamLogger.error("{} failed to connect", id(), cause); + logger.error("{} failed to connect", id(), cause); JVMStabilityInspector.inspectThrowable(cause); if (hasPending()) { + logger.info("{} scheduling reconnect to {}", id(), settings.connectToId()); Promise> result = new AsyncPromise<>(eventLoop); state = new Connecting(state.disconnected(), result, eventLoop.schedule(() -> attempt(result), max(100, retryRateMillis), MILLISECONDS)); retryRateMillis = min(1000, retryRateMillis * 2); @@ -1104,6 +1105,7 @@ void onFailure(Throwable cause) else { // this Initiate will be discarded + logger.info("{} no more pending messages; not retrying connection to {}", id(), settings.connectToId()); state = Disconnected.dormant(state.disconnected().maintenance); } } @@ -1143,7 +1145,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) } catch (Throwable t) { - logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t); + noSpamLogger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t); } } }); @@ -1191,6 +1193,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) private void attempt(Promise> result) { ++connectionAttempts; + logger.info("{} attempting to connect to {}; attempt {}", id(), template.connectToId(), connectionAttempts); /* * Re-evaluate messagingVersion before re-attempting the connection in case @@ -1611,7 +1614,7 @@ private String id() settings = state.established().settings; } String channelId = channel != null ? channel.id().asShortText() : "[no-channel]"; - return SocketFactory.channelId(settings.from(), settings.to, type, channelId); + return SocketFactory.channelId(settings.from, settings.to, type, channelId); } @Override diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java index a6fed5047a37..02f718809712 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java +++ b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java @@ -172,8 +172,8 @@ public boolean withEncryption() public String toString() { - return String.format("peer: (%s, %s), framing: %s, encryption: %s", - to, connectTo, framing, SocketFactory.encryptionOptionsSummary(encryption)); + return String.format("from: %s peer: (%s, %s), framing: %s, encryption: %s", + from, to, connectTo, framing, SocketFactory.encryptionOptionsSummary(encryption)); } public OutboundConnectionSettings withAuthenticator(IInternodeAuthenticator authenticator) diff --git a/src/java/org/apache/cassandra/net/UnknownEndpointException.java b/src/java/org/apache/cassandra/net/UnknownEndpointException.java new file mode 100644 index 000000000000..dc187f6d50f4 --- /dev/null +++ b/src/java/org/apache/cassandra/net/UnknownEndpointException.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net; + +/** + * Exception thrown when the messaging service encounters an endpoint that cannot be resolved or is not known + * to the cluster. This typically occurs during network operations when attempting to communicate with a node + * that is not recognized by the current node's topology information. + * + *

This exception is thrown when the + * {@link org.apache.cassandra.locator.IEndpointSnitch#getPreferredAddress(org.apache.cassandra.locator.InetAddressAndPort)} + * method encounters an endpoint that cannot be resolved to a preferred address.

+ * + *

Common scenarios where this exception may be thrown include:

+ * + *
    + *
  • During speculative read retries when the target replica endpoint is not known to the messaging service
  • + *
  • When attempting to establish connections to endpoints that are not part of the known cluster topology
  • + *
+ * + *

The exception is used internally by the messaging system to handle cases where endpoint resolution fails, + * allowing the system to gracefully handle unknown endpoints rather than causing fatal errors. For example, + * during speculative read operations, this exception is caught and logged as a debug message, allowing the + * operation to continue without the failed speculative retry. + * + * @see org.apache.cassandra.locator.IEndpointSnitch#getPreferredAddress(org.apache.cassandra.locator.InetAddressAndPort) + * @see org.apache.cassandra.service.reads.AbstractReadExecutor#maybeTryAdditionalReplicas() + */ +public class UnknownEndpointException extends RuntimeException +{ + /** + * Constructs a new UnknownEndpointException with the specified detail message. + * + * @param message the detail message explaining why the endpoint is unknown or cannot be resolved. + * This message should provide context about which endpoint failed and the circumstances + * of the failure (e.g., "Failed to resolve preferred address for endpoint 192.168.1.100:9042") + */ + public UnknownEndpointException(String message) + { + super(message); + } +} diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index e6ec0e3a7514..b0d904ecc487 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -98,7 +98,6 @@ import org.apache.cassandra.hints.Hint; import org.apache.cassandra.hints.HintsService; import org.apache.cassandra.locator.AbstractReplicationStrategy; -import org.apache.cassandra.locator.DynamicEndpointSnitch; import org.apache.cassandra.locator.EndpointsForToken; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.InetAddressAndPort; diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index cb0018975e9e..d3b987823e1a 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -41,6 +41,7 @@ import org.apache.cassandra.metrics.ReadCoordinationMetrics; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.UnknownEndpointException; import org.apache.cassandra.service.QueryInfoTracker; import org.apache.cassandra.service.StorageProxy.LocalReadRunnable; import org.apache.cassandra.service.reads.repair.ReadRepair; @@ -177,8 +178,26 @@ private void makeRequests(ReadCommand readCommand, Iterable replicas) * Perform additional requests if it looks like the original takes "too much time", as defined * by the subclass. * May block while it waits to see if the original requests are answered first. + *

+ * Never (as much as there is such a thing in Java) throws */ - public abstract void maybeTryAdditionalReplicas(); + public final void maybeTryAdditionalReplicas() + { + try + { + doMaybeTryAdditionalReplicas(); + } + catch (UnknownEndpointException ex) + { + logger.debug("Failed to send speculative read retries; The target replica is not known: {}", ex.getMessage()); + } + catch (Throwable ex) + { + logger.warn("Caught exception during speculative read retry; This is unexpected, but we're ignoring it because spec retry is just a best-effort attempt;", ex); + } + } + + protected abstract void doMaybeTryAdditionalReplicas(); /** * send the initial set of requests @@ -269,7 +288,7 @@ public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, this.logFailedSpeculation = logFailedSpeculation; } - public void maybeTryAdditionalReplicas() + protected void doMaybeTryAdditionalReplicas() { if (shouldSpeculateAndMaybeWait() && logFailedSpeculation) { @@ -294,7 +313,7 @@ public SpeculatingReadExecutor(ColumnFamilyStore cfs, super(cfs, command, replicaPlan, replicaPlan.blockFor() < replicaPlan.contacts().size() ? 2 : 1, queryStartNanoTime, readTracker); } - public void maybeTryAdditionalReplicas() + protected void doMaybeTryAdditionalReplicas() { if (shouldSpeculateAndMaybeWait()) { @@ -364,7 +383,7 @@ public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs, super(cfs, command, replicaPlan, replicaPlan.contacts().size() > 1 ? 2 : 1, queryStartNanoTime, readTracker); } - public void maybeTryAdditionalReplicas() + protected void doMaybeTryAdditionalReplicas() { // no-op } diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java index c2957007819d..9fb3f78b1b57 100644 --- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java +++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java @@ -42,6 +42,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.NoPayload; +import org.apache.cassandra.net.UnknownEndpointException; import org.apache.cassandra.net.Verb; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.QueryInfoTracker; @@ -236,6 +237,24 @@ public void testRaceWithNonSpeculativeFailure() } } + /** + * Test that exception thrown during speculation is not propagated so that it doesn't break + * user read + */ + @Test + public void testExceptionInSpeculationAttemptDoesNotPropagate() throws Throwable + { + { + AbstractReadExecutor executor = new ThrowingSpeculatingExecutor(new RuntimeException("arbitrary exception")); + executor.maybeTryAdditionalReplicas(); + } + + { + AbstractReadExecutor executor = new ThrowingSpeculatingExecutor(new UnknownEndpointException("endpoint A.B.C.D is not known")); + executor.maybeTryAdditionalReplicas(); + } + } + public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand { private final long timeout; @@ -264,6 +283,29 @@ public Message createMessage(boolean trackRepairedData) } } + private class ThrowingSpeculatingExecutor extends AbstractReadExecutor + { + private final RuntimeException exception; + + ThrowingSpeculatingExecutor(RuntimeException exception) + { + super(ReadExecutorTest.cfs, + new MockSinglePartitionReadCommand(), + plan(ConsistencyLevel.LOCAL_QUORUM, targets, targets.subList(0, 2)), + 1, + System.nanoTime(), + noopReadTracker()); + + this.exception = exception; + } + + @Override + protected void doMaybeTryAdditionalReplicas() + { + throw exception; + } + } + private ReplicaPlan.ForTokenRead plan(EndpointsForToken targets, ConsistencyLevel consistencyLevel) { return plan(consistencyLevel, targets, targets);