Skip to content

CNDB-14624: do not fail user read when speculative retry handling throws #1875

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions src/java/org/apache/cassandra/net/OutboundConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
@SuppressWarnings({ "WeakerAccess", "FieldMayBeFinal", "NonAtomicOperationOnVolatileField", "SameParameterValue" })
public class OutboundConnection
{
static final Logger logger = LoggerFactory.getLogger(OutboundConnection.class);
private static final Logger logger = LoggerFactory.getLogger(OutboundConnection.class);
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 30L, TimeUnit.SECONDS);

private static final AtomicLongFieldUpdater<OutboundConnection> submittedUpdater = AtomicLongFieldUpdater.newUpdater(OutboundConnection.class, "submittedCount");
Expand Down Expand Up @@ -485,7 +485,7 @@ private boolean onExpired(Message<?> message)
*/
private void onFailedSerialize(Message<?> message, int messagingVersion, int bytesWrittenToNetwork, Throwable t)
{
logger.warn("{} dropping message of type {} due to error", id(), message.verb(), t);
noSpamLogger.warn("{} dropping message of type {} due to error", id(), message.verb(), t);
JVMStabilityInspector.inspectThrowable(t);
releaseCapacity(1, canonicalSize(message));
errorCount += 1;
Expand Down Expand Up @@ -949,7 +949,7 @@ public void run()
try
{
priorThreadName = Thread.currentThread().getName();
threadName = "Messaging-OUT-" + template.from() + "->" + template.to + '-' + type;
threadName = "Messaging-OUT-" + template.from + "->" + template.to + '-' + type;
Thread.currentThread().setName(threadName);

super.run();
Expand Down Expand Up @@ -1089,21 +1089,23 @@ class Initiate
void onFailure(Throwable cause)
{
if (cause instanceof ConnectException)
noSpamLogger.info("{} failed to connect", id(), cause);
logger.info("{} failed to connect", id(), cause);
else
noSpamLogger.error("{} failed to connect", id(), cause);
logger.error("{} failed to connect", id(), cause);

JVMStabilityInspector.inspectThrowable(cause);

if (hasPending())
{
logger.info("{} scheduling reconnect to {}", id(), settings.connectToId());
Promise<Result<MessagingSuccess>> result = new AsyncPromise<>(eventLoop);
state = new Connecting(state.disconnected(), result, eventLoop.schedule(() -> attempt(result), max(100, retryRateMillis), MILLISECONDS));
retryRateMillis = min(1000, retryRateMillis * 2);
}
else
{
// this Initiate will be discarded
logger.info("{} no more pending messages; not retrying connection to {}", id(), settings.connectToId());
state = Disconnected.dormant(state.disconnected().maintenance);
}
}
Expand Down Expand Up @@ -1143,7 +1145,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
}
catch (Throwable t)
{
logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t);
noSpamLogger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t);
}
}
});
Expand Down Expand Up @@ -1191,6 +1193,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
private void attempt(Promise<Result<MessagingSuccess>> result)
{
++connectionAttempts;
logger.info("{} attempting to connect to {}; attempt {}", id(), template.connectToId(), connectionAttempts);

/*
* Re-evaluate messagingVersion before re-attempting the connection in case
Expand Down Expand Up @@ -1611,7 +1614,7 @@ private String id()
settings = state.established().settings;
}
String channelId = channel != null ? channel.id().asShortText() : "[no-channel]";
return SocketFactory.channelId(settings.from(), settings.to, type, channelId);
return SocketFactory.channelId(settings.from, settings.to, type, channelId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ public boolean withEncryption()

public String toString()
{
return String.format("peer: (%s, %s), framing: %s, encryption: %s",
to, connectTo, framing, SocketFactory.encryptionOptionsSummary(encryption));
return String.format("from: %s peer: (%s, %s), framing: %s, encryption: %s",
from, to, connectTo, framing, SocketFactory.encryptionOptionsSummary(encryption));
}

public OutboundConnectionSettings withAuthenticator(IInternodeAuthenticator authenticator)
Expand Down
58 changes: 58 additions & 0 deletions src/java/org/apache/cassandra/net/UnknownEndpointException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.net;

/**
* Exception thrown when the messaging service encounters an endpoint that cannot be resolved or is not known
* to the cluster. This typically occurs during network operations when attempting to communicate with a node
* that is not recognized by the current node's topology information.
*
* <p>This exception is thrown when the
* {@link org.apache.cassandra.locator.IEndpointSnitch#getPreferredAddress(org.apache.cassandra.locator.InetAddressAndPort)}
* method encounters an endpoint that cannot be resolved to a preferred address.</p>
*
* <p>Common scenarios where this exception may be thrown include:</p>
*
* <ul>
* <li>During speculative read retries when the target replica endpoint is not known to the messaging service</li>
* <li>When attempting to establish connections to endpoints that are not part of the known cluster topology</li>
* </ul>
*
* <p>The exception is used internally by the messaging system to handle cases where endpoint resolution fails,
* allowing the system to gracefully handle unknown endpoints rather than causing fatal errors. For example,
* during speculative read operations, this exception is caught and logged as a debug message, allowing the
* operation to continue without the failed speculative retry.
*
* @see org.apache.cassandra.locator.IEndpointSnitch#getPreferredAddress(org.apache.cassandra.locator.InetAddressAndPort)
* @see org.apache.cassandra.service.reads.AbstractReadExecutor#maybeTryAdditionalReplicas()
*/
public class UnknownEndpointException extends RuntimeException
{
/**
* Constructs a new UnknownEndpointException with the specified detail message.
*
* @param message the detail message explaining why the endpoint is unknown or cannot be resolved.
* This message should provide context about which endpoint failed and the circumstances
* of the failure (e.g., "Failed to resolve preferred address for endpoint 192.168.1.100:9042")
*/
public UnknownEndpointException(String message)
{
super(message);
}
}
1 change: 0 additions & 1 deletion src/java/org/apache/cassandra/service/StorageProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.cassandra.metrics.ReadCoordinationMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.UnknownEndpointException;
import org.apache.cassandra.service.QueryInfoTracker;
import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
import org.apache.cassandra.service.reads.repair.ReadRepair;
Expand Down Expand Up @@ -177,8 +178,26 @@ private void makeRequests(ReadCommand readCommand, Iterable<Replica> replicas)
* Perform additional requests if it looks like the original takes "too much time", as defined
* by the subclass.
* May block while it waits to see if the original requests are answered first.
* <p/>
* Never (as much as there is such a thing in Java) throws
*/
public abstract void maybeTryAdditionalReplicas();
public final void maybeTryAdditionalReplicas()
{
try
{
doMaybeTryAdditionalReplicas();
}
catch (UnknownEndpointException ex)
{
logger.debug("Failed to send speculative read retries; The target replica is not known: {}", ex.getMessage());
}
catch (Throwable ex)
{
logger.warn("Caught exception during speculative read retry; This is unexpected, but we're ignoring it because spec retry is just a best-effort attempt;", ex);
}
}

protected abstract void doMaybeTryAdditionalReplicas();

/**
* send the initial set of requests
Expand Down Expand Up @@ -269,7 +288,7 @@ public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command,
this.logFailedSpeculation = logFailedSpeculation;
}

public void maybeTryAdditionalReplicas()
protected void doMaybeTryAdditionalReplicas()
{
if (shouldSpeculateAndMaybeWait() && logFailedSpeculation)
{
Expand All @@ -294,7 +313,7 @@ public SpeculatingReadExecutor(ColumnFamilyStore cfs,
super(cfs, command, replicaPlan, replicaPlan.blockFor() < replicaPlan.contacts().size() ? 2 : 1, queryStartNanoTime, readTracker);
}

public void maybeTryAdditionalReplicas()
protected void doMaybeTryAdditionalReplicas()
{
if (shouldSpeculateAndMaybeWait())
{
Expand Down Expand Up @@ -364,7 +383,7 @@ public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs,
super(cfs, command, replicaPlan, replicaPlan.contacts().size() > 1 ? 2 : 1, queryStartNanoTime, readTracker);
}

public void maybeTryAdditionalReplicas()
protected void doMaybeTryAdditionalReplicas()
{
// no-op
}
Expand Down
42 changes: 42 additions & 0 deletions test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.UnknownEndpointException;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.QueryInfoTracker;
Expand Down Expand Up @@ -236,6 +237,24 @@ public void testRaceWithNonSpeculativeFailure()
}
}

/**
* Test that exception thrown during speculation is not propagated so that it doesn't break
* user read
*/
@Test
public void testExceptionInSpeculationAttemptDoesNotPropagate() throws Throwable
{
{
AbstractReadExecutor executor = new ThrowingSpeculatingExecutor(new RuntimeException("arbitrary exception"));
executor.maybeTryAdditionalReplicas();
}

{
AbstractReadExecutor executor = new ThrowingSpeculatingExecutor(new UnknownEndpointException("endpoint A.B.C.D is not known"));
executor.maybeTryAdditionalReplicas();
}
}

public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand
{
private final long timeout;
Expand Down Expand Up @@ -264,6 +283,29 @@ public Message createMessage(boolean trackRepairedData)
}
}

private class ThrowingSpeculatingExecutor extends AbstractReadExecutor
{
private final RuntimeException exception;

ThrowingSpeculatingExecutor(RuntimeException exception)
{
super(ReadExecutorTest.cfs,
new MockSinglePartitionReadCommand(),
plan(ConsistencyLevel.LOCAL_QUORUM, targets, targets.subList(0, 2)),
1,
System.nanoTime(),
noopReadTracker());

this.exception = exception;
}

@Override
protected void doMaybeTryAdditionalReplicas()
{
throw exception;
}
}

private ReplicaPlan.ForTokenRead plan(EndpointsForToken targets, ConsistencyLevel consistencyLevel)
{
return plan(consistencyLevel, targets, targets);
Expand Down