Skip to content

Commit 0e52c07

Browse files
authored
Merge pull request kroxylicious#2094 from callaertanthony/vc-tag-metrics
add virtualcluster tag to kroxylicious_payload_size_bytes metrics
2 parents 90ccdef + da6788d commit 0e52c07

File tree

5 files changed

+22
-16
lines changed

5 files changed

+22
-16
lines changed

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/KafkaProxyFrontendHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,8 @@ void inConnecting(
478478
pipeline.addFirst("frameLogger", new LoggingHandler("io.kroxylicious.proxy.internal.UpstreamFrameLogger"));
479479
}
480480
addFiltersToPipeline(filters, pipeline, inboundChannel);
481-
pipeline.addFirst("responseDecoder", new KafkaResponseDecoder(correlationManager, virtualClusterModel.socketFrameMaxSizeBytes()));
481+
pipeline.addFirst("responseDecoder", new KafkaResponseDecoder(correlationManager, virtualClusterModel.socketFrameMaxSizeBytes(),
482+
virtualClusterModel.getClusterName()));
482483
pipeline.addFirst("requestEncoder", new KafkaRequestEncoder(correlationManager));
483484
if (logNetwork) {
484485
pipeline.addFirst("networkLogger", new LoggingHandler("io.kroxylicious.proxy.internal.UpstreamNetworkLogger"));

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/codec/KafkaRequestDecoder.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,17 @@ public class KafkaRequestDecoder extends KafkaMessageDecoder {
3737
private final DecodePredicate decodePredicate;
3838

3939
private final ApiVersionsServiceImpl apiVersionsService;
40-
private final Counter inbodundMessageCounter;
40+
private final Counter inboundMessageCounter;
4141
private final Counter decodedMessagesCounter;
42+
private final String clusterName;
4243

4344
public KafkaRequestDecoder(DecodePredicate decodePredicate, int socketFrameMaxSize, ApiVersionsServiceImpl apiVersionsService, String clusterName) {
4445
super(socketFrameMaxSize);
4546
this.decodePredicate = decodePredicate;
4647
this.apiVersionsService = apiVersionsService;
48+
this.clusterName = clusterName;
4749
List<Tag> tags = Metrics.tags(Metrics.FLOWING_TAG, Metrics.DOWNSTREAM, Metrics.VIRTUAL_CLUSTER_TAG, clusterName);
48-
inbodundMessageCounter = Metrics.taggedCounter(Metrics.KROXYLICIOUS_INBOUND_DOWNSTREAM_MESSAGES, tags);
50+
inboundMessageCounter = Metrics.taggedCounter(Metrics.KROXYLICIOUS_INBOUND_DOWNSTREAM_MESSAGES, tags);
4951
decodedMessagesCounter = Metrics.taggedCounter(Metrics.KROXYLICIOUS_INBOUND_DOWNSTREAM_DECODED_MESSAGES, tags);
5052
}
5153

@@ -73,15 +75,15 @@ protected Frame decodeHeaderAndBody(ChannelHandlerContext ctx, ByteBuf in, final
7375
LOGGER.debug("{}: {} downstream correlation id: {}", ctx, apiKey, correlationId);
7476
RequestHeaderData header = null;
7577
final ByteBufAccessorImpl accessor;
76-
inbodundMessageCounter.increment();
78+
inboundMessageCounter.increment();
7779
var decodeRequest = decodePredicate.shouldDecodeRequest(apiKey, apiVersion);
7880
LOGGER.debug("Decode {}/v{} request? {}, Predicate {} ", apiKey, apiVersion, decodeRequest, decodePredicate);
7981
boolean decodeResponse = decodePredicate.shouldDecodeResponse(apiKey, apiVersion);
8082
LOGGER.debug("Decode {}/v{} response? {}, Predicate {}", apiKey, apiVersion, decodeResponse, decodePredicate);
8183
short headerVersion = apiKey.requestHeaderVersion(apiVersion);
8284
if (decodeRequest) {
8385
decodedMessagesCounter.increment();
84-
Metrics.payloadSizeBytesUpstreamSummary(apiKey, apiVersion).record(length);
86+
Metrics.payloadSizeBytesUpstreamSummary(apiKey, apiVersion, clusterName).record(length);
8587
if (log().isTraceEnabled()) { // avoid boxing
8688
log().trace("{}: headerVersion {}", ctx, headerVersion);
8789
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/codec/KafkaResponseDecoder.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ public class KafkaResponseDecoder extends KafkaMessageDecoder {
2828
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaResponseDecoder.class);
2929

3030
private final CorrelationManager correlationManager;
31+
private final String clusterName;
3132

32-
public KafkaResponseDecoder(CorrelationManager correlationManager, int socketRequestMaxSizeBytes) {
33+
public KafkaResponseDecoder(CorrelationManager correlationManager, int socketRequestMaxSizeBytes, String clusterName) {
3334
super(socketRequestMaxSizeBytes);
3435
this.correlationManager = correlationManager;
36+
this.clusterName = clusterName;
3537
}
3638

3739
@Override
@@ -70,7 +72,7 @@ else if (LOGGER.isDebugEnabled()) {
7072
ApiMessage body = BodyDecoder.decodeResponse(apiKey, apiVersion, accessor);
7173
log().trace("{}: Body: {}", ctx, body);
7274
Filter recipient = correlation.recipient();
73-
Metrics.payloadSizeBytesDownstreamSummary(apiKey, apiVersion).record(length);
75+
Metrics.payloadSizeBytesDownstreamSummary(apiKey, apiVersion, clusterName).record(length);
7476
if (recipient == null) {
7577
frame = new DecodedResponseFrame<>(apiVersion, correlationId, header, body);
7678
}
@@ -93,4 +95,4 @@ private ResponseHeaderData readHeader(short headerVersion, Readable accessor) {
9395
return new ResponseHeaderData(accessor, headerVersion);
9496
}
9597

96-
}
98+
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/util/Metrics.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,19 +52,20 @@ public static Counter taggedCounter(String counterName, List<Tag> tags) {
5252
return counter(counterName, tags);
5353
}
5454

55-
public static DistributionSummary payloadSizeBytesUpstreamSummary(ApiKeys apiKey, short apiVersion) {
56-
return payloadSizeBytesSummary(apiKey, apiVersion, UPSTREAM);
55+
public static DistributionSummary payloadSizeBytesUpstreamSummary(ApiKeys apiKey, short apiVersion, String virtualCluster) {
56+
return payloadSizeBytesSummary(apiKey, apiVersion, UPSTREAM, virtualCluster);
5757
}
5858

59-
public static DistributionSummary payloadSizeBytesDownstreamSummary(ApiKeys apiKey, short apiVersion) {
60-
return payloadSizeBytesSummary(apiKey, apiVersion, DOWNSTREAM);
59+
public static DistributionSummary payloadSizeBytesDownstreamSummary(ApiKeys apiKey, short apiVersion, String virtualCluster) {
60+
return payloadSizeBytesSummary(apiKey, apiVersion, DOWNSTREAM, virtualCluster);
6161
}
6262

63-
private static DistributionSummary payloadSizeBytesSummary(ApiKeys apiKey, short apiVersion, String flowing) {
63+
private static DistributionSummary payloadSizeBytesSummary(ApiKeys apiKey, short apiVersion, String flowing, String virtualCluster) {
6464
List<Tag> tags = tags(
6565
"ApiKey", apiKey.name(),
6666
"ApiVersion", String.valueOf(apiVersion),
67-
FLOWING_TAG, flowing);
67+
FLOWING_TAG, flowing,
68+
VIRTUAL_CLUSTER_TAG, virtualCluster);
6869
return summary(KROXYLICIOUS_PAYLOAD_SIZE_BYTES, tags);
6970
}
7071

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/codec/ResponseDecoderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ void testApiVersionsExactlyOneFrame_opaque(short apiVersion) throws Exception {
7474

7575
@NonNull
7676
private static KafkaResponseDecoder createResponseDecoder(CorrelationManager mgr, int socketFrameMaxSizeBytes) {
77-
return new KafkaResponseDecoder(mgr, socketFrameMaxSizeBytes);
77+
return new KafkaResponseDecoder(mgr, socketFrameMaxSizeBytes, "vc");
7878
}
7979

8080
@Test
@@ -151,4 +151,4 @@ void supportsFallbackToApiResponseV0() {
151151
.extracting("body")
152152
.isEqualTo(new ApiVersionsResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code()));
153153
}
154-
}
154+
}

0 commit comments

Comments
 (0)