3838import com .couchbase .client .core .deps .io .netty .channel .ChannelDuplexHandler ;
3939import com .couchbase .client .core .deps .io .netty .channel .ChannelHandlerContext ;
4040import com .couchbase .client .core .deps .io .netty .channel .ChannelPromise ;
41+ import com .couchbase .client .core .deps .io .netty .handler .timeout .IdleState ;
42+ import com .couchbase .client .core .deps .io .netty .handler .timeout .IdleStateEvent ;
43+ import com .couchbase .client .core .deps .io .netty .handler .timeout .IdleStateHandler ;
4144import com .couchbase .client .core .deps .io .netty .util .ReferenceCountUtil ;
4245import com .couchbase .client .core .deps .io .netty .util .collection .IntObjectHashMap ;
4346import com .couchbase .client .core .deps .io .netty .util .collection .IntObjectMap ;
4447import com .couchbase .client .core .endpoint .BaseEndpoint ;
4548import com .couchbase .client .core .endpoint .EndpointContext ;
4649import com .couchbase .client .core .env .CompressionConfig ;
50+ import com .couchbase .client .core .env .IoConfig ;
4751import com .couchbase .client .core .error .CollectionNotFoundException ;
4852import com .couchbase .client .core .error .DecodingFailureException ;
4953import com .couchbase .client .core .error .FeatureNotAvailableException ;
5660import com .couchbase .client .core .msg .Response ;
5761import com .couchbase .client .core .msg .ResponseStatus ;
5862import com .couchbase .client .core .msg .kv .KeyValueRequest ;
63+ import com .couchbase .client .core .msg .kv .NoopRequest ;
5964import com .couchbase .client .core .msg .kv .RangeScanContinueRequest ;
6065import com .couchbase .client .core .msg .kv .RangeScanContinueResponse ;
6166import com .couchbase .client .core .msg .kv .UnlockRequest ;
67+ import com .couchbase .client .core .retry .FailFastRetryStrategy ;
6268import com .couchbase .client .core .retry .RetryOrchestrator ;
6369import com .couchbase .client .core .retry .RetryReason ;
6470import com .couchbase .client .core .service .ServiceType ;
7076import java .util .Map ;
7177import java .util .Optional ;
7278import java .util .Set ;
79+ import java .util .concurrent .TimeUnit ;
7380import java .util .stream .Collectors ;
7481
7582import static com .couchbase .client .core .deps .io .netty .buffer .Unpooled .EMPTY_BUFFER ;
@@ -175,6 +182,71 @@ public KeyValueMessageHandler(final BaseEndpoint endpoint, final EndpointContext
175182 this .isInternalTracer = CbTracing .isInternalTracer (endpointContext .coreResources ().requestTracer ());
176183 }
177184
185+ private IoConfig io () {
186+ return endpointContext .environment ().ioConfig ();
187+ }
188+
189+ private void addIdleStateHandler (ChannelHandlerContext ctx ) {
190+ long readerIdleNanos = io ().configIdleRedialTimeout ().toNanos () / 2 ;
191+ ctx .pipeline ().addFirst (new IdleStateHandler (readerIdleNanos , 0 , 0 , TimeUnit .NANOSECONDS ));
192+ }
193+
194+ @ Override
195+ public void userEventTriggered (ChannelHandlerContext ctx , Object evt ) throws Exception {
196+ if (!(evt instanceof IdleStateEvent )) {
197+ super .userEventTriggered (ctx , evt );
198+ return ;
199+ }
200+
201+ IdleStateEvent e = (IdleStateEvent ) evt ;
202+ if (e .state () != IdleState .READER_IDLE ) {
203+ log .warn ("Unexpected idle state: {} ; {}" , e .state (), endpointContext );
204+ return ;
205+ }
206+
207+ if (e .isFirst ()) {
208+ writeNoop (ctx );
209+
210+ } else {
211+ log .warn (
212+ "Closing KV connection because there was no incoming traffic for {} (io.configIdleRedialTimeout); {}" ,
213+ io ().configIdleRedialTimeout (),
214+ endpointContext
215+ );
216+ ctx .close ();
217+ }
218+ }
219+
220+ private void writeNoop (ChannelHandlerContext ctx ) {
221+ log .debug ("Writing NOOP; {}" , endpointContext );
222+
223+ // Timeout and retry strategy are handled by higher layers,
224+ // and have no effect when the request is written directly
225+ // to the channel like this. That's okay, because they're
226+ // not needed here; we're just trying to keep IdleStateHandler
227+ // from firing a read timeout event for a healthy connection.
228+ NoopRequest req = new NoopRequest (
229+ io ().configIdleRedialTimeout (), // not actually enforced
230+ endpointContext .core ().context (),
231+ FailFastRetryStrategy .INSTANCE , // not actually enforced
232+ null
233+ );
234+
235+ ChannelPromise noopWritePromise = ctx .newPromise ().addListener (future -> {
236+ if (!future .isSuccess ()) {
237+ // Channel is saturated or already disconnected. That's okay.
238+ log .debug ("Failed to write NOOP; {}" , endpointContext , future .cause ());
239+ } else {
240+ log .debug ("Wrote NOOP; {}" , endpointContext );
241+ }
242+ });
243+
244+ write (ctx , req , noopWritePromise );
245+ ctx .flush ();
246+
247+ req .response ().thenAccept (res -> log .debug ("Read NOOP response; {}" , endpointContext ));
248+ }
249+
178250 /**
179251 * Actions to be performed when the channel becomes active.
180252 *
@@ -186,6 +258,11 @@ public KeyValueMessageHandler(final BaseEndpoint endpoint, final EndpointContext
186258 */
187259 @ Override
188260 public void channelActive (final ChannelHandlerContext ctx ) {
261+ // Defer adding the idle state handler until the channel is active, because we don't want it to run
262+ // until the connection handshake is complete and the KV handler is ready to dispatch requests.
263+ // Prior to this point, connection timeouts are handled at a higher layer.
264+ addIdleStateHandler (ctx );
265+
189266 ioContext = new IoContext (
190267 endpointContext ,
191268 ctx .channel ().localAddress (),
0 commit comments