Skip to content

Commit 9317209

Browse files
authored
fix(network): update metrics defore disconnection (#2080)
Signed-off-by: Ning Yu <[email protected]>
1 parent f1a050c commit 9317209

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

core/src/main/scala/kafka/network/SocketServer.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,7 +1211,7 @@ private[kafka] class Processor(
12111211
if (response == null) {
12121212
throw new IllegalStateException(s"Send for ${send.destinationId} completed, but not in `inflightResponses`")
12131213
}
1214-
1214+
12151215
// Invoke send completion callback, and then update request metrics since there might be some
12161216
// request metrics got updated during callback
12171217
response.onComplete.foreach(onComplete => onComplete(send))
@@ -1264,6 +1264,9 @@ private[kafka] class Processor(
12641264
}.remoteHost
12651265
inflightResponses.entrySet().removeIf(e => {
12661266
val remove = connectionId.equals(e.getValue.request.context.connectionId)
1267+
if (remove) {
1268+
updateRequestMetrics(e.getValue)
1269+
}
12671270
remove
12681271
})
12691272
channelContexts.remove(connectionId)
@@ -1947,4 +1950,4 @@ class ChannelContext(val nextCorrelationId: util.Queue[Int], val responses: util
19471950
muteFlag == 0
19481951
}
19491952
}
1950-
// AutoMQ inject end
1953+
// AutoMQ inject end

0 commit comments

Comments
 (0)