3636import io .netty .channel .ChannelInboundHandlerAdapter ;
3737import io .netty .channel .ChannelOption ;
3838import io .netty .channel .ChannelPipeline ;
39+ import io .netty .handler .codec .haproxy .HAProxyMessage ;
3940import io .netty .handler .logging .LogLevel ;
4041import io .netty .handler .logging .LoggingHandler ;
4142import io .netty .handler .ssl .SniCompletionEvent ;
4243import io .netty .handler .ssl .SslHandler ;
4344import io .netty .handler .ssl .SslHandshakeCompletionEvent ;
4445
4546import io .kroxylicious .proxy .authentication .TransportSubjectBuilder ;
47+ import io .kroxylicious .proxy .bootstrap .FilterChainFactory ;
48+ import io .kroxylicious .proxy .config .NamedFilterDefinition ;
49+ import io .kroxylicious .proxy .config .PluginFactoryRegistry ;
4650import io .kroxylicious .proxy .filter .FilterAndInvoker ;
47- import io .kroxylicious .proxy .filter .NetFilter ;
4851import io .kroxylicious .proxy .frame .DecodedRequestFrame ;
4952import io .kroxylicious .proxy .frame .DecodedResponseFrame ;
5053import io .kroxylicious .proxy .frame .ResponseFrame ;
5659import io .kroxylicious .proxy .internal .codec .KafkaMessageListener ;
5760import io .kroxylicious .proxy .internal .codec .KafkaRequestEncoder ;
5861import io .kroxylicious .proxy .internal .codec .KafkaResponseDecoder ;
62+ import io .kroxylicious .proxy .internal .filter .ApiVersionsDowngradeFilter ;
63+ import io .kroxylicious .proxy .internal .filter .ApiVersionsIntersectFilter ;
64+ import io .kroxylicious .proxy .internal .filter .BrokerAddressFilter ;
65+ import io .kroxylicious .proxy .internal .filter .EagerMetadataLearner ;
66+ import io .kroxylicious .proxy .internal .filter .NettyFilterContext ;
5967import io .kroxylicious .proxy .internal .metrics .MetricEmittingKafkaMessageListener ;
6068import io .kroxylicious .proxy .internal .net .EndpointBinding ;
69+ import io .kroxylicious .proxy .internal .net .EndpointReconciler ;
6170import io .kroxylicious .proxy .internal .util .Metrics ;
6271import io .kroxylicious .proxy .model .VirtualClusterModel ;
6372import io .kroxylicious .proxy .service .HostPort ;
7180import static io .kroxylicious .proxy .internal .ProxyChannelState .SelectingServer ;
7281
7382public class KafkaProxyFrontendHandler
74- extends ChannelInboundHandlerAdapter
75- implements NetFilter .NetFilterContext {
83+ extends ChannelInboundHandlerAdapter {
7684
77- private static final String NET_FILTER_INVOKED_IN_WRONG_STATE = "NetFilterContext invoked in wrong session state" ;
7885 private static final Logger LOGGER = LoggerFactory .getLogger (KafkaProxyFrontendHandler .class );
7986
8087 /** Cache ApiVersions response which we use when returning ApiVersions ourselves */
@@ -84,10 +91,15 @@ public class KafkaProxyFrontendHandler
8491 private final boolean logFrames ;
8592 private final VirtualClusterModel virtualClusterModel ;
8693 private final EndpointBinding endpointBinding ;
87- private final NetFilter netFilter ;
94+ private final EndpointReconciler endpointReconciler ;
8895 private final DelegatingDecodePredicate dp ;
8996 private final ProxyChannelStateMachine proxyChannelStateMachine ;
9097 private final TransportSubjectBuilder subjectBuilder ;
98+ private final PluginFactoryRegistry pfr ;
99+ private final FilterChainFactory filterChainFactory ;
100+ private final List <NamedFilterDefinition > namedFilterDefinitions ;
101+ private final ApiVersionsIntersectFilter apiVersionsIntersectFilter ;
102+ private final ApiVersionsDowngradeFilter apiVersionsDowngradeFilter ;
91103
92104 private @ Nullable ChannelHandlerContext clientCtx ;
93105 @ VisibleForTesting
@@ -129,15 +141,24 @@ SSLSession sslSession() {
129141 }
130142
131143 KafkaProxyFrontendHandler (
132- NetFilter netFilter ,
144+ PluginFactoryRegistry pfr ,
145+ FilterChainFactory filterChainFactory ,
146+ List <NamedFilterDefinition > namedFilterDefinitions ,
147+ EndpointReconciler endpointReconciler ,
148+ ApiVersionsServiceImpl apiVersionsService ,
133149 DelegatingDecodePredicate dp ,
134150 TransportSubjectBuilder subjectBuilder ,
135151 EndpointBinding endpointBinding ,
136152 ProxyChannelStateMachine proxyChannelStateMachine ) {
137- this .netFilter = netFilter ;
153+ this .endpointBinding = endpointBinding ;
154+ this .pfr = pfr ;
155+ this .filterChainFactory = filterChainFactory ;
156+ this .namedFilterDefinitions = namedFilterDefinitions ;
157+ this .endpointReconciler = endpointReconciler ;
158+ this .apiVersionsIntersectFilter = new ApiVersionsIntersectFilter (apiVersionsService );
159+ this .apiVersionsDowngradeFilter = new ApiVersionsDowngradeFilter (apiVersionsService );
138160 this .dp = dp ;
139161 this .subjectBuilder = Objects .requireNonNull (subjectBuilder );
140- this .endpointBinding = endpointBinding ;
141162 this .virtualClusterModel = endpointBinding .endpointGateway ().virtualCluster ();
142163 this .proxyChannelStateMachine = proxyChannelStateMachine ;
143164 this .logNetwork = virtualClusterModel .isLogNetwork ();
@@ -305,12 +326,23 @@ void inApiVersions(DecodedRequestFrame<ApiVersionsRequestData> apiVersionsFrame)
305326 * Called by the {@link ProxyChannelStateMachine} on entry to the {@link SelectingServer} state.
306327 */
307328 void inSelectingServer () {
308- // Pass this as the filter context, so that
309- // filter.initiateConnect() call's back on
310- // our initiateConnect() method
311- this .netFilter .selectServer (this );
312- this .proxyChannelStateMachine
313- .assertIsConnecting ("NetFilter.selectServer() did not callback on NetFilterContext.initiateConnect(): filter='" + this .netFilter + "'" );
329+ List <FilterAndInvoker > apiVersionFilters = FilterAndInvoker .build ("ApiVersionsIntersect (internal)" , apiVersionsIntersectFilter );
330+ var filterAndInvokers = new ArrayList <>(apiVersionFilters );
331+ filterAndInvokers .addAll (FilterAndInvoker .build ("ApiVersionsDowngrade (internal)" , apiVersionsDowngradeFilter ));
332+
333+ NettyFilterContext filterContext = new NettyFilterContext (clientCtx ().channel ().eventLoop (), pfr );
334+ List <FilterAndInvoker > filterChain = filterChainFactory .createFilters (filterContext , this .namedFilterDefinitions );
335+ filterAndInvokers .addAll (filterChain );
336+
337+ if (endpointBinding .restrictUpstreamToMetadataDiscovery ()) {
338+ filterAndInvokers .addAll (FilterAndInvoker .build ("EagerMetadataLearner (internal)" , new EagerMetadataLearner ()));
339+ }
340+ List <FilterAndInvoker > brokerAddressFilters = FilterAndInvoker .build ("BrokerAddress (internal)" ,
341+ new BrokerAddressFilter (endpointBinding .endpointGateway (), endpointReconciler ));
342+ filterAndInvokers .addAll (brokerAddressFilters );
343+
344+ var target = Objects .requireNonNull (endpointBinding .upstreamTarget ());
345+ initiateConnect (target , filterAndInvokers );
314346 }
315347
316348 /**
@@ -350,133 +382,22 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
350382 proxyChannelStateMachine .onClientException (cause , endpointBinding .endpointGateway ().getDownstreamSslContext ().isPresent ());
351383 }
352384
353- /**
354- * Accessor exposing the client host to a {@link NetFilter}.
355- * <p>Called by the {@link #netFilter}.</p>
356- * @return The client host
357- * @throws IllegalStateException if {@link #proxyChannelStateMachine} is not {@link SelectingServer}.
358- */
359- @ Override
360- public String clientHost () {
361- final SelectingServer selectingServer = proxyChannelStateMachine
362- .enforceInSelectingServer (NET_FILTER_INVOKED_IN_WRONG_STATE );
363- if (selectingServer .haProxyMessage () != null ) {
364- return selectingServer .haProxyMessage ().sourceAddress ();
365- }
366- else {
367- return remoteHost ();
368- }
369- }
370-
371- /**
372- * Accessor exposing the client port to a {@link NetFilter}.
373- * <p>Called by the {@link #netFilter}.</p>
374- * @return The client port.
375- * @throws IllegalStateException if {@link #proxyChannelStateMachine} is not {@link SelectingServer}.
376- */
377- @ Override
378- public int clientPort () {
379- final SelectingServer selectingServer = proxyChannelStateMachine
380- .enforceInSelectingServer (NET_FILTER_INVOKED_IN_WRONG_STATE );
381- if (selectingServer .haProxyMessage () != null ) {
382- return selectingServer .haProxyMessage ().sourcePort ();
383- }
384- else {
385- return remotePort ();
386- }
387- }
388-
389- /**
390- * Accessor exposing the source address to a {@link NetFilter}.
391- * <p>Called by the {@link #netFilter}.</p>
392- * @return The source address.
393- * @throws IllegalStateException if {@link #proxyChannelStateMachine} is not {@link SelectingServer}.
394- */
395- @ Override
396- public SocketAddress srcAddress () {
397- proxyChannelStateMachine .enforceInSelectingServer (NET_FILTER_INVOKED_IN_WRONG_STATE );
398- return clientCtx ().channel ().remoteAddress ();
399- }
400-
401- /**
402- * Accessor exposing the local address to a {@link NetFilter}.
403- * <p>Called by the {@link #netFilter}.</p>
404- * @return The local address.
405- * @throws IllegalStateException if {@link #proxyChannelStateMachine} is not {@link SelectingServer}.
406- */
407- @ Override
408- public SocketAddress localAddress () {
409- proxyChannelStateMachine .enforceInSelectingServer (NET_FILTER_INVOKED_IN_WRONG_STATE );
410- return clientCtx ().channel ().localAddress ();
411- }
412-
413- /**
414- * Accessor exposing the authorizedId to a {@link NetFilter}.
415- * <p>Called by the {@link #netFilter}.</p>
416- * @return The authorized id, or null.
417- * @throws IllegalStateException if {@link #proxyChannelStateMachine} is not {@link SelectingServer}.
418- */
419- @ Override
420- public @ Nullable String authorizedId () {
421- proxyChannelStateMachine .enforceInSelectingServer (NET_FILTER_INVOKED_IN_WRONG_STATE );
422- return null ;
423- }
424-
425- /**
426- * Accessor exposing the name of the client library to a {@link NetFilter}.
427- * <p>Called by the {@link #netFilter}.</p>
428- * @return The name of the client library, or null.
429- * @throws IllegalStateException if {@link #proxyChannelStateMachine} is not {@link SelectingServer}.
430- */
431- @ Override
432- @ Nullable
433- public String clientSoftwareName () {
434- return proxyChannelStateMachine .enforceInSelectingServer (NET_FILTER_INVOKED_IN_WRONG_STATE ).clientSoftwareName ();
435- }
436-
437- /**
438- * Accessor exposing the version of the client library to a {@link NetFilter}.
439- * <p>Called by the {@link #netFilter}.</p>
440- * @return The version of the client library, or null.
441- * @throws IllegalStateException if {@link #proxyChannelStateMachine} is not {@link SelectingServer}.
442- */
443- @ Override
444- @ Nullable
445- public String clientSoftwareVersion () {
446- return proxyChannelStateMachine .enforceInSelectingServer (NET_FILTER_INVOKED_IN_WRONG_STATE ).clientSoftwareVersion ();
447- }
448-
449- /**
450- * Accessor exposing the SNI host name to a {@link NetFilter}.
451- * <p>Called by the {@link #netFilter}.</p>
452- * @return The SNI host name, or null.
453- * @throws IllegalStateException if {@link #proxyChannelStateMachine} is not {@link SelectingServer}.
454- */
455- @ Override
456- @ Nullable
457- public String sniHostname () {
458- proxyChannelStateMachine .enforceInSelectingServer (NET_FILTER_INVOKED_IN_WRONG_STATE );
459- return sniHostname ;
460- }
461-
462385 /**
463386 * Initiates the connection to a server.
464387 * Changes {@link #proxyChannelStateMachine} from {@link SelectingServer} to {@link Connecting}
465388 * Initializes the {@code backendHandler} and configures its pipeline
466389 * with the given {@code filters}.
467- * <p>Called by the {@link #netFilter}.</p>
468390 * @param remote upstream broker target
469391 * @param filters The protocol filters
470392 */
471- @ Override
472- public void initiateConnect (
473- HostPort remote ,
474- List <FilterAndInvoker > filters ) {
393+ void initiateConnect (
394+ HostPort remote ,
395+ List <FilterAndInvoker > filters ) {
475396 if (LOGGER .isDebugEnabled ()) {
476397 LOGGER .debug ("{}: Connecting to backend broker {} using filters {}" ,
477398 this .proxyChannelStateMachine .sessionId (), remote , filters );
478399 }
479- this .proxyChannelStateMachine .onNetFilterInitiateConnect (remote , filters , virtualClusterModel , netFilter );
400+ this .proxyChannelStateMachine .onInitiateConnect (remote , filters , virtualClusterModel );
480401 }
481402
482403 /**
@@ -589,7 +510,7 @@ void inForwarding() {
589510
590511 if (pendingReadComplete ) {
591512 pendingReadComplete = false ;
592- channelReadComplete (Objects . requireNonNull ( this .clientCtx ));
513+ channelReadComplete (this .clientCtx ( ));
593514 }
594515
595516 // once buffered message has been forwarded we enable auto-read to start accepting further messages
@@ -651,6 +572,9 @@ void flushToClient() {
651572 * @param msg the RPC to buffer.
652573 */
653574 void bufferMsg (Object msg ) {
575+ if (msg instanceof HAProxyMessage ) {
576+ return ;
577+ }
654578 if (bufferedMsgs == null ) {
655579 bufferedMsgs = new ArrayList <>();
656580 }
0 commit comments