feat(functions): add proper streaming response support to HTTPClient#903
feat(functions): add proper streaming response support to HTTPClient#903
Conversation
This change integrates streaming responses into the HTTPClient infrastructure, allowing streaming requests to go through the interceptor chain for auth headers, logging, and other middleware. Changes: - Add sendStreaming method to HTTPClientType protocol - Add HTTPResponse.Stream type for streaming response metadata + body - Add interceptRequest and onStreamingResponseComplete to HTTPClientInterceptor - Implement StreamingResponseDelegate for URLSession delegate handling - Update LoggerInterceptor to support streaming request logging - Migrate _invokeWithStreamedResponse to use HTTPClient.sendStreaming - Remove duplicate StreamResponseDelegate from FunctionsClient This fixes the issue where streaming requests bypassed the HTTPClient abstraction and missed auth token synchronization, logging, and other interceptor functionality. Linear: SDK-663 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
📝 WalkthroughWalkthroughThis PR introduces streaming HTTP response support to the HTTP client infrastructure. A new Changes
Sequence DiagramsequenceDiagram
participant Client
participant HTTPClient
participant Interceptor
participant StreamingDelegate
participant URLSession
Client->>HTTPClient: sendStreaming(request)
HTTPClient->>Interceptor: interceptRequest(request)
Interceptor->>Interceptor: log request
Interceptor-->>HTTPClient: modified request
HTTPClient->>URLSession: dataTask(request)
URLSession->>StreamingDelegate: didReceiveResponse(response)
StreamingDelegate->>StreamingDelegate: create AsyncThrowingStream
StreamingDelegate-->>HTTPClient: HTTPResponse.Stream
HTTPClient-->>Client: returns Stream
URLSession->>StreamingDelegate: didReceiveData(data)
StreamingDelegate->>Client: yield data chunk
URLSession->>StreamingDelegate: didCompleteWithError(error)
StreamingDelegate->>Interceptor: onStreamingResponseComplete(error)
Interceptor->>Interceptor: log completion
StreamingDelegate->>Client: stream finish
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@Sources/Helpers/HTTP/HTTPClient.swift`:
- Around line 170-230: Both continuation.onTermination and
urlSession(_:task:didCompleteWithError:) currently call interceptors causing
duplicate completion callbacks; change the logic so only
urlSession(_:task:didCompleteWithError:) notifies interceptors via
onStreamingResponseComplete(capturedRequest, error:), and make
continuation.onTermination handle only cancellation (no interceptor calls).
Specifically, remove the interceptor loop in the continuation.onTermination
closure (the capturedInterceptors loop) and ensure
urlSession(_:task:didCompleteWithError:) uses the same
capturedRequest/interceptors set and the hasResumedResponseContinuation guard so
interceptors are invoked exactly once with the correct error (or nil on
success).
- Around line 55-84: The sendStreaming function currently leaves the URLSession
data task running if the awaiting task is cancelled before the initial response
arrives; add cancellation bridging so the URLSession task is cancelled and the
response continuation is resumed with a cancellation error when the awaiting
Task is cancelled. Concretely: inside the withCheckedThrowingContinuation block
in sendStreaming, after creating the URLSession/dataTask and calling
delegate.setTask(task) (and before task.resume()), register a cancellation
handler (e.g. via withTaskCancellationHandler or by observing
Task.isCancelled/Task.cancelled and resuming the continuation) that calls
task.cancel() and resumes the continuation with a CancellationError if it hasn’t
been resumed yet; keep using the existing StreamingResponseDelegate and its
continuation handling to avoid double-resume. Ensure you reference the local
task, continuation, and delegate to implement the bridge.
| package func sendStreaming(_ request: HTTPRequest) async throws -> HTTPResponse.Stream { | ||
| // Apply request-phase interceptors (modify headers, log request start, etc.) | ||
| var modifiedRequest = request | ||
| for interceptor in interceptors { | ||
| modifiedRequest = try await interceptor.interceptRequest(modifiedRequest) | ||
| } | ||
|
|
||
| let urlRequest = modifiedRequest.urlRequest | ||
| let capturedInterceptors = interceptors | ||
| let capturedSessionConfiguration = sessionConfiguration | ||
|
|
||
| return try await withCheckedThrowingContinuation { | ||
| (continuation: CheckedContinuation<HTTPResponse.Stream, any Error>) in | ||
| let delegate = StreamingResponseDelegate( | ||
| request: modifiedRequest, | ||
| interceptors: capturedInterceptors, | ||
| continuation: continuation | ||
| ) | ||
|
|
||
| let session = URLSession( | ||
| configuration: capturedSessionConfiguration, | ||
| delegate: delegate, | ||
| delegateQueue: nil | ||
| ) | ||
|
|
||
| let task = session.dataTask(with: urlRequest) | ||
| delegate.setTask(task) | ||
| task.resume() | ||
| } | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, let's explore the repository structure and locate the file
find . -name "HTTPClient.swift" -type fRepository: supabase/supabase-swift
Length of output: 106
🏁 Script executed:
# Get the content of the HTTPClient.swift file, specifically around lines 55-84
cat -n Sources/Helpers/HTTP/HTTPClient.swift | sed -n '45,100p'Repository: supabase/supabase-swift
Length of output: 2410
🏁 Script executed:
# Search for StreamingResponseDelegate to understand how it handles cancellation
rg "class StreamingResponseDelegate" -A 30Repository: supabase/supabase-swift
Length of output: 2229
🏁 Script executed:
# Search for any existing cancellation handling in the file
rg "withTaskCancellationHandler|onCancel|cancel\(\)" Sources/Helpers/HTTP/HTTPClient.swiftRepository: supabase/supabase-swift
Length of output: 87
🏁 Script executed:
# Check if there's any task reference or cancellation mechanism in URLSession handling
rg "task\?" Sources/Helpers/HTTP/ -A 3 -B 3Repository: supabase/supabase-swift
Length of output: 485
🏁 Script executed:
# Get the full StreamingResponseDelegate class implementation
rg "final class StreamingResponseDelegate" -A 150 Sources/Helpers/HTTP/HTTPClient.swift | head -200Repository: supabase/supabase-swift
Length of output: 3498
🏁 Script executed:
# Look for deinit or any cleanup in the StreamingResponseDelegate
rg "deinit|onCancel" Sources/Helpers/HTTP/HTTPClient.swift -B 2 -A 5Repository: supabase/supabase-swift
Length of output: 49
🏁 Script executed:
# Check if there are any tests for cancellation handling in this code
rg "sendStreaming" -B 3 -A 10 tests/ --type swiftRepository: supabase/supabase-swift
Length of output: 113
🏁 Script executed:
# Verify the proposed fix syntax is correct for Swift concurrency
rg "withTaskCancellationHandler" . --type swiftRepository: supabase/supabase-swift
Length of output: 361
🏁 Script executed:
# Check the existing usage of withTaskCancellationHandler to see the pattern
rg "withTaskCancellationHandler" -A 5 Sources/Helpers/URLSession+AsyncAwait.swift | head -50Repository: supabase/supabase-swift
Length of output: 888
🏁 Script executed:
# Let's verify the proposed fix logic once more by checking if there are any similar patterns
rg "var task.*?\n.*withTaskCancellationHandler" -A 10 Sources/ --type swiftRepository: supabase/supabase-swift
Length of output: 258
🏁 Script executed:
# Search for the pattern more carefully
rg -U "var task.*\n.*withTaskCancellationHandler" -A 10 Sources/ --type swiftRepository: supabase/supabase-swift
Length of output: 49
🏁 Script executed:
# Also check the full context in URLSession+AsyncAwait.swift to understand the pattern
cat -n Sources/Helpers/URLSession+AsyncAwait.swift | sed -n '1,100p'Repository: supabase/supabase-swift
Length of output: 3994
🏁 Script executed:
# Get the onCancel handler implementation
cat -n Sources/Helpers/URLSession+AsyncAwait.swift | sed -n '100,110p'Repository: supabase/supabase-swift
Length of output: 455
🏁 Script executed:
# Check if there's any documentation or comments about why this pattern is used
cat -n Sources/Helpers/URLSession+AsyncAwait.swift | sed -n '12,20p'Repository: supabase/supabase-swift
Length of output: 761
Add cancellation bridging for the URLSession task during initial response wait.
If the awaiting task is cancelled before the initial streaming response arrives, the URLSession task will continue running since the responseContinuation has no cancellation handler. Only the data stream continuation (set after response) has termination handling. A task cancellation handler bridges the gap.
🛠️ Add cancellation bridging for the URLSession task
package func sendStreaming(_ request: HTTPRequest) async throws -> HTTPResponse.Stream {
// Apply request-phase interceptors (modify headers, log request start, etc.)
var modifiedRequest = request
for interceptor in interceptors {
modifiedRequest = try await interceptor.interceptRequest(modifiedRequest)
}
let urlRequest = modifiedRequest.urlRequest
let capturedInterceptors = interceptors
let capturedSessionConfiguration = sessionConfiguration
- return try await withCheckedThrowingContinuation {
- (continuation: CheckedContinuation<HTTPResponse.Stream, any Error>) in
+ var task: URLSessionTask?
+ return try await withTaskCancellationHandler {
+ try await withCheckedThrowingContinuation {
+ (continuation: CheckedContinuation<HTTPResponse.Stream, any Error>) in
let delegate = StreamingResponseDelegate(
request: modifiedRequest,
interceptors: capturedInterceptors,
continuation: continuation
)
let session = URLSession(
configuration: capturedSessionConfiguration,
delegate: delegate,
delegateQueue: nil
)
- let task = session.dataTask(with: urlRequest)
- delegate.setTask(task)
- task.resume()
- }
+ let localTask = session.dataTask(with: urlRequest)
+ task = localTask
+ delegate.setTask(localTask)
+ localTask.resume()
+ }
+ } onCancel: {
+ task?.cancel()
}
}🤖 Prompt for AI Agents
In `@Sources/Helpers/HTTP/HTTPClient.swift` around lines 55 - 84, The
sendStreaming function currently leaves the URLSession data task running if the
awaiting task is cancelled before the initial response arrives; add cancellation
bridging so the URLSession task is cancelled and the response continuation is
resumed with a cancellation error when the awaiting Task is cancelled.
Concretely: inside the withCheckedThrowingContinuation block in sendStreaming,
after creating the URLSession/dataTask and calling delegate.setTask(task) (and
before task.resume()), register a cancellation handler (e.g. via
withTaskCancellationHandler or by observing Task.isCancelled/Task.cancelled and
resuming the continuation) that calls task.cancel() and resumes the continuation
with a CancellationError if it hasn’t been resumed yet; keep using the existing
StreamingResponseDelegate and its continuation handling to avoid double-resume.
Ensure you reference the local task, continuation, and delegate to implement the
bridge.
| continuation.onTermination = { [weak self] _ in | ||
| guard let self else { return } | ||
| self.lock.lock() | ||
| let task = self.task | ||
| self.lock.unlock() | ||
| task?.cancel() | ||
|
|
||
| // Notify interceptors of completion | ||
| Task { | ||
| for interceptor in capturedInterceptors { | ||
| await interceptor.onStreamingResponseComplete(capturedRequest, error: nil) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let streamResponse = HTTPResponse.Stream( | ||
| statusCode: httpResponse.statusCode, | ||
| headers: HTTPFields(httpResponse.allHeaderFields as? [String: String] ?? [:]), | ||
| underlyingResponse: httpResponse, | ||
| body: stream | ||
| ) | ||
|
|
||
| responseContinuation.resume(returning: streamResponse) | ||
| completionHandler(.allow) | ||
| } | ||
|
|
||
| func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) { | ||
| lock.lock() | ||
| let continuation = dataContinuation | ||
| lock.unlock() | ||
| continuation?.yield(data) | ||
| } | ||
|
|
||
| func urlSession( | ||
| _ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)? | ||
| ) { | ||
| lock.lock() | ||
| let continuation = dataContinuation | ||
| let alreadyResumed = hasResumedResponseContinuation | ||
| lock.unlock() | ||
|
|
||
| if let error { | ||
| if !alreadyResumed { | ||
| lock.lock() | ||
| hasResumedResponseContinuation = true | ||
| lock.unlock() | ||
| responseContinuation.resume(throwing: error) | ||
| } else { | ||
| continuation?.finish(throwing: error) | ||
| } | ||
| } else { | ||
| continuation?.finish() | ||
| } | ||
|
|
||
| // Notify interceptors of completion | ||
| let capturedRequest = request | ||
| Task { | ||
| for interceptor in interceptors { | ||
| await interceptor.onStreamingResponseComplete(capturedRequest, error: error) | ||
| } | ||
| } |
There was a problem hiding this comment.
Avoid duplicate streaming completion callbacks.
onTermination and urlSession(_:didCompleteWithError:) both notify interceptors, and the termination path always passes error: nil, so errors can be double-logged or misreported. Consider notifying interceptors only once (e.g., in didCompleteWithError) and keep termination for cancellation only.
🧹 Remove the duplicate completion notification
continuation.onTermination = { [weak self] _ in
guard let self else { return }
self.lock.lock()
let task = self.task
self.lock.unlock()
task?.cancel()
-
- // Notify interceptors of completion
- Task {
- for interceptor in capturedInterceptors {
- await interceptor.onStreamingResponseComplete(capturedRequest, error: nil)
- }
- }
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| continuation.onTermination = { [weak self] _ in | |
| guard let self else { return } | |
| self.lock.lock() | |
| let task = self.task | |
| self.lock.unlock() | |
| task?.cancel() | |
| // Notify interceptors of completion | |
| Task { | |
| for interceptor in capturedInterceptors { | |
| await interceptor.onStreamingResponseComplete(capturedRequest, error: nil) | |
| } | |
| } | |
| } | |
| let streamResponse = HTTPResponse.Stream( | |
| statusCode: httpResponse.statusCode, | |
| headers: HTTPFields(httpResponse.allHeaderFields as? [String: String] ?? [:]), | |
| underlyingResponse: httpResponse, | |
| body: stream | |
| ) | |
| responseContinuation.resume(returning: streamResponse) | |
| completionHandler(.allow) | |
| } | |
| func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) { | |
| lock.lock() | |
| let continuation = dataContinuation | |
| lock.unlock() | |
| continuation?.yield(data) | |
| } | |
| func urlSession( | |
| _ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)? | |
| ) { | |
| lock.lock() | |
| let continuation = dataContinuation | |
| let alreadyResumed = hasResumedResponseContinuation | |
| lock.unlock() | |
| if let error { | |
| if !alreadyResumed { | |
| lock.lock() | |
| hasResumedResponseContinuation = true | |
| lock.unlock() | |
| responseContinuation.resume(throwing: error) | |
| } else { | |
| continuation?.finish(throwing: error) | |
| } | |
| } else { | |
| continuation?.finish() | |
| } | |
| // Notify interceptors of completion | |
| let capturedRequest = request | |
| Task { | |
| for interceptor in interceptors { | |
| await interceptor.onStreamingResponseComplete(capturedRequest, error: error) | |
| } | |
| } | |
| continuation.onTermination = { [weak self] _ in | |
| guard let self else { return } | |
| self.lock.lock() | |
| let task = self.task | |
| self.lock.unlock() | |
| task?.cancel() | |
| } | |
| let streamResponse = HTTPResponse.Stream( | |
| statusCode: httpResponse.statusCode, | |
| headers: HTTPFields(httpResponse.allHeaderFields as? [String: String] ?? [:]), | |
| underlyingResponse: httpResponse, | |
| body: stream | |
| ) | |
| responseContinuation.resume(returning: streamResponse) | |
| completionHandler(.allow) | |
| } | |
| func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) { | |
| lock.lock() | |
| let continuation = dataContinuation | |
| lock.unlock() | |
| continuation?.yield(data) | |
| } | |
| func urlSession( | |
| _ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)? | |
| ) { | |
| lock.lock() | |
| let continuation = dataContinuation | |
| let alreadyResumed = hasResumedResponseContinuation | |
| lock.unlock() | |
| if let error { | |
| if !alreadyResumed { | |
| lock.lock() | |
| hasResumedResponseContinuation = true | |
| lock.unlock() | |
| responseContinuation.resume(throwing: error) | |
| } else { | |
| continuation?.finish(throwing: error) | |
| } | |
| } else { | |
| continuation?.finish() | |
| } | |
| // Notify interceptors of completion | |
| let capturedRequest = request | |
| Task { | |
| for interceptor in interceptors { | |
| await interceptor.onStreamingResponseComplete(capturedRequest, error: error) | |
| } | |
| } |
🤖 Prompt for AI Agents
In `@Sources/Helpers/HTTP/HTTPClient.swift` around lines 170 - 230, Both
continuation.onTermination and urlSession(_:task:didCompleteWithError:)
currently call interceptors causing duplicate completion callbacks; change the
logic so only urlSession(_:task:didCompleteWithError:) notifies interceptors via
onStreamingResponseComplete(capturedRequest, error:), and make
continuation.onTermination handle only cancellation (no interceptor calls).
Specifically, remove the interceptor loop in the continuation.onTermination
closure (the capturedInterceptors loop) and ensure
urlSession(_:task:didCompleteWithError:) uses the same
capturedRequest/interceptors set and the hasResumedResponseContinuation guard so
interceptors are invoked exactly once with the correct error (or nil on
success).
Pull Request Test Coverage Report for Build 21674187555Details
💛 - Coveralls |
Summary
This PR integrates streaming responses into the HTTPClient infrastructure, allowing streaming requests to go through the interceptor chain for auth headers, logging, and other middleware.
Problem: The
_invokeWithStreamedResponsemethod was creating its own URLSession, bypassing the HTTPClient abstraction. This meant streaming requests missed:Solution: Added
sendStreamingmethod to HTTPClient that:HTTPResponse.Streamwith metadata + async body streamChanges
sendStreamingmethod andStreamingResponseDelegateHTTPResponse.Streamtype for streaming responsesinterceptRequestandonStreamingResponseCompletemethods_invokeWithStreamedResponseto useHTTPClient.sendStreamingsendStreamingto mock HTTPClient typesRoot Cause
The original streaming implementation at
FunctionsClient.swift:221-244created a separate URLSession:This bypassed the HTTPClient abstraction and its interceptor chain, causing auth tokens to not be automatically synchronized on streaming requests.
Testing
Unit Tests
Manual Testing
Risk Assessment
_invokeWithStreamedResponseAPI unchangedAcceptance Criteria
_invokeWithStreamedResponseuses HTTPClient instead of direct URLSessionsetAuth(token:)workaroundLinear Issue
Closes: SDK-663
🤖 Generated with Claude Code
Summary by CodeRabbit