-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Track streaming responses separately to fix SSE blocking flush #5330
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughAdds per-call Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Suggested reviewers
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In
`@ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/cio/NettyHttpResponsePipeline.kt`:
- Around line 183-187: The streaming detection currently uses a case-sensitive
contains check and increments httpHandlerState.streamingResponses without
guaranteeing a decrement on failures; update the Content-Type check to be
case-insensitive (e.g., compare a lower-cased header or use a case-insensitive
match) when setting call.isStreamingResponse, and ensure the streamingResponses
counter is decremented on all failure/exception paths—for example, decrement in
the respondWithFailure path or wrap the response handling around a try/finally
that decrements httpHandlerState.streamingResponses and clears
call.isStreamingResponse when an exception occurs; refer to
call.isStreamingResponse, httpHandlerState.streamingResponses.incrementAndGet(),
and respondWithFailure to locate where to change the logic.
🧹 Nitpick comments (1)
ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationCall.kt (1)
36-37: Consider marking the new internal flag with@InternalAPI.This is an internal API surface on a public class; the guidelines call for
@InternalAPIon internal APIs to make the intent explicit. As per coding guidelines, ...💡 Proposed change
- internal var isStreamingResponse = false + `@InternalAPI` + internal var isStreamingResponse = false
HTTP/2 multiplexes multiple streams on a single connection. When an SSE endpoint is active, it keeps activeRequests > 0 forever, which prevents flushIfNeeded() from flushing responses for other requests on the same connection. This fix: - Tracks streaming responses (SSE) separately with a dedicated counter - Changes flush condition to allow flushing when only streaming requests remain - Properly decrements the streaming counter when SSE connections end
826ca65 to
7cce992
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In
`@ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/cio/NettyHttpResponsePipeline.kt`:
- Around line 183-192: The when-expression that computes contentType from
responseMessage (checking HttpResponse and Http2HeadersFrame via
HttpHeaders.ContentType) is left unclosed and the resulting contentType is
unused, breaking compilation and missing header sources; close the when block,
assign its result to contentType, then use contentType (falling back to
response.headers[HttpHeaders.ContentType] if needed) to detect
"text/event-stream" and, when matched, set call.isStreamingResponse = true and
call httpHandlerState.streamingResponses.incrementAndGet(); ensure you reference
responseMessage, HttpResponse, Http2HeadersFrame, HttpHeaders.ContentType,
call.isStreamingResponse, and
httpHandlerState.streamingResponses.incrementAndGet() in the corrected logic.
♻️ Duplicate comments (1)
ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/cio/NettyHttpResponsePipeline.kt (1)
183-192: Ensure streaming counter is decremented on failure paths.
If an exception is thrown after incrementing,streamingResponsesnever decrements, which can reintroduce flush blocking.🔧 Suggested change (failure path)
private fun respondWithFailure(call: NettyApplicationCall, actualException: Throwable) { val t = when { actualException is IOException && actualException !is ChannelIOException -> ChannelWriteException(exception = actualException) else -> actualException } + + if (call.isStreamingResponse) { + httpHandlerState.streamingResponses.decrementAndGet() + call.isStreamingResponse = false + }
ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/cio/NettyHttpResponsePipeline.kt
Show resolved
Hide resolved
…/NettyHttpResponsePipeline.kt Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
5e9b737 to
ce91764
Compare
HTTP/2 multiplexes multiple streams on a single connection. When an SSE endpoint is active, it keeps activeRequests > 0 forever, which prevents flushIfNeeded() from flushing responses for other requests on the same connection.
This fix: