Skip to content

Commit aaa5439

Browse files
Merge pull request #349 from rabbitmq/rabbitmq-java-client-341-no-auto-recovery-when-tcp-window-full
Trigger recovery if a socket write fails
2 parents 30911f4 + fbd707c commit aaa5439

File tree

13 files changed

+519
-41
lines changed

13 files changed

+519
-41
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ public class ConnectionFactory implements Cloneable {
8181
/** The default network recovery interval: 5000 millis */
8282
public static final long DEFAULT_NETWORK_RECOVERY_INTERVAL = 5000;
8383

84+
/** The default timeout for work pool enqueueing: no timeout */
85+
public static final int DEFAULT_WORK_POOL_TIMEOUT = -1;
86+
8487
private static final String PREFERRED_TLS_PROTOCOL = "TLSv1.2";
8588

8689
private static final String FALLBACK_TLS_PROTOCOL = "TLSv1";
@@ -138,6 +141,12 @@ public class ConnectionFactory implements Cloneable {
138141
*/
139142
private boolean channelShouldCheckRpcResponseType = false;
140143

144+
/**
145+
* Timeout in ms for work pool enqueuing.
146+
* @since 4.5.0
147+
*/
148+
private int workPoolTimeout = DEFAULT_WORK_POOL_TIMEOUT;
149+
141150
/** @return the default host to use for connections */
142151
public String getHost() {
143152
return host;
@@ -974,6 +983,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
974983
result.setHeartbeatExecutor(heartbeatExecutor);
975984
result.setChannelRpcTimeout(channelRpcTimeout);
976985
result.setChannelShouldCheckRpcResponseType(channelShouldCheckRpcResponseType);
986+
result.setWorkPoolTimeout(workPoolTimeout);
977987
return result;
978988
}
979989

@@ -1270,4 +1280,25 @@ public void setChannelShouldCheckRpcResponseType(boolean channelShouldCheckRpcRe
12701280
public boolean isChannelShouldCheckRpcResponseType() {
12711281
return channelShouldCheckRpcResponseType;
12721282
}
1283+
1284+
/**
1285+
* Timeout (in ms) for work pool enqueueing.
1286+
* The {@link WorkPool} dispatches several types of responses
1287+
* from the broker (e.g. deliveries). A high-traffic
1288+
* client with slow consumers can exhaust the work pool and
1289+
* compromise the whole connection (by e.g. letting the broker
1290+
* saturate the receive TCP buffers). Setting a timeout
1291+
* would make the connection fail early and avoid hard-to-diagnose
1292+
* TCP connection failure. Note this shouldn't happen
1293+
* with clients that set appropriate QoS values.
1294+
* Default is no timeout.
1295+
* @param workPoolTimeout timeout in ms
1296+
*/
1297+
public void setWorkPoolTimeout(int workPoolTimeout) {
1298+
this.workPoolTimeout = workPoolTimeout;
1299+
}
1300+
1301+
public int getWorkPoolTimeout() {
1302+
return workPoolTimeout;
1303+
}
12731304
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 61 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.net.SocketTimeoutException;
3333
import java.util.*;
3434
import java.util.concurrent.*;
35+
import java.util.concurrent.atomic.AtomicBoolean;
3536

3637
final class Copyright {
3738
final static String COPYRIGHT="Copyright (c) 2007-2017 Pivotal Software, Inc.";
@@ -61,6 +62,12 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
6162
private final List<RecoveryCanBeginListener> recoveryCanBeginListeners =
6263
Collections.synchronizedList(new ArrayList<RecoveryCanBeginListener>());
6364

65+
private final ErrorOnWriteListener errorOnWriteListener;
66+
67+
private final int workPoolTimeout;
68+
69+
private final AtomicBoolean finalShutdownStarted = new AtomicBoolean(false);
70+
6471
/**
6572
* Retrieve a copy of the default table of client properties that
6673
* will be sent to the server during connection startup. This
@@ -245,10 +252,17 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
245252
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
246253

247254
this.metricsCollector = metricsCollector;
255+
256+
this.errorOnWriteListener = params.getErrorOnWriteListener() != null ? params.getErrorOnWriteListener() :
257+
new ErrorOnWriteListener() {
258+
@Override
259+
public void handle(Connection connection, IOException exception) { }
260+
};
261+
this.workPoolTimeout = params.getWorkPoolTimeout();
248262
}
249263

250264
private void initializeConsumerWorkService() {
251-
this._workService = new ConsumerWorkService(consumerWorkServiceExecutor, threadFactory, shutdownTimeout);
265+
this._workService = new ConsumerWorkService(consumerWorkServiceExecutor, threadFactory, workPoolTimeout, shutdownTimeout);
252266
}
253267

254268
private void initializeHeartbeatSender() {
@@ -556,7 +570,11 @@ public void writeFrame(Frame f) throws IOException {
556570
* Public API - flush the output buffers
557571
*/
558572
public void flush() throws IOException {
559-
_frameHandler.flush();
573+
try {
574+
_frameHandler.flush();
575+
} catch (IOException ioe) {
576+
this.errorOnWriteListener.handle(this, ioe);
577+
}
560578
}
561579

562580
private static int negotiatedMaxValue(int clientValue, int serverValue) {
@@ -575,15 +593,24 @@ private class MainLoop implements Runnable {
575593
*/
576594
@Override
577595
public void run() {
596+
boolean shouldDoFinalShutdown = true;
578597
try {
579598
while (_running) {
580599
Frame frame = _frameHandler.readFrame();
581600
readFrame(frame);
582601
}
583602
} catch (Throwable ex) {
584-
handleFailure(ex);
603+
if (ex instanceof InterruptedException) {
604+
// loop has been interrupted during shutdown,
605+
// no need to do it again
606+
shouldDoFinalShutdown = false;
607+
} else {
608+
handleFailure(ex);
609+
}
585610
} finally {
586-
doFinalShutdown();
611+
if (shouldDoFinalShutdown) {
612+
doFinalShutdown();
613+
}
587614
}
588615
}
589616
}
@@ -594,6 +621,9 @@ public boolean handleReadFrame(Frame frame) {
594621
try {
595622
readFrame(frame);
596623
return true;
624+
} catch (WorkPoolFullException e) {
625+
// work pool is full, we propagate this one.
626+
throw e;
597627
} catch (Throwable ex) {
598628
try {
599629
handleFailure(ex);
@@ -686,14 +716,33 @@ private void handleFailure(Throwable ex) {
686716

687717
/** private API */
688718
public void doFinalShutdown() {
689-
_frameHandler.close();
690-
_appContinuation.set(null);
691-
notifyListeners();
692-
// assuming that shutdown listeners do not do anything
693-
// asynchronously, e.g. start new threads, this effectively
694-
// guarantees that we only begin recovery when all shutdown
695-
// listeners have executed
696-
notifyRecoveryCanBeginListeners();
719+
if (finalShutdownStarted.compareAndSet(false, true)) {
720+
_frameHandler.close();
721+
_appContinuation.set(null);
722+
closeMainLoopThreadIfNecessary();
723+
notifyListeners();
724+
// assuming that shutdown listeners do not do anything
725+
// asynchronously, e.g. start new threads, this effectively
726+
// guarantees that we only begin recovery when all shutdown
727+
// listeners have executed
728+
notifyRecoveryCanBeginListeners();
729+
}
730+
}
731+
732+
private void closeMainLoopThreadIfNecessary() {
733+
if (mainLoopReadThreadNotNull() && notInMainLoopThread()) {
734+
if (this.mainLoopThread.isAlive()) {
735+
this.mainLoopThread.interrupt();
736+
}
737+
}
738+
}
739+
740+
private boolean notInMainLoopThread() {
741+
return Thread.currentThread() != this.mainLoopThread;
742+
}
743+
744+
private boolean mainLoopReadThreadNotNull() {
745+
return this.mainLoopThread != null;
697746
}
698747

699748
private void notifyRecoveryCanBeginListeners() {
@@ -879,7 +928,6 @@ private ShutdownSignalException startShutdown(Method reason,
879928
_heartbeatSender.shutdown();
880929

881930
_channel0.processShutdownSignal(sse, !initiatedByApplication, notifyRpc);
882-
883931
return sse;
884932
}
885933

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,9 @@ private void releaseChannel() {
387387
if (callback != null) {
388388
try {
389389
this.dispatcher.handleCancel(callback, consumerTag);
390+
} catch (WorkPoolFullException e) {
391+
// couldn't enqueue in work pool, propagating
392+
throw e;
390393
} catch (Throwable ex) {
391394
getConnection().getExceptionHandler().handleConsumerException(this,
392395
ex,
@@ -445,10 +448,13 @@ protected void processDelivery(Command command, Basic.Deliver method) {
445448
// in case a manual ack in the callback, the stats will be able to record the ack
446449
metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag());
447450
this.dispatcher.handleDelivery(callback,
448-
m.getConsumerTag(),
449-
envelope,
450-
(BasicProperties) command.getContentHeader(),
451-
command.getContentBody());
451+
m.getConsumerTag(),
452+
envelope,
453+
(BasicProperties) command.getContentHeader(),
454+
command.getContentBody());
455+
} catch (WorkPoolFullException e) {
456+
// couldn't enqueue in work pool, propagating
457+
throw e;
452458
} catch (Throwable ex) {
453459
getConnection().getExceptionHandler().handleConsumerException(this,
454460
ex,

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public class ConnectionParams {
4444
private boolean topologyRecovery;
4545
private int channelRpcTimeout;
4646
private boolean channelShouldCheckRpcResponseType;
47+
private ErrorOnWriteListener errorOnWriteListener;
48+
private int workPoolTimeout = -1;
4749

4850
private ExceptionHandler exceptionHandler;
4951
private ThreadFactory threadFactory;
@@ -213,4 +215,20 @@ public void setChannelRpcTimeout(int channelRpcTimeout) {
213215
public void setChannelShouldCheckRpcResponseType(boolean channelShouldCheckRpcResponseType) {
214216
this.channelShouldCheckRpcResponseType = channelShouldCheckRpcResponseType;
215217
}
218+
219+
public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) {
220+
this.errorOnWriteListener = errorOnWriteListener;
221+
}
222+
223+
public ErrorOnWriteListener getErrorOnWriteListener() {
224+
return errorOnWriteListener;
225+
}
226+
227+
public void setWorkPoolTimeout(int workPoolTimeout) {
228+
this.workPoolTimeout = workPoolTimeout;
229+
}
230+
231+
public int getWorkPoolTimeout() {
232+
return workPoolTimeout;
233+
}
216234
}

src/main/java/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,18 @@ final public class ConsumerWorkService {
3131
private final WorkPool<Channel, Runnable> workPool;
3232
private final int shutdownTimeout;
3333

34-
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int shutdownTimeout) {
34+
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int queueingTimeout, int shutdownTimeout) {
3535
this.privateExecutor = (executor == null);
3636
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)
37-
: executor;
38-
this.workPool = new WorkPool<Channel, Runnable>();
37+
: executor;
38+
this.workPool = new WorkPool<Channel, Runnable>(queueingTimeout);
3939
this.shutdownTimeout = shutdownTimeout;
4040
}
4141

42+
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int shutdownTimeout) {
43+
this(executor, threadFactory, -1, shutdownTimeout);
44+
}
45+
4246
public int getShutdownTimeout() {
4347
return shutdownTimeout;
4448
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright (c) 2018-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client.impl;
17+
18+
import com.rabbitmq.client.Connection;
19+
20+
import java.io.IOException;
21+
22+
/**
23+
* Listener called when a connection gets an error trying to write on the socket.
24+
* This can be used to trigger connection recovery.
25+
*/
26+
public interface ErrorOnWriteListener {
27+
28+
/**
29+
* Called when writing to the socket failed
30+
* @param connection the owning connection instance
31+
* @param exception the thrown exception
32+
*/
33+
void handle(Connection connection, IOException exception) throws IOException;
34+
35+
}

src/main/java/com/rabbitmq/client/impl/WorkPool.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
import java.util.Iterator;
2222
import java.util.Map;
2323
import java.util.Set;
24+
import java.util.concurrent.BlockingQueue;
25+
import java.util.concurrent.Callable;
26+
import java.util.concurrent.TimeUnit;
2427

2528
/**
2629
* <p>This is a generic implementation of the channels specification
@@ -61,6 +64,37 @@ public class WorkPool<K, W> {
6164
private final Map<K, VariableLinkedBlockingQueue<W>> pool = new HashMap<K, VariableLinkedBlockingQueue<W>>();
6265
/** Those keys which want limits to be removed. We do not limit queue size if this is non-empty. */
6366
private final Set<K> unlimited = new HashSet<K>();
67+
private final EnqueueingCallback<W> enqueueingCallback;
68+
69+
public WorkPool(final int queueingTimeout) {
70+
if (queueingTimeout > 0) {
71+
this.enqueueingCallback = new EnqueueingCallback<W>() {
72+
@Override
73+
public void enqueue(BlockingQueue<W> queue, W item) {
74+
try {
75+
boolean offered = queue.offer(item, queueingTimeout, TimeUnit.MILLISECONDS);
76+
if (!offered) {
77+
throw new WorkPoolFullException("Could not enqueue in work pool after " + queueingTimeout + " ms.");
78+
}
79+
} catch (InterruptedException e) {
80+
Thread.currentThread();
81+
}
82+
}
83+
};
84+
} else {
85+
this.enqueueingCallback = new EnqueueingCallback<W>() {
86+
87+
@Override
88+
public void enqueue(BlockingQueue<W> queue, W item) {
89+
try {
90+
queue.put(item);
91+
} catch (InterruptedException e) {
92+
Thread.currentThread().interrupt();
93+
}
94+
}
95+
};
96+
}
97+
}
6498

6599
/**
66100
* Add client <code><b>key</b></code> to pool of item queues, with an empty queue.
@@ -178,11 +212,7 @@ public boolean addWorkItem(K key, W item) {
178212
}
179213
// The put operation may block. We need to make sure we are not holding the lock while that happens.
180214
if (queue != null) {
181-
try {
182-
queue.put(item);
183-
} catch (InterruptedException e) {
184-
Thread.currentThread().interrupt();
185-
}
215+
enqueueingCallback.enqueue(queue, item);
186216

187217
synchronized (this) {
188218
if (isDormant(key)) {
@@ -243,4 +273,10 @@ private K readyToInProgress() {
243273
}
244274
return key;
245275
}
276+
277+
private interface EnqueueingCallback<W> {
278+
279+
void enqueue(BlockingQueue<W> queue, W item);
280+
281+
}
246282
}

0 commit comments

Comments
 (0)