RPCs are executed one at a time, blocking on a mutex #174
Summary
Calling Sender::unbind waits for a mutex to be released if an RPC is currently executing on the current channel: it waits for that RPC to complete before submitting a new one.
This leads to blocking behaviour on "massive" unbinds.
Expected Behavior
I would not expect such blocking behaviour in a reactor-* project, and I would expect a reactive driver not to force me to put subscribeOn(boundedElastic()) everywhere.
At the very least, ChannelPool could offer a separate method that returns a channel not currently executing an RPC. A smart implementation could then always obtain channels that are ready to submit RPCs directly, open new channels if needed, or wait without blocking threads where relevant.
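To make the suggestion concrete, here is a minimal sketch of what such a pool could look like. It is not the reactor-rabbitmq API: `IdleChannelPool`, `acquireIdle` and `release` are hypothetical names, the channel type is left generic, and only JDK classes are used. The only lock it takes guards in-memory bookkeeping, never I/O or an in-flight RPC.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * Hypothetical pool that only hands out channels with no RPC in flight.
 * Callers get a CompletableFuture instead of waiting on a mutex.
 */
final class IdleChannelPool<C> {
    private final Deque<C> idle = new ArrayDeque<>();
    private final Deque<CompletableFuture<C>> waiters = new ArrayDeque<>();
    private final Supplier<C> channelFactory; // assumption: opens a new channel
    private final int maxChannels;
    private int opened = 0;

    IdleChannelPool(Supplier<C> channelFactory, int maxChannels) {
        this.channelFactory = channelFactory;
        this.maxChannels = maxChannels;
    }

    /**
     * Returns a future that is already completed when an idle channel exists
     * or a new one may be opened, and that completes later (on release)
     * when the pool is exhausted. The calling thread is never parked.
     */
    CompletableFuture<C> acquireIdle() {
        synchronized (this) {
            C channel = idle.poll();
            if (channel != null) {
                return CompletableFuture.completedFuture(channel);
            }
            if (opened >= maxChannels) {
                CompletableFuture<C> waiter = new CompletableFuture<>();
                waiters.add(waiter);
                return waiter;
            }
            opened++; // reserve a slot, open the channel outside the lock
        }
        try {
            return CompletableFuture.completedFuture(channelFactory.get());
        } catch (RuntimeException e) {
            synchronized (this) {
                opened--; // give the reserved slot back
            }
            return CompletableFuture.failedFuture(e);
        }
    }

    /** Called once the RPC on this channel has completed. */
    void release(C channel) {
        while (true) {
            CompletableFuture<C> waiter;
            synchronized (this) {
                waiter = waiters.poll();
                if (waiter == null) {
                    idle.add(channel);
                    return;
                }
            }
            // complete() returns false if the waiter was cancelled; try the next one
            if (waiter.complete(channel)) {
                return;
            }
        }
    }
}
```

In a Reactor pipeline the returned future could be adapted with `Mono.fromFuture(pool.acquireIdle())`, so a caller that finds every channel busy is resumed when one is released rather than blocking an event-loop thread.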
Actual Behavior
See this flame graph taken on the Netty event loop:
Context: a CTRL+C in a perf test led to 10,000 IMAP connections being closed at the same time, cleaning up
Steps to Reproduce
@Test
void blockingRPCs(DockerRabbitMQ rabbitMQ) throws Exception {
    BlockHound.install();
    SenderOptions senderOptions = new SenderOptions()
        .connectionFactory(connectionFactory)
        .resourceManagementScheduler(Schedulers.boundedElastic());
    final Sender sender = new Sender(senderOptions);
    sender.declare(QueueSpecification.queue().name("queue")).block();
    sender.declare(ExchangeSpecification.exchange().name("ex")).block();
    Flux.range(0, 100)
        .parallel()
        .flatMap(i -> {
            return sender.bind(BindingSpecification.binding().exchange("ex").queue("queue").routingKey("" + i))
                .then(sender.unbind(BindingSpecification.binding().exchange("ex").queue("queue").routingKey("routing" + i)));
        })
        .then()
        .subscribeOn(Schedulers.parallel())
        .block();
}
Fails with:
reactor.core.Exceptions$ReactiveException: reactor.blockhound.BlockingOperationError: Blocking call! java.net.SocketOutputStream#socketWrite0
[...]
Caused by: reactor.blockhound.BlockingOperationError: Blocking call! java.net.SocketOutputStream#socketWrite0
at java.base/java.net.SocketOutputStream.socketWrite0(SocketOutputStream.java)
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
at java.base/java.io.DataOutputStream.flush(DataOutputStream.java:123)
at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:197)
at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:636)
at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:134)
at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:455)
at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:434)
at com.rabbitmq.client.impl.AMQChannel.quiescingAsyncRpc(AMQChannel.java:369)
at com.rabbitmq.client.impl.AMQChannel.asyncRpc(AMQChannel.java:360)
at com.rabbitmq.client.impl.AMQChannel.privateAsyncRpc(AMQChannel.java:320)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingAsyncRpc(AMQChannel.java:155)
at com.rabbitmq.client.impl.ChannelN.asyncCompletableRpc(ChannelN.java:1580)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.asyncCompletableRpc(AutorecoveringChannel.java:931)
at reactor.rabbitmq.ChannelProxy.asyncCompletableRpc(ChannelProxy.java:556)
at reactor.rabbitmq.Sender.lambda$bind$28(Sender.java:691)
[...]
Your Environment
- Reactor version(s) used: 2020.0.18
- Java 11
- Ubuntu 20.04
