Skip to content

Commit 5c09f3e

Browse files
committed
Remove dead SASL offload code from initializer and state machine
This code was the experimental foundation for having the proxy handle SASL termination and making the authorizedId available to the NetFilter at backend connection time. It was never enabled and we have since evolved our thinking around authentication and identity-oriented filters. We now envision Filters being responsible for SASL termination. Therefore we are removing the code. Signed-off-by: Robert Young <[email protected]>
1 parent 0c3225d commit 5c09f3e

11 files changed

+154
-421
lines changed

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/KafkaProxy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,9 @@ public KafkaProxy startup() {
200200
this.filterChainFactory = new FilterChainFactory(pfr, config.filterDefinitions());
201201

202202
var tlsServerBootstrap = buildServerBootstrap(proxyEventGroup,
203-
new KafkaProxyInitializer(filterChainFactory, pfr, true, endpointRegistry, endpointRegistry, false, Map.of(), apiVersionsService));
203+
new KafkaProxyInitializer(filterChainFactory, pfr, true, endpointRegistry, endpointRegistry, false, apiVersionsService));
204204
var plainServerBootstrap = buildServerBootstrap(proxyEventGroup,
205-
new KafkaProxyInitializer(filterChainFactory, pfr, false, endpointRegistry, endpointRegistry, false, Map.of(), apiVersionsService));
205+
new KafkaProxyInitializer(filterChainFactory, pfr, false, endpointRegistry, endpointRegistry, false, apiVersionsService));
206206

207207
bindingOperationProcessor.start(plainServerBootstrap, tlsServerBootstrap);
208208

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/SaslDecodePredicate.java renamed to kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/DelegatingDecodePredicate.java

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,32 +13,38 @@
1313

1414
import edu.umd.cs.findbugs.annotations.Nullable;
1515

