@@ -70,12 +70,14 @@ public final class NettyFrameHandlerFactory extends AbstractFrameHandlerFactory
7070 private final Function <String , SslContext > sslContextFactory ;
7171 private final Consumer <Channel > channelCustomizer ;
7272 private final Consumer <Bootstrap > bootstrapCustomizer ;
73+ private final Duration enqueuingTimeout ;
7374
7475 public NettyFrameHandlerFactory (
7576 EventLoopGroup eventLoopGroup ,
7677 Consumer <Channel > channelCustomizer ,
7778 Consumer <Bootstrap > bootstrapCustomizer ,
7879 Function <String , SslContext > sslContextFactory ,
80+ Duration enqueuingTimeout ,
7981 int connectionTimeout ,
8082 SocketConfigurator configurator ,
8183 int maxInboundMessageBodySize ) {
@@ -85,6 +87,7 @@ public NettyFrameHandlerFactory(
8587 this .channelCustomizer = channelCustomizer == null ? Utils .noOpConsumer () : channelCustomizer ;
8688 this .bootstrapCustomizer =
8789 bootstrapCustomizer == null ? Utils .noOpConsumer () : bootstrapCustomizer ;
90+ this .enqueuingTimeout = enqueuingTimeout ;
8891 }
8992
9093 private static void closeNettyState (Channel channel , EventLoopGroup eventLoopGroup ) {
@@ -127,6 +130,7 @@ public FrameHandler create(Address addr, String connectionName) throws IOExcepti
127130 addr ,
128131 sslContext ,
129132 this .eventLoopGroup ,
133+ this .enqueuingTimeout ,
130134 this .channelCustomizer ,
131135 this .bootstrapCustomizer );
132136 }
@@ -146,6 +150,7 @@ private static final class NettyFrameHandler implements FrameHandler {
146150 'A' , 'M' , 'Q' , 'P' , 0 , AMQP .PROTOCOL .MAJOR , AMQP .PROTOCOL .MINOR , AMQP .PROTOCOL .REVISION
147151 };
148152 private final EventLoopGroup eventLoopGroup ;
153+ private final Duration enqueuingTimeout ;
149154 private final Channel channel ;
150155 private final AmqpHandler handler ;
151156 private final AtomicBoolean closed = new AtomicBoolean (false );
@@ -155,9 +160,11 @@ private NettyFrameHandler(
155160 Address addr ,
156161 SslContext sslContext ,
157162 EventLoopGroup elg ,
163+ Duration enqueuingTimeout ,
158164 Consumer <Channel > channelCustomizer ,
159165 Consumer <Bootstrap > bootstrapCustomizer )
160166 throws IOException {
167+ this .enqueuingTimeout = enqueuingTimeout ;
161168 Bootstrap b = new Bootstrap ();
162169 bootstrapCustomizer .accept (b );
163170 if (b .config ().group () == null ) {
@@ -310,7 +317,8 @@ public void writeFrame(Frame frame) throws IOException {
310317 this .doWriteFrame (frame );
311318 } else {
312319 try {
313- boolean canWriteNow = this .handler .writableLatch ().await (10 , SECONDS );
320+ boolean canWriteNow =
321+ this .handler .writableLatch ().await (enqueuingTimeout .toMillis (), MILLISECONDS );
314322 if (canWriteNow ) {
315323 this .doWriteFrame (frame );
316324 } else {
0 commit comments