Skip to content

Commit f3afd20

Browse files
authored
Merge pull request kroxylicious#2017 from SamBarker/network-metrics
Connection count metrics: This PR adds `kroxylicious_downstream_connections` - a count of the successful connections from clients `kroxylicious_downstream_errors` - a count of the exceptions triggered on the downstream/client side of the netty pipeline. `kroxylicious_upstream_connections` - a count of successful connections established by the proxy to an upstream broker. `kroxylicious_upstream_attempts` - a count of the number of times the proxy attempts to connect to an upstream broker. `kroxylicious_upstream_failures` - a count of the number of times the proxy fails in an attempt to connect to an upstream broker. `kroxylicious_upstream_errors` - a count of the exceptions triggered on the upstream/server side of the netty pipeline.
2 parents 5c1a672 + 0ae343f commit f3afd20

File tree

13 files changed

+454
-85
lines changed

13 files changed

+454
-85
lines changed

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/KafkaProxy.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.slf4j.Logger;
2020
import org.slf4j.LoggerFactory;
2121

22+
import io.micrometer.core.instrument.Tag;
2223
import io.netty.bootstrap.ServerBootstrap;
2324
import io.netty.channel.ChannelFutureListener;
2425
import io.netty.channel.ChannelOption;
@@ -153,9 +154,12 @@ public KafkaProxy startup() {
153154
.join();
154155

155156
// Pre-register counters/summaries to avoid creating them on first request and thus skewing the request latency
156-
// TODO add a virtual host tag to metrics
157-
Metrics.inboundDownstreamMessagesCounter();
158-
Metrics.inboundDownstreamDecodedMessagesCounter();
157+
virtualClusterModels.forEach(virtualClusterModel -> {
158+
List<Tag> tags = Metrics.tags(Metrics.FLOWING_TAG, Metrics.DOWNSTREAM, Metrics.VIRTUAL_CLUSTER_TAG, virtualClusterModel.getClusterName());
159+
Metrics.taggedCounter(Metrics.KROXYLICIOUS_INBOUND_DOWNSTREAM_MESSAGES, tags);
160+
Metrics.taggedCounter(Metrics.KROXYLICIOUS_INBOUND_DOWNSTREAM_DECODED_MESSAGES, tags);
161+
});
162+
159163
return this;
160164
}
161165
catch (RuntimeException e) {
@@ -306,4 +310,4 @@ public void close() throws Exception {
306310
}
307311
}
308312

309-
}
313+
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/KafkaProxyFrontendHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,9 @@ public class KafkaProxyFrontendHandler
105105
KafkaProxyFrontendHandler(
106106
@NonNull NetFilter netFilter,
107107
@NonNull SaslDecodePredicate dp,
108-
EndpointGateway endpointGateway) {
109-
this(netFilter, dp, endpointGateway, new ProxyChannelStateMachine());
108+
EndpointGateway endpointGateway,
109+
@NonNull String clusterName) {
110+
this(netFilter, dp, endpointGateway, new ProxyChannelStateMachine(clusterName));
110111
}
111112

112113
@VisibleForTesting
@@ -660,4 +661,4 @@ private ResponseFrame errorResponse(
660661
}
661662
return errorResponse;
662663
}
663-
}
664+
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/KafkaProxyInitializer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ void addHandlers(SocketChannel ch, EndpointBinding binding) {
192192
var dp = new SaslDecodePredicate(!authnHandlers.isEmpty());
193193
// The decoder, this only cares about the filters
194194
// because it needs to know whether to decode requests
195-
KafkaRequestDecoder decoder = new KafkaRequestDecoder(dp, virtualCluster.socketFrameMaxSizeBytes(), apiVersionsService);
195+
KafkaRequestDecoder decoder = new KafkaRequestDecoder(dp, virtualCluster.socketFrameMaxSizeBytes(), apiVersionsService, virtualCluster.getClusterName());
196196
pipeline.addLast("requestDecoder", decoder);
197197
pipeline.addLast("responseEncoder", new KafkaResponseEncoder());
198198
pipeline.addLast("responseOrderer", new ResponseOrderer());
@@ -214,7 +214,7 @@ void addHandlers(SocketChannel ch, EndpointBinding binding) {
214214
endpointReconciler,
215215
new ApiVersionsIntersectFilter(apiVersionsService),
216216
new ApiVersionsDowngradeFilter(apiVersionsService));
217-
var frontendHandler = new KafkaProxyFrontendHandler(netFilter, dp, binding.endpointGateway());
217+
var frontendHandler = new KafkaProxyFrontendHandler(netFilter, dp, binding.endpointGateway(), virtualCluster.getClusterName());
218218

219219
pipeline.addLast("netHandler", frontendHandler);
220220
addLoggingErrorHandler(pipeline);
@@ -298,4 +298,4 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
298298
cause.getClass().getSimpleName(), cause.getMessage());
299299
}
300300
}
301-
}
301+
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/MeterRegistries.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import io.kroxylicious.proxy.micrometer.MicrometerConfigurationHookService;
2828
import io.kroxylicious.proxy.tag.VisibleForTesting;
2929

