@@ -272,98 +272,121 @@ ChannelInitializer<Channel> initializer(
272272 DriverChannelOptions options ,
273273 NodeMetricUpdater nodeMetricUpdater ,
274274 CompletableFuture <DriverChannel > resultFuture ) {
275- return new ChannelInitializer <Channel >() {
276- @ Override
277- protected void initChannel (Channel channel ) {
278- try {
279- DriverExecutionProfile defaultConfig = context .getConfig ().getDefaultProfile ();
280-
281- long setKeyspaceTimeoutMillis =
282- defaultConfig
283- .getDuration (DefaultDriverOption .CONNECTION_SET_KEYSPACE_TIMEOUT )
284- .toMillis ();
285- int maxFrameLength =
286- (int ) defaultConfig .getBytes (DefaultDriverOption .PROTOCOL_MAX_FRAME_LENGTH );
287- int maxRequestsPerConnection =
288- defaultConfig .getInt (DefaultDriverOption .CONNECTION_MAX_REQUESTS );
289- int maxOrphanRequests =
290- defaultConfig .getInt (DefaultDriverOption .CONNECTION_MAX_ORPHAN_REQUESTS );
291- if (maxOrphanRequests >= maxRequestsPerConnection ) {
292- if (LOGGED_ORPHAN_WARNING .compareAndSet (false , true )) {
293- LOG .warn (
294- "[{}] Invalid value for {}: {}. It must be lower than {}. "
295- + "Defaulting to {} (1/4 of max-requests) instead." ,
296- logPrefix ,
297- DefaultDriverOption .CONNECTION_MAX_ORPHAN_REQUESTS .getPath (),
298- maxOrphanRequests ,
299- DefaultDriverOption .CONNECTION_MAX_REQUESTS .getPath (),
300- maxRequestsPerConnection / 4 );
301- }
302- maxOrphanRequests = maxRequestsPerConnection / 4 ;
303- }
275+ return new ChannelFactoryInitializer (
276+ endPoint , protocolVersion , options , nodeMetricUpdater , resultFuture );
277+ };
278+
279+ class ChannelFactoryInitializer extends ChannelInitializer <Channel > {
280+
281+ private final EndPoint endPoint ;
282+ private final ProtocolVersion protocolVersion ;
283+ private final DriverChannelOptions options ;
284+ private final NodeMetricUpdater nodeMetricUpdater ;
285+ private final CompletableFuture <DriverChannel > resultFuture ;
286+
287+ ChannelFactoryInitializer (
288+ EndPoint endPoint ,
289+ ProtocolVersion protocolVersion ,
290+ DriverChannelOptions options ,
291+ NodeMetricUpdater nodeMetricUpdater ,
292+ CompletableFuture <DriverChannel > resultFuture ) {
293+
294+ this .endPoint = endPoint ;
295+ this .protocolVersion = protocolVersion ;
296+ this .options = options ;
297+ this .nodeMetricUpdater = nodeMetricUpdater ;
298+ this .resultFuture = resultFuture ;
299+ }
304300
305- InFlightHandler inFlightHandler =
306- new InFlightHandler (
307- protocolVersion ,
308- new StreamIdGenerator (maxRequestsPerConnection ),
309- maxOrphanRequests ,
310- setKeyspaceTimeoutMillis ,
311- channel .newPromise (),
312- options .eventCallback ,
313- options .ownerLogPrefix );
314- HeartbeatHandler heartbeatHandler = new HeartbeatHandler (defaultConfig );
315- ProtocolInitHandler initHandler =
316- new ProtocolInitHandler (
317- context ,
318- protocolVersion ,
319- clusterName ,
320- endPoint ,
321- options ,
322- heartbeatHandler ,
323- productType == null );
324-
325- ChannelPipeline pipeline = channel .pipeline ();
326- context
327- .getSslHandlerFactory ()
328- .map (f -> f .newSslHandler (channel , endPoint ))
329- .map (h -> pipeline .addLast (SSL_HANDLER_NAME , h ));
330-
331- // Only add meter handlers on the pipeline if metrics are enabled.
332- SessionMetricUpdater sessionMetricUpdater =
333- context .getMetricsFactory ().getSessionUpdater ();
334- if (nodeMetricUpdater .isEnabled (DefaultNodeMetric .BYTES_RECEIVED , null )
335- || sessionMetricUpdater .isEnabled (DefaultSessionMetric .BYTES_RECEIVED , null )) {
336- pipeline .addLast (
337- INBOUND_TRAFFIC_METER_NAME ,
338- new InboundTrafficMeter (nodeMetricUpdater , sessionMetricUpdater ));
301+ @ Override
302+ protected void initChannel (Channel channel ) {
303+ try {
304+ DriverExecutionProfile defaultConfig = context .getConfig ().getDefaultProfile ();
305+
306+ long setKeyspaceTimeoutMillis =
307+ defaultConfig
308+ .getDuration (DefaultDriverOption .CONNECTION_SET_KEYSPACE_TIMEOUT )
309+ .toMillis ();
310+ int maxFrameLength =
311+ (int ) defaultConfig .getBytes (DefaultDriverOption .PROTOCOL_MAX_FRAME_LENGTH );
312+ int maxRequestsPerConnection =
313+ defaultConfig .getInt (DefaultDriverOption .CONNECTION_MAX_REQUESTS );
314+ int maxOrphanRequests =
315+ defaultConfig .getInt (DefaultDriverOption .CONNECTION_MAX_ORPHAN_REQUESTS );
316+ if (maxOrphanRequests >= maxRequestsPerConnection ) {
317+ if (LOGGED_ORPHAN_WARNING .compareAndSet (false , true )) {
318+ LOG .warn (
319+ "[{}] Invalid value for {}: {}. It must be lower than {}. "
320+ + "Defaulting to {} (1/4 of max-requests) instead." ,
321+ logPrefix ,
322+ DefaultDriverOption .CONNECTION_MAX_ORPHAN_REQUESTS .getPath (),
323+ maxOrphanRequests ,
324+ DefaultDriverOption .CONNECTION_MAX_REQUESTS .getPath (),
325+ maxRequestsPerConnection / 4 );
339326 }
327+ maxOrphanRequests = maxRequestsPerConnection / 4 ;
328+ }
340329
341- if (nodeMetricUpdater .isEnabled (DefaultNodeMetric .BYTES_SENT , null )
342- || sessionMetricUpdater .isEnabled (DefaultSessionMetric .BYTES_SENT , null )) {
343- pipeline .addLast (
344- OUTBOUND_TRAFFIC_METER_NAME ,
345- new OutboundTrafficMeter (nodeMetricUpdater , sessionMetricUpdater ));
346- }
330+ InFlightHandler inFlightHandler =
331+ new InFlightHandler (
332+ protocolVersion ,
333+ new StreamIdGenerator (maxRequestsPerConnection ),
334+ maxOrphanRequests ,
335+ setKeyspaceTimeoutMillis ,
336+ channel .newPromise (),
337+ options .eventCallback ,
338+ options .ownerLogPrefix );
339+ HeartbeatHandler heartbeatHandler = new HeartbeatHandler (defaultConfig );
340+ ProtocolInitHandler initHandler =
341+ new ProtocolInitHandler (
342+ context ,
343+ protocolVersion ,
344+ clusterName ,
345+ endPoint ,
346+ options ,
347+ heartbeatHandler ,
348+ productType == null );
349+
350+ ChannelPipeline pipeline = channel .pipeline ();
351+ context
352+ .getSslHandlerFactory ()
353+ .map (f -> f .newSslHandler (channel , endPoint ))
354+ .map (h -> pipeline .addLast (SSL_HANDLER_NAME , h ));
355+
356+ // Only add meter handlers on the pipeline if metrics are enabled.
357+ SessionMetricUpdater sessionMetricUpdater = context .getMetricsFactory ().getSessionUpdater ();
358+ if (nodeMetricUpdater .isEnabled (DefaultNodeMetric .BYTES_RECEIVED , null )
359+ || sessionMetricUpdater .isEnabled (DefaultSessionMetric .BYTES_RECEIVED , null )) {
360+ pipeline .addLast (
361+ INBOUND_TRAFFIC_METER_NAME ,
362+ new InboundTrafficMeter (nodeMetricUpdater , sessionMetricUpdater ));
363+ }
347364
348- pipeline
349- .addLast (
350- FRAME_TO_BYTES_ENCODER_NAME ,
351- new FrameEncoder (context .getFrameCodec (), maxFrameLength ))
352- .addLast (
353- BYTES_TO_FRAME_DECODER_NAME ,
354- new FrameDecoder (context .getFrameCodec (), maxFrameLength ))
355- // Note: HeartbeatHandler is inserted here once init completes
356- .addLast (INFLIGHT_HANDLER_NAME , inFlightHandler )
357- .addLast (INIT_HANDLER_NAME , initHandler );
358-
359- context .getNettyOptions ().afterChannelInitialized (channel );
360- } catch (Throwable t ) {
361- // If the init handler throws an exception, Netty swallows it and closes the channel. We
362- // want to propagate it instead, so fail the outer future (the result of connect()).
363- resultFuture .completeExceptionally (t );
364- throw t ;
365+ if (nodeMetricUpdater .isEnabled (DefaultNodeMetric .BYTES_SENT , null )
366+ || sessionMetricUpdater .isEnabled (DefaultSessionMetric .BYTES_SENT , null )) {
367+ pipeline .addLast (
368+ OUTBOUND_TRAFFIC_METER_NAME ,
369+ new OutboundTrafficMeter (nodeMetricUpdater , sessionMetricUpdater ));
365370 }
371+
372+ pipeline
373+ .addLast (
374+ FRAME_TO_BYTES_ENCODER_NAME ,
375+ new FrameEncoder (context .getFrameCodec (), maxFrameLength ))
376+ .addLast (
377+ BYTES_TO_FRAME_DECODER_NAME ,
378+ new FrameDecoder (context .getFrameCodec (), maxFrameLength ))
379+ // Note: HeartbeatHandler is inserted here once init completes
380+ .addLast (INFLIGHT_HANDLER_NAME , inFlightHandler )
381+ .addLast (INIT_HANDLER_NAME , initHandler );
382+
383+ context .getNettyOptions ().afterChannelInitialized (channel );
384+ } catch (Throwable t ) {
385+ // If the init handler throws an exception, Netty swallows it and closes the channel. We
386+ // want to propagate it instead, so fail the outer future (the result of connect()).
387+ resultFuture .completeExceptionally (t );
388+ throw t ;
366389 }
367- };
390+ }
368391 }
369392}
0 commit comments