16-
class SaslDecodePredicate implements DecodePredicate {
16+
/**
17+
* DecodePredicate that:
18+
* <ol>
19+
* <li>Always decodes ApiVersions</li>
20+
* <li>Decodes all RPCs initially, until a delegate is installed</li>
21+
* <li>After the delegate is installed, use that to determine if non-ApiVersions RPCs should be decoded</li>
22+
* </ol>
23+
* The problem this class is solving is:
24+
* <ol>
25+
* <li>We want the proxy to avoid deserializing requests and responses "when it doesn't have to".
26+
* So when there isn't a filter which is interested in that request/response API, or API version
27+
* And when the proxy infra itself doesn't need to.</li>
28+
* <li>But it doesn't know, until it's got to the invoking the {@link io.kroxylicious.proxy.filter.NetFilter NetFilter}
29+
* impl and that having called back on the {@link io.kroxylicious.proxy.filter.NetFilter.NetFilterContext NetFilterContext},
30+
* what protocol filters are to be used.</li>
31+
* <li>But it's the {@link io.kroxylicious.proxy.internal.codec.KafkaRequestDecoder KafkaRequestDecoder}
32+
* which needs to know about decodability, and that sits in front of the {@link KafkaProxyFrontendHandler},
33+
* so there's a cyclic dependency.</li>
34+
* <li>It's easier to use this delegation pattern than it is to try to reconfigure
35+
* the predicate on the {@link io.kroxylicious.proxy.internal.codec.KafkaRequestDecoder KafkaRequestDecoder}.</li>
36+
* </ol>
37+
*/
38+
class DelegatingDecodePredicate implements DecodePredicate {
1739

18-
private static final Logger LOGGER = LoggerFactory.getLogger(SaslDecodePredicate.class);
40+
private static final Logger LOGGER = LoggerFactory.getLogger(DelegatingDecodePredicate.class);
1941

20-
private final boolean handleSasl;
2142
private @Nullable DecodePredicate delegate = null;
2243

23-
SaslDecodePredicate(boolean handleSasl) {
24-
this.handleSasl = handleSasl;
44+
DelegatingDecodePredicate() {
2545
}
2646

2747
public void setDelegate(DecodePredicate delegate) {
28-
/*
29-
* This delegate is ugly. The problem it is solving is:
30-
*
31-
* 1. We want the proxy to avoid deserializing requests and responses "when it doesn't have to"
32-
* * So when there isn't a filter which is interested in that request/response API, or API version
33-
* * And when the proxy infra itself doesn't need to
34-
* 2. With the SASL offload support the proxy itself (when configured) is interested in the SASL APIs.
35-
* 3. But it doesn't know, until it's got to the invoking the `NetFilter` impl and that having called
36-
* back on the `NetFilterContext`, what protocol filters are to be used.
37-
* 4. But it's the `KafkaDecodeFilter` which needs to know about decodability, and that sits in front
38-
* of the `KafkaProxyFrontendHandler`, so there's a cyclic dependency.
39-
* 5. It's easier to use this delegation pattern than it is to try to reconfigure
40-
* the predicate on the `KafkaDecodeFilter`.
41-
*/
4248
LOGGER.debug("Setting delegate {}", delegate);
4349
this.delegate = delegate;
4450
}
@@ -56,15 +62,7 @@ public boolean shouldDecodeRequest(ApiKeys apiKey, short apiVersion) {
5662
// to intercept it
5763
return true;
5864
}
59-
return isAuthenticationOffloaded(apiKey) || delegate.shouldDecodeRequest(apiKey, apiVersion);
60-
}
61-
62-
private boolean isAuthenticationOffloaded(ApiKeys apiKey) {
63-
return isAuthenticationOffloadEnabled() && (apiKey == ApiKeys.SASL_HANDSHAKE || apiKey == ApiKeys.SASL_AUTHENTICATE);
64-
}
65-
66-
public boolean isAuthenticationOffloadEnabled() {
67-
return handleSasl;
65+
return delegate.shouldDecodeRequest(apiKey, apiVersion);
6866
}
6967

7068
@Override
@@ -75,7 +73,6 @@ public boolean shouldDecodeResponse(ApiKeys apiKey, short apiVersion) {
7573
@Override
7674
public String toString() {
7775
return "SaslDecodePredicate(" +
78-
"handleSasl=" + handleSasl +
7976
", delegate=" + delegate +
8077
')';
8178
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/KafkaProxyFrontendHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public class KafkaProxyFrontendHandler
8585
private final VirtualClusterModel virtualClusterModel;
8686
private final EndpointBinding endpointBinding;
8787
private final NetFilter netFilter;
88-
private final SaslDecodePredicate dp;
88+
private final DelegatingDecodePredicate dp;
8989
private final ProxyChannelStateMachine proxyChannelStateMachine;
9090
private final TransportSubjectBuilder subjectBuilder;
9191

@@ -131,7 +131,7 @@ SSLSession sslSession() {
131131

132132
KafkaProxyFrontendHandler(
133133
NetFilter netFilter,
134-
SaslDecodePredicate dp,
134+
DelegatingDecodePredicate dp,
135135
TransportSubjectBuilder subjectBuilder,
136136
EndpointBinding endpointBinding,
137137
ProxyChannelStateMachine proxyChannelStateMachine) {
@@ -245,7 +245,7 @@ public void channelWritabilityChanged(
245245
public void channelRead(
246246
ChannelHandlerContext ctx,
247247
Object msg) {
248-
proxyChannelStateMachine.onClientRequest(dp, msg);
248+
proxyChannelStateMachine.onClientRequest(msg);
249249
}
250250

251251
/**

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/KafkaProxyInitializer.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@
77

88
import java.util.ArrayList;
99
import java.util.List;
10-
import java.util.Map;
1110

12-
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
1311
import org.slf4j.Logger;
1412
import org.slf4j.LoggerFactory;
1513

@@ -58,7 +56,6 @@ public class KafkaProxyInitializer extends ChannelInitializer<Channel> {
5856
static final String LOGGING_INBOUND_ERROR_HANDLER_NAME = "loggingInboundErrorHandler";
5957

6058
private final boolean haproxyProtocol;
61-
private final Map<KafkaAuthnHandler.SaslMechanism, AuthenticateCallbackHandler> authnHandlers;
6259
private final boolean tls;
6360
private final EndpointBindingResolver bindingResolver;
6461
private final EndpointReconciler endpointReconciler;
@@ -73,12 +70,10 @@ public KafkaProxyInitializer(FilterChainFactory filterChainFactory,
7370
EndpointBindingResolver bindingResolver,
7471
EndpointReconciler endpointReconciler,
7572
boolean haproxyProtocol,
76-
Map<KafkaAuthnHandler.SaslMechanism, AuthenticateCallbackHandler> authnMechanismHandlers,
7773
ApiVersionsServiceImpl apiVersionsService) {
7874
this.pfr = pfr;
7975
this.endpointReconciler = endpointReconciler;
8076
this.haproxyProtocol = haproxyProtocol;
81-
this.authnHandlers = authnMechanismHandlers != null ? authnMechanismHandlers : Map.of();
8277
this.tls = tls;
8378
this.bindingResolver = bindingResolver;
8479
this.filterChainFactory = filterChainFactory;
@@ -198,7 +193,7 @@ void addHandlers(Channel ch, EndpointBinding binding) {
198193
pipeline.addLast("HAProxyMessageDecoder", new HAProxyMessageDecoder());
199194
}
200195

201-
var dp = new SaslDecodePredicate(!authnHandlers.isEmpty());
196+
var dp = new DelegatingDecodePredicate();
202197
// The decoder, this only cares about the filters
203198
// because it needs to know whether to decode requests
204199

@@ -214,12 +209,7 @@ void addHandlers(Channel ch, EndpointBinding binding) {
214209
pipeline.addLast("frameLogger", new LoggingHandler("io.kroxylicious.proxy.internal.DownstreamFrameLogger", LogLevel.INFO));
215210
}
216211

217-
if (!authnHandlers.isEmpty()) {
218-
LOGGER.debug("Adding authn handler for handlers {}", authnHandlers);
219-
pipeline.addLast(new KafkaAuthnHandler(ch, authnHandlers));
220-
}
221-
222-
final NetFilter netFilter = new InitalizerNetFilter(dp,
212+
final NetFilter netFilter = new InitalizerNetFilter(
223213
ch,
224214
binding,
225215
pfr,
@@ -263,7 +253,6 @@ private static void addLoggingErrorHandler(ChannelPipeline pipeline) {
263253
@VisibleForTesting
264254
static class InitalizerNetFilter implements NetFilter {
265255

266-
private final SaslDecodePredicate decodePredicate;
267256
private final Channel ch;
268257
private final EndpointGateway gateway;
269258
private final EndpointBinding binding;
@@ -274,16 +263,14 @@ static class InitalizerNetFilter implements NetFilter {
274263
private final ApiVersionsIntersectFilter apiVersionsIntersectFilter;
275264
private final ApiVersionsDowngradeFilter apiVersionsDowngradeFilter;
276265

277-
InitalizerNetFilter(SaslDecodePredicate decodePredicate,
278-
Channel ch,
266+
InitalizerNetFilter(Channel ch,
279267
EndpointBinding binding,
280268
PluginFactoryRegistry pfr,
281269
FilterChainFactory filterChainFactory,
282270
List<NamedFilterDefinition> filterDefinitions,
283271
EndpointReconciler endpointReconciler,
284272
ApiVersionsIntersectFilter apiVersionsIntersectFilter,
285273
ApiVersionsDowngradeFilter apiVersionsDowngradeFilter) {
286-
this.decodePredicate = decodePredicate;
287274
this.ch = ch;
288275
this.gateway = binding.endpointGateway();
289276
this.binding = binding;
@@ -297,8 +284,7 @@ static class InitalizerNetFilter implements NetFilter {
297284

298285
@Override
299286
public void selectServer(NetFilter.NetFilterContext context) {
300-
List<FilterAndInvoker> apiVersionFilters = decodePredicate.isAuthenticationOffloadEnabled() ? List.of()
301-
: FilterAndInvoker.build("ApiVersionsIntersect (internal)", apiVersionsIntersectFilter);
287+
List<FilterAndInvoker> apiVersionFilters = FilterAndInvoker.build("ApiVersionsIntersect (internal)", apiVersionsIntersectFilter);
302288

303289
NettyFilterContext filterContext = new NettyFilterContext(ch.eventLoop(), pfr);
304290
List<FilterAndInvoker> filterChain = filterChainFactory.createFilters(filterContext, filterDefinitions);

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/ProxyChannelStateMachine.java

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -358,17 +358,15 @@ void clientReadComplete() {
358358

359359
/**
360360
* The proxy has received something from the client. The current state of the session determines what happens to it.
361-
* @param dp the decode predicate to be used if the session is still being negotiated
362361
* @param msg the RPC received from the downstream client
363362
*/
364363
void onClientRequest(
365-
SaslDecodePredicate dp,
366364
Object msg) {
367365
Objects.requireNonNull(frontendHandler);
368366
if (state() instanceof Forwarding) { // post-backend connection
369367
messageFromClient(msg);
370368
}
371-
else if (!onClientRequestBeforeForwarding(dp, msg)) {
369+
else if (!onClientRequestBeforeForwarding(msg)) {
372370
illegalState("Unexpected message received: " + (msg == null ? "null" : "message class=" + msg.getClass()));
373371
}
374372
}
@@ -515,20 +513,19 @@ private void toForwarding(Forwarding forwarding) {
515513

516514
/**
517515
* handle a message received from the client prior to connecting to the upstream node
518-
* @param dp DecodePredicate to cope with SASL offload
519516
* @param msg Message received from the downstream client.
520517
* @return <code>false</code> for unsupported message types
521518
*/
522-
private boolean onClientRequestBeforeForwarding(SaslDecodePredicate dp, Object msg) {
519+
private boolean onClientRequestBeforeForwarding(Object msg) {
523520
Objects.requireNonNull(frontendHandler).bufferMsg(msg);
524521
if (state() instanceof ProxyChannelState.ClientActive clientActive) {
525-
return onClientRequestInClientActiveState(dp, msg, clientActive);
522+
return onClientRequestInClientActiveState(msg, clientActive);
526523
}
527524
else if (state() instanceof ProxyChannelState.HaProxy haProxy) {
528-
return onClientRequestInHaProxyState(dp, msg, haProxy);
525+
return onClientRequestInHaProxyState(msg, haProxy);
529526
}
530527
else if (state() instanceof ProxyChannelState.ApiVersions apiVersions) {
531-
return onClientRequestInApiVersionsState(dp, msg, apiVersions);
528+
return onClientRequestInApiVersionsState(msg, apiVersions);
532529
}
533530
else if (state() instanceof ProxyChannelState.SelectingServer) {
534531
return msg instanceof RequestFrame;
@@ -540,35 +537,27 @@ else if (state() instanceof ProxyChannelState.SelectingServer) {
540537

541538
@SuppressWarnings({ "java:S1172", "java:S1135" })
542539
// We keep dp as we should need it and it gives consistency with the other onClientRequestIn methods (sue me)
543-
private boolean onClientRequestInApiVersionsState(SaslDecodePredicate dp, Object msg, ProxyChannelState.ApiVersions apiVersions) {
540+
private boolean onClientRequestInApiVersionsState(Object msg, ProxyChannelState.ApiVersions apiVersions) {
544541
if (msg instanceof RequestFrame) {
545-
// TODO if dp.isAuthenticationOffloadEnabled() then we need to forward to that handler
546-
// TODO we only do the connection once we know the authenticated identity
547542
toSelectingServer(apiVersions.toSelectingServer());
548543
return true;
549544
}
550545
return false;
551546
}
552547

553-
private boolean onClientRequestInHaProxyState(SaslDecodePredicate dp, Object msg, ProxyChannelState.HaProxy haProxy) {
554-
return transitionClientRequest(dp, msg, haProxy::toApiVersions, haProxy::toSelectingServer);
548+
private boolean onClientRequestInHaProxyState(Object msg, ProxyChannelState.HaProxy haProxy) {
549+
return transitionClientRequest(msg, haProxy::toApiVersions, haProxy::toSelectingServer);
555550
}
556551

557552
private boolean transitionClientRequest(
558-
SaslDecodePredicate dp,
559553
Object msg,
560554
Function<DecodedRequestFrame<ApiVersionsRequestData>, ProxyChannelState.ApiVersions> apiVersionsFactory,
561555
Function<DecodedRequestFrame<ApiVersionsRequestData>, ProxyChannelState.SelectingServer> selectingServerFactory) {
562556
if (isMessageApiVersionsRequest(msg)) {
563557
// We know it's an API Versions request even if the compiler doesn't
564558
@SuppressWarnings("unchecked")
565559
DecodedRequestFrame<ApiVersionsRequestData> apiVersionsFrame = (DecodedRequestFrame<ApiVersionsRequestData>) msg;
566-
if (dp.isAuthenticationOffloadEnabled()) {
567-
toApiVersions(apiVersionsFactory.apply(apiVersionsFrame), apiVersionsFrame);
568-
}
569-
else {
570-
toSelectingServer(selectingServerFactory.apply(apiVersionsFrame));
571-
}
560+
toSelectingServer(selectingServerFactory.apply(apiVersionsFrame));
572561
return true;
573562
}
574563
else if (msg instanceof RequestFrame) {
@@ -578,13 +567,13 @@ else if (msg instanceof RequestFrame) {
578567
return false;
579568
}
580569

581-
private boolean onClientRequestInClientActiveState(SaslDecodePredicate dp, Object msg, ProxyChannelState.ClientActive clientActive) {
570+
private boolean onClientRequestInClientActiveState(Object msg, ProxyChannelState.ClientActive clientActive) {
582571
if (msg instanceof HAProxyMessage haProxyMessage) {
583572
toHaProxy(clientActive.toHaProxy(haProxyMessage));
584573
return true;
585574
}
586575
else {
587-
return transitionClientRequest(dp, msg, clientActive::toApiVersions, clientActive::toSelectingServer);
576+
return transitionClientRequest(msg, clientActive::toApiVersions, clientActive::toSelectingServer);
588577
}
589578
}
590579

0 commit comments

Comments
 (0)