30+
import edu.umd.cs.findbugs.annotations.NonNull;
31+
3032
public class MeterRegistries implements AutoCloseable {
3133
private final PrometheusMeterRegistry prometheusMeterRegistry;
3234

@@ -44,6 +46,7 @@ public MeterRegistries(PluginFactoryRegistry pfr, List<MicrometerDefinition> mic
4446
private List<MicrometerConfigurationHook> registerHooks(List<MicrometerDefinition> micrometerConfig) {
4547
CompositeMeterRegistry globalRegistry = Metrics.globalRegistry;
4648
preventDifferentTagNameRegistration(globalRegistry);
49+
@SuppressWarnings("unchecked")
4750
var configurationHooks = micrometerConfig.stream()
4851
.map(f -> pfr.pluginFactory(MicrometerConfigurationHookService.class).pluginInstance(f.type()).build(f.config()))
4952
.toList();
@@ -62,8 +65,9 @@ private List<MicrometerConfigurationHook> registerHooks(List<MicrometerDefinitio
6265
@VisibleForTesting
6366
static void preventDifferentTagNameRegistration(CompositeMeterRegistry registry) {
6467
registry.config().meterFilter(new MeterFilter() {
68+
@NonNull
6569
@Override
66-
public MeterFilterReply accept(Meter.Id id) {
70+
public MeterFilterReply accept(@NonNull Meter.Id id) {
6771
boolean allTagsSame = registry.find(id.getName()).meters().stream().allMatch(meter -> tagNames(meter.getId()).equals(tagNames(id)));
6872
if (!allTagsSame) {
6973
logger.error("Attempted to register a meter with id {} which is already registered but with a different set of tag names", id);
@@ -96,4 +100,4 @@ public void close() {
96100
copy.forEach(Metrics.globalRegistry::remove);
97101
Metrics.removeRegistry(prometheusMeterRegistry);
98102
}
99-
}
103+
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/ProxyChannelStateMachine.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.apache.kafka.common.protocol.Errors;
1717
import org.slf4j.Logger;
1818

19+
import io.micrometer.core.instrument.Counter;
20+
import io.micrometer.core.instrument.Tag;
1921
import io.netty.channel.ChannelHandlerContext;
2022
import io.netty.handler.codec.DecoderException;
2123
import io.netty.handler.codec.haproxy.HAProxyMessage;
@@ -27,6 +29,7 @@
2729
import io.kroxylicious.proxy.internal.ProxyChannelState.Closed;
2830
import io.kroxylicious.proxy.internal.ProxyChannelState.Forwarding;
2931
import io.kroxylicious.proxy.internal.codec.FrameOversizedException;
32+
import io.kroxylicious.proxy.internal.util.Metrics;
3033
import io.kroxylicious.proxy.model.VirtualClusterModel;
3134
import io.kroxylicious.proxy.service.HostPort;
3235
import io.kroxylicious.proxy.tag.VisibleForTesting;
@@ -35,6 +38,13 @@
3538
import edu.umd.cs.findbugs.annotations.Nullable;
3639

3740
import static io.kroxylicious.proxy.internal.ProxyChannelState.Startup.STARTING_STATE;
41+
import static io.kroxylicious.proxy.internal.util.Metrics.KROXYLICIOUS_DOWNSTREAM_CONNECTIONS;
42+
import static io.kroxylicious.proxy.internal.util.Metrics.KROXYLICIOUS_DOWNSTREAM_ERRORS;
43+
import static io.kroxylicious.proxy.internal.util.Metrics.KROXYLICIOUS_UPSTREAM_CONNECTIONS;
44+
import static io.kroxylicious.proxy.internal.util.Metrics.KROXYLICIOUS_UPSTREAM_CONNECTION_ATTEMPTS;
45+
import static io.kroxylicious.proxy.internal.util.Metrics.KROXYLICIOUS_UPSTREAM_CONNECTION_FAILURES;
46+
import static io.kroxylicious.proxy.internal.util.Metrics.KROXYLICIOUS_UPSTREAM_ERRORS;
47+
import static io.kroxylicious.proxy.internal.util.Metrics.taggedCounter;
3848
import static org.slf4j.LoggerFactory.getLogger;
3949

4050
/**
@@ -95,6 +105,22 @@
95105
public class ProxyChannelStateMachine {
96106
private static final String DUPLICATE_INITIATE_CONNECT_ERROR = "NetFilter called NetFilterContext.initiateConnect() more than once";
97107
private static final Logger LOGGER = getLogger(ProxyChannelStateMachine.class);
108+
private final Counter downstreamConnectionsCounter;
109+
private final Counter upstreamConnectionsCounter;
110+
private final Counter downstreamErrorCounter;
111+
private final Counter upstreamErrorCounter;
112+
private final Counter connectionAttemptsCounter;
113+
private final Counter upstreamConnectionFailureCounter;
114+
115+
public ProxyChannelStateMachine(String clusterName) {
116+
List<Tag> tags = Metrics.tags(Metrics.VIRTUAL_CLUSTER_TAG, clusterName);
117+
downstreamConnectionsCounter = taggedCounter(KROXYLICIOUS_DOWNSTREAM_CONNECTIONS, tags);
118+
downstreamErrorCounter = taggedCounter(KROXYLICIOUS_DOWNSTREAM_ERRORS, tags);
119+
upstreamConnectionsCounter = taggedCounter(KROXYLICIOUS_UPSTREAM_CONNECTIONS, tags);
120+
connectionAttemptsCounter = taggedCounter(KROXYLICIOUS_UPSTREAM_CONNECTION_ATTEMPTS, tags);
121+
upstreamErrorCounter = taggedCounter(KROXYLICIOUS_UPSTREAM_ERRORS, tags);
122+
upstreamConnectionFailureCounter = taggedCounter(KROXYLICIOUS_UPSTREAM_CONNECTION_FAILURES, tags);
123+
}
98124

99125
/**
100126
* The current state. This can be changed via a call to one of the {@code on*()} methods.
@@ -370,6 +396,10 @@ void onServerException(Throwable cause) {
370396
.setCause(LOGGER.isDebugEnabled() ? cause : null)
371397
.addArgument(cause != null ? cause.getMessage() : "")
372398
.log("Exception from the server channel: {}. Increase log level to DEBUG for stacktrace");
399+
if (state instanceof ProxyChannelState.Connecting) {
400+
upstreamConnectionFailureCounter.increment();
401+
}
402+
upstreamErrorCounter.increment();
373403
toClosed(cause);
374404
}
375405

@@ -395,6 +425,7 @@ void onClientException(Throwable cause, boolean tlsEnabled) {
395425
.log("Exception from the client channel: {}. Increase log level to DEBUG for stacktrace");
396426
errorCodeEx = Errors.UNKNOWN_SERVER_ERROR.exception();
397427
}
428+
downstreamErrorCounter.increment();
398429
toClosed(errorCodeEx);
399430
}
400431

@@ -403,6 +434,7 @@ private void toClientActive(
403434
@NonNull KafkaProxyFrontendHandler frontendHandler) {
404435
setState(clientActive);
405436
frontendHandler.inClientActive();
437+
downstreamConnectionsCounter.increment();
406438
}
407439

408440
private void toConnecting(
@@ -412,11 +444,13 @@ private void toConnecting(
412444
setState(connecting);
413445
backendHandler = new KafkaProxyBackendHandler(this, virtualClusterModel);
414446
frontendHandler.inConnecting(connecting.remote(), filters, backendHandler);
447+
connectionAttemptsCounter.increment();
415448
}
416449

417450
private void toForwarding(Forwarding forwarding) {
418451
setState(forwarding);
419452
Objects.requireNonNull(frontendHandler).inForwarding();
453+
upstreamConnectionsCounter.increment();
420454
}
421455

422456
/**
@@ -510,6 +544,7 @@ private void toSelectingServer(ProxyChannelState.SelectingServer selectingServer
510544
Objects.requireNonNull(frontendHandler).inSelectingServer();
511545
}
512546

547+
@SuppressWarnings("ConstantValue")
513548
private void toClosed(@Nullable Throwable errorCodeEx) {
514549
if (state instanceof Closed) {
515550
return;
@@ -521,7 +556,9 @@ private void toClosed(@Nullable Throwable errorCodeEx) {
521556
}
522557

523558
// Close the client connection with any error code
524-
Objects.requireNonNull(frontendHandler).inClosed(errorCodeEx);
559+
if (frontendHandler != null) { // Can be null if the error happens before clientActive (unlikely but possible)
560+
frontendHandler.inClosed(errorCodeEx);
561+
}
525562
}
526563

527564
private void setState(@NonNull ProxyChannelState state) {
@@ -533,4 +570,4 @@ private static boolean isMessageApiVersionsRequest(Object msg) {
533570
return msg instanceof DecodedRequestFrame
534571
&& ((DecodedRequestFrame<?>) msg).apiKey() == ApiKeys.API_VERSIONS;
535572
}
536-
}
573+
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/codec/KafkaRequestDecoder.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
*/
66
package io.kroxylicious.proxy.internal.codec;
77

8+
import java.util.List;
9+
810
import org.apache.kafka.common.message.ApiVersionsRequestData;
911
import org.apache.kafka.common.message.RequestHeaderData;
1012
import org.apache.kafka.common.protocol.ApiKeys;
@@ -13,6 +15,8 @@
1315
import org.slf4j.Logger;
1416
import org.slf4j.LoggerFactory;
1517

18+
import io.micrometer.core.instrument.Counter;
19+
import io.micrometer.core.instrument.Tag;
1620
import io.netty.buffer.ByteBuf;
1721
import io.netty.channel.ChannelHandlerContext;
1822

@@ -33,11 +37,16 @@ public class KafkaRequestDecoder extends KafkaMessageDecoder {
3337
private final DecodePredicate decodePredicate;
3438

3539
private final ApiVersionsServiceImpl apiVersionsService;
40+
private final Counter inbodundMessageCounter;
41+
private final Counter decodedMessagesCounter;
3642

37-
public KafkaRequestDecoder(DecodePredicate decodePredicate, int socketFrameMaxSize, ApiVersionsServiceImpl apiVersionsService) {
43+
public KafkaRequestDecoder(DecodePredicate decodePredicate, int socketFrameMaxSize, ApiVersionsServiceImpl apiVersionsService, String clusterName) {
3844
super(socketFrameMaxSize);
3945
this.decodePredicate = decodePredicate;
4046
this.apiVersionsService = apiVersionsService;
47+
List<Tag> tags = Metrics.tags(Metrics.FLOWING_TAG, Metrics.DOWNSTREAM, Metrics.VIRTUAL_CLUSTER_TAG, clusterName);
48+
inbodundMessageCounter = Metrics.taggedCounter(Metrics.KROXYLICIOUS_INBOUND_DOWNSTREAM_MESSAGES, tags);
49+
decodedMessagesCounter = Metrics.taggedCounter(Metrics.KROXYLICIOUS_INBOUND_DOWNSTREAM_DECODED_MESSAGES, tags);
4150
}
4251

4352
@Override
@@ -64,14 +73,14 @@ protected Frame decodeHeaderAndBody(ChannelHandlerContext ctx, ByteBuf in, final
6473
LOGGER.debug("{}: {} downstream correlation id: {}", ctx, apiKey, correlationId);
6574
RequestHeaderData header = null;
6675
final ByteBufAccessorImpl accessor;
67-
Metrics.inboundDownstreamMessagesCounter().increment();
76+
inbodundMessageCounter.increment();
6877
var decodeRequest = decodePredicate.shouldDecodeRequest(apiKey, apiVersion);
6978
LOGGER.debug("Decode {}/v{} request? {}, Predicate {} ", apiKey, apiVersion, decodeRequest, decodePredicate);
7079
boolean decodeResponse = decodePredicate.shouldDecodeResponse(apiKey, apiVersion);
7180
LOGGER.debug("Decode {}/v{} response? {}, Predicate {}", apiKey, apiVersion, decodeResponse, decodePredicate);
7281
short headerVersion = apiKey.requestHeaderVersion(apiVersion);
7382
if (decodeRequest) {
74-
Metrics.inboundDownstreamDecodedMessagesCounter().increment();
83+
decodedMessagesCounter.increment();
7584
Metrics.payloadSizeBytesUpstreamSummary(apiKey, apiVersion).record(length);
7685
if (log().isTraceEnabled()) { // avoid boxing
7786
log().trace("{}: headerVersion {}", ctx, headerVersion);
@@ -198,4 +207,4 @@ private RequestHeaderData readHeader(short headerVersion, Readable accessor) {
198207
return new RequestHeaderData(accessor, headerVersion);
199208
}
200209

201-
}
210+
}

0 commit comments

Comments
 (0)