-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Streamable HTTPCallable functions #14290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 40 commits
231d602
a14d964
a92d7c2
10bec1d
53a2aab
758fbed
93b6c8b
a7e8fe8
6d59fcd
51f02b8
7b61076
a95449e
cdc49ee
426b6bc
9cb0a5e
ad31052
6ee9000
1ffe73d
9fcd91e
177aa8e
f4d678b
74557e7
18f748b
4f956fb
4edc0ad
e50f69c
7356cf9
aed47d6
f6c6cff
75a7574
adf7366
9ef7411
4ee820e
fd68f01
f27bf07
1ffa4f0
80f0991
756dc26
f031c1f
0df7f8d
231c7dd
be80d63
eb415c8
e11825a
bddaef5
9f1434c
eb6bbc8
cccdd24
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -471,6 +471,105 @@ enum FunctionsConstants { | |
} | ||
} | ||
|
||
@available(iOS 13, macCatalyst 13, macOS 10.15, tvOS 13, watchOS 7, *) | ||
func stream(at url: URL, | ||
withObject data: Any?, | ||
options: HTTPSCallableOptions?, | ||
timeout: TimeInterval) async throws | ||
-> AsyncThrowingStream<HTTPSCallableResult, Error> { | ||
let context = try await contextProvider.context(options: options) | ||
let fetcher = try makeFetcherForStreamableContent( | ||
url: url, | ||
data: data, | ||
options: options, | ||
timeout: timeout, | ||
context: context | ||
) | ||
|
||
do { | ||
let rawData = try await fetcher.beginFetch() | ||
return try callableResultFromResponseAsync(data: rawData, error: nil) | ||
|
||
} catch { | ||
// This method always throws when `error` is not `nil`, but ideally, | ||
// it should be refactored so it looks less confusing. | ||
return try callableResultFromResponseAsync(data: nil, error: error) | ||
} | ||
} | ||
|
||
@available(iOS 13.0, *) | ||
func callableResultFromResponseAsync(data: Data?, | ||
error: Error?) throws -> AsyncThrowingStream< | ||
HTTPSCallableResult, Error | ||
|
||
> { | ||
let processedData = | ||
try processResponseDataForStreamableContent( | ||
from: data, | ||
error: error | ||
) | ||
|
||
return processedData | ||
} | ||
|
||
private func makeFetcherForStreamableContent(url: URL, | ||
data: Any?, | ||
options: HTTPSCallableOptions?, | ||
timeout: TimeInterval, | ||
context: FunctionsContext) throws | ||
-> GTMSessionFetcher { | ||
let request = URLRequest( | ||
url: url, | ||
cachePolicy: .useProtocolCachePolicy, | ||
timeoutInterval: timeout | ||
) | ||
let fetcher = fetcherService.fetcher(with: request) | ||
|
||
let data = data ?? NSNull() | ||
let encoded = try serializer.encode(data) | ||
let body = ["data": encoded] | ||
let payload = try JSONSerialization.data(withJSONObject: body, options: [.fragmentsAllowed]) | ||
fetcher.bodyData = payload | ||
|
||
// Set the headers for starting a streaming session. | ||
fetcher.setRequestValue("application/json", forHTTPHeaderField: "Content-Type") | ||
fetcher.setRequestValue("text/event-stream", forHTTPHeaderField: "Accept") | ||
fetcher.request?.httpMethod = "POST" | ||
if let authToken = context.authToken { | ||
let value = "Bearer \(authToken)" | ||
fetcher.setRequestValue(value, forHTTPHeaderField: "Authorization") | ||
} | ||
|
||
if let fcmToken = context.fcmToken { | ||
fetcher.setRequestValue(fcmToken, forHTTPHeaderField: Constants.fcmTokenHeader) | ||
} | ||
|
||
if options?.requireLimitedUseAppCheckTokens == true { | ||
if let appCheckToken = context.limitedUseAppCheckToken { | ||
fetcher.setRequestValue( | ||
appCheckToken, | ||
forHTTPHeaderField: Constants.appCheckTokenHeader | ||
) | ||
} | ||
} else if let appCheckToken = context.appCheckToken { | ||
fetcher.setRequestValue( | ||
appCheckToken, | ||
forHTTPHeaderField: Constants.appCheckTokenHeader | ||
) | ||
} | ||
// Remove after genStream is updated on the emulator or deployed | ||
#if DEBUG | ||
fetcher.allowLocalhostRequest = true | ||
fetcher.allowedInsecureSchemes = ["http"] | ||
#endif | ||
// Override normal security rules if this is a local test. | ||
if emulatorOrigin != nil { | ||
fetcher.allowLocalhostRequest = true | ||
fetcher.allowedInsecureSchemes = ["http"] | ||
} | ||
|
||
return fetcher | ||
} | ||
|
||
private func makeFetcher(url: URL, | ||
data: Any?, | ||
options: HTTPSCallableOptions?, | ||
|
@@ -556,6 +655,58 @@ enum FunctionsConstants { | |
return data | ||
} | ||
|
||
@available(iOS 13, macCatalyst 13, macOS 10.15, tvOS 13, watchOS 7, *) | ||
private func processResponseDataForStreamableContent(from data: Data?, | ||
error: Error?) throws | ||
-> AsyncThrowingStream< | ||
HTTPSCallableResult, | ||
Error | ||
> { | ||
return AsyncThrowingStream { continuation in | ||
Task { | ||
var resultArray = [String]() | ||
do { | ||
if let error = error { | ||
throw error | ||
} | ||
|
||
guard let data = data else { | ||
throw NSError(domain: FunctionsErrorDomain.description, code: -1, userInfo: nil) | ||
} | ||
|
||
if let dataChunk = String(data: data, encoding: .utf8) { | ||
// We remove the "data :" field so it can be safely parsed to Json. | ||
let dataChunkToJson = dataChunk.split(separator: "\n").map { | ||
String($0.dropFirst(6)) | ||
} | ||
eBlender marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
resultArray.append(contentsOf: dataChunkToJson) | ||
} else { | ||
throw NSError(domain: FunctionsErrorDomain.description, code: -1, userInfo: nil) | ||
} | ||
|
||
for dataChunk in resultArray { | ||
let json = try callableResult( | ||
fromResponseData: dataChunk.data( | ||
using: .utf8, | ||
allowLossyConversion: true | ||
) ?? Data() | ||
) | ||
continuation.yield(HTTPSCallableResult(data: json.data)) | ||
} | ||
|
||
continuation.onTermination = { @Sendable _ in | ||
// Callback for cancelling the stream | ||
continuation.finish() | ||
} | ||
// Close the stream once it's done | ||
continuation.finish() | ||
} catch { | ||
continuation.finish(throwing: error) | ||
} | ||
} | ||
} | ||
} | ||
|
||
private func responseDataJSON(from data: Data) throws -> Any { | ||
let responseJSONObject = try JSONSerialization.jsonObject(with: data) | ||
|
||
|
@@ -564,8 +715,10 @@ enum FunctionsConstants { | |
throw FunctionsError(.internal, userInfo: userInfo) | ||
} | ||
|
||
// `result` is checked for backwards compatibility: | ||
guard let dataJSON = responseJSON["data"] ?? responseJSON["result"] else { | ||
// `result` is checked for backwards compatibility, | ||
// `message` is checked for StramableContent: | ||
guard let dataJSON = responseJSON["data"] ?? responseJSON["result"] ?? responseJSON["message"] | ||
else { | ||
let userInfo = [NSLocalizedDescriptionKey: "Response is missing data field."] | ||
throw FunctionsError(.internal, userInfo: userInfo) | ||
} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With #14357 merged, we should be able to get these passing in CI now. This can be done by moving these new tests to the integration test file and use the |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -358,4 +358,73 @@ | |
} | ||
waitForExpectations(timeout: 1.5) | ||
} | ||
|
||
|
||
func testGenerateStreamContent() async throws { | ||
let options = HTTPSCallableOptions(requireLimitedUseAppCheckTokens: true) | ||
|
||
let input: [String: Any] = ["data": "Why is the sky blue"] | ||
let stream = try await functions!.stream( | ||
at: URL(string: "http://127.0.0.1:5001/demo-project/us-central1/genStream")!, | ||
withObject: input, | ||
options: options, | ||
timeout: 4.0 | ||
) | ||
let result = try await response(from: stream) | ||
XCTAssertEqual( | ||
result, | ||
[ | ||
"chunk hello", | ||
"chunk world", | ||
"chunk this", | ||
"chunk is", | ||
"chunk cool", | ||
"hello world this is cool", | ||
|
||
] | ||
) | ||
} | ||
|
||
func testGenerateStreamContentCanceled() async { | ||
let options = HTTPSCallableOptions(requireLimitedUseAppCheckTokens: true) | ||
let input: [String: Any] = ["data": "Why is the sky blue"] | ||
|
||
let task = Task.detached { [self] in | ||
let stream = try await functions!.stream( | ||
at: URL(string: "http://127.0.0.1:5001/demo-project/us-central1/genStream")!, | ||
withObject: input, | ||
options: options, | ||
timeout: 4.0 | ||
) | ||
|
||
let result = try await response(from: stream) | ||
// Since we cancel the call we are expecting an empty array. | ||
XCTAssertEqual( | ||
result, | ||
[] | ||
) | ||
} | ||
// We cancel the task and we expect a null response even if the stream was initiated. | ||
task.cancel() | ||
let respone = await task.result | ||
XCTAssertNotNil(respone) | ||
} | ||
} | ||
|
||
private func response(from stream: AsyncThrowingStream<HTTPSCallableResult, | ||
any Error>) async throws -> [String] { | ||
var response = [String]() | ||
for try await result in stream { | ||
Check failure on line 415 in FirebaseFunctions/Tests/Unit/FunctionsTests.swift
|
||
// First chunk of the stream comes as NSDictionary | ||
if let dataChunk = result.data as? NSDictionary { | ||
for (key, value) in dataChunk { | ||
response.append("\(key) \(value)") | ||
} | ||
} else { | ||
// Last chunk is the concatenated result so we have to parse it as String else will | ||
// fail. | ||
if let dataString = result.data as? String { | ||
response.append(dataString) | ||
} | ||
} | ||
} | ||
return response | ||
} |
Uh oh!
There was an error while loading. Please reload this page.