Skip to content

Commit a166649

Browse files
committed
CNDB-14624: do not fail user read when speculative retry handling throws
1 parent f23b0bf commit a166649

File tree

4 files changed

+59
-9
lines changed

4 files changed

+59
-9
lines changed

src/java/org/apache/cassandra/net/OutboundConnection.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@
9494
@SuppressWarnings({ "WeakerAccess", "FieldMayBeFinal", "NonAtomicOperationOnVolatileField", "SameParameterValue" })
9595
public class OutboundConnection
9696
{
97-
static final Logger logger = LoggerFactory.getLogger(OutboundConnection.class);
97+
private static final Logger logger = LoggerFactory.getLogger(OutboundConnection.class);
9898
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 30L, TimeUnit.SECONDS);
9999

100100
private static final AtomicLongFieldUpdater<OutboundConnection> submittedUpdater = AtomicLongFieldUpdater.newUpdater(OutboundConnection.class, "submittedCount");
@@ -107,6 +107,8 @@ public class OutboundConnection
107107
private final EventLoop eventLoop;
108108
private final Delivery delivery;
109109

110+
private final String id;
111+
110112
private final OutboundMessageCallbacks callbacks;
111113
private final OutboundDebugCallbacks debug;
112114
@VisibleForTesting
@@ -313,6 +315,7 @@ void cancel()
313315
? new LargeMessageDelivery(template.socketFactory.synchronousWorkExecutor)
314316
: new EventLoopDelivery();
315317
setDisconnected();
318+
this.id = computeId();
316319
}
317320

318321
/**
@@ -485,7 +488,7 @@ private boolean onExpired(Message<?> message)
485488
*/
486489
private void onFailedSerialize(Message<?> message, int messagingVersion, int bytesWrittenToNetwork, Throwable t)
487490
{
488-
logger.warn("{} dropping message of type {} due to error", id(), message.verb(), t);
491+
noSpamLogger.warn("{} dropping message of type {} due to error", id(), message.verb(), t);
489492
JVMStabilityInspector.inspectThrowable(t);
490493
releaseCapacity(1, canonicalSize(message));
491494
errorCount += 1;
@@ -949,7 +952,7 @@ public void run()
949952
try
950953
{
951954
priorThreadName = Thread.currentThread().getName();
952-
threadName = "Messaging-OUT-" + template.from() + "->" + template.to + '-' + type;
955+
threadName = "Messaging-OUT-" + template.from + "->" + template.to + '-' + type;
953956
Thread.currentThread().setName(threadName);
954957

955958
super.run();
@@ -1089,21 +1092,23 @@ class Initiate
10891092
void onFailure(Throwable cause)
10901093
{
10911094
if (cause instanceof ConnectException)
1092-
noSpamLogger.info("{} failed to connect", id(), cause);
1095+
logger.info("{} failed to connect", id(), cause);
10931096
else
1094-
noSpamLogger.error("{} failed to connect", id(), cause);
1097+
logger.error("{} failed to connect", id(), cause);
10951098

10961099
JVMStabilityInspector.inspectThrowable(cause);
10971100

10981101
if (hasPending())
10991102
{
1103+
logger.info("{} scheduling reconnect to {}", id(), settings.connectToId());
11001104
Promise<Result<MessagingSuccess>> result = new AsyncPromise<>(eventLoop);
11011105
state = new Connecting(state.disconnected(), result, eventLoop.schedule(() -> attempt(result), max(100, retryRateMillis), MILLISECONDS));
11021106
retryRateMillis = min(1000, retryRateMillis * 2);
11031107
}
11041108
else
11051109
{
11061110
// this Initiate will be discarded
1111+
logger.info("{} no more pending messages; not retrying connection to {}", id(), settings.connectToId());
11071112
state = Disconnected.dormant(state.disconnected().maintenance);
11081113
}
11091114
}
@@ -1143,7 +1148,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
11431148
}
11441149
catch (Throwable t)
11451150
{
1146-
logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t);
1151+
noSpamLogger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t);
11471152
}
11481153
}
11491154
});
@@ -1191,6 +1196,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
11911196
private void attempt(Promise<Result<MessagingSuccess>> result)
11921197
{
11931198
++connectionAttempts;
1199+
logger.info("{} attempting to connect to {}; attempt {}", id(), template.connectToId(), connectionAttempts);
11941200

11951201
/*
11961202
* Re-evaluate messagingVersion before re-attempting the connection in case
@@ -1601,6 +1607,11 @@ private String id(boolean includeReal)
16011607
}
16021608

16031609
private String id()
1610+
{
1611+
return id;
1612+
}
1613+
1614+
private String computeId()
16041615
{
16051616
State state = this.state;
16061617
Channel channel = null;

src/java/org/apache/cassandra/net/OutboundConnectionSettings.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ public boolean withEncryption()
172172

173173
public String toString()
174174
{
175-
return String.format("peer: (%s, %s), framing: %s, encryption: %s",
176-
to, connectTo, framing, SocketFactory.encryptionOptionsSummary(encryption));
175+
return String.format("from: %s peer: (%s, %s), framing: %s, encryption: %s",
176+
from, to, connectTo, framing, SocketFactory.encryptionOptionsSummary(encryption));
177177
}
178178

179179
public OutboundConnectionSettings withAuthenticator(IInternodeAuthenticator authenticator)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.net;
20+
21+
public class UnknownEndpointException extends RuntimeException
22+
{
23+
UnknownEndpointException(String message)
24+
{
25+
super(message);
26+
}
27+
}

src/java/org/apache/cassandra/service/StorageProxy.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
import org.apache.cassandra.net.MessageFlag;
118118
import org.apache.cassandra.net.MessagingService;
119119
import org.apache.cassandra.net.RequestCallback;
120+
import org.apache.cassandra.net.UnknownEndpointException;
120121
import org.apache.cassandra.net.Verb;
121122
import org.apache.cassandra.schema.Schema;
122123
import org.apache.cassandra.schema.SchemaConstants;
@@ -2239,7 +2240,18 @@ private static PartitionIterator fetchRows(List<SinglePartitionReadCommand> comm
22392240
// set of replicas we sent messages to, speculatively send an additional messages to an un-contacted replica
22402241
for (int i=0; i<cmdCount; i++)
22412242
{
2242-
reads[i].maybeTryAdditionalReplicas();
2243+
try
2244+
{
2245+
reads[i].maybeTryAdditionalReplicas();
2246+
}
2247+
catch (UnknownEndpointException ex)
2248+
{
2249+
logger.debug("Failed to send speculative read retries; The target replica is not known: {}", ex.getMessage());
2250+
}
2251+
catch (Exception ex)
2252+
{
2253+
logger.warn("Caught exception during speculative read retry; This is unexpected, but we're ignoring it because spec retry is just a best-effort attempt;", ex);
2254+
}
22432255
}
22442256

22452257
// wait for enough responses to meet the consistency level. If there's a digest mismatch, begin the read

0 commit comments

Comments
 (0)