diff --git a/FirebaseFunctions/Sources/Callable+Codable.swift b/FirebaseFunctions/Sources/Callable+Codable.swift index 489433a0a7e..07b1fae6f0d 100644 --- a/FirebaseFunctions/Sources/Callable+Codable.swift +++ b/FirebaseFunctions/Sources/Callable+Codable.swift @@ -159,4 +159,22 @@ public struct Callable { public func callAsFunction(_ data: Request) async throws -> Response { return try await call(data) } + + // TODO: Look into handling parameter-less functions. + @available(iOS 15, *) + public func stream(_ data: Request) async throws -> AsyncThrowingStream { + let encoded = try encoder.encode(data) + return AsyncThrowingStream { continuation in + Task { + do { + for try await result in callable.stream(encoded) { + let response = try decoder.decode(Response.self, from: result.data) + continuation.yield(response) + } + } catch { + continuation.finish(throwing: error) + } + } + } + } } diff --git a/FirebaseFunctions/Sources/Functions.swift b/FirebaseFunctions/Sources/Functions.swift index 51e405b2f39..58251e81c20 100644 --- a/FirebaseFunctions/Sources/Functions.swift +++ b/FirebaseFunctions/Sources/Functions.swift @@ -471,6 +471,169 @@ enum FunctionsConstants { } } + @available(iOS 15, *) + func stream(at url: URL, + withObject data: Any?, + options: HTTPSCallableOptions?, + timeout: TimeInterval) + -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + // TODO: Vertex prints the curl command. Should this? + + Task { + // TODO: This API does not throw. Should the throwing request + // setup be in the stream or one level up? + let urlRequest: URLRequest + do { + let context = try await contextProvider.context(options: options) + urlRequest = try makeRequestForStreamableContent( + url: url, + data: data, + options: options, + timeout: timeout, + context: context + ) + // TODO: Address below commented out code. + // // Override normal security rules if this is a local test. + // var configuration = URLSessionConfiguration.default + // if let emulatorOrigin { + // configuration. + // } + } catch { + continuation.finish(throwing: error) + return + } + + let stream: URLSession.AsyncBytes + let rawResponse: URLResponse + do { + // TODO: Look into injecting URLSession for unit tests. + (stream, rawResponse) = try await URLSession.shared.bytes(for: urlRequest) + } catch { + continuation.finish(throwing: error) + return + } + + // Verify the status code is 200 + guard let response = rawResponse as? HTTPURLResponse else { + continuation.finish( + throwing: FunctionsError( + .internal, + userInfo: [NSLocalizedDescriptionKey: "Response was not an HTTP response."] + ) + ) + return + } + + guard response.statusCode == 200 else { + // TODO: Add test for error case and parse out error. + return + } + + for try await line in stream.lines { + if line.hasPrefix("data:") { + // We can assume 5 characters since it's utf-8 encoded, removing `data:`. + let jsonText = String(line.dropFirst(5)) + let data: Data + do { + data = try jsonData(jsonText: jsonText) + } catch { + continuation.finish(throwing: error) + return + } + + // Handle the content. + do { + let content = try callableResult(fromResponseData: data) + continuation.yield(content) + } catch { + continuation.finish(throwing: error) + return + } + } else { + // TODO: Throw error with unexpected formatted lines. + } + } + + continuation.finish(throwing: nil) + } + } + } + + private func jsonData(jsonText: String) throws -> Data { + guard let data = jsonText.data(using: .utf8) else { + throw DecodingError.dataCorrupted(DecodingError.Context( + codingPath: [], + debugDescription: "Could not parse response as UTF8." + )) + } + return data + } + + @available(iOS 13.0, *) + func callableResultFromResponseAsync(data: Data?, + error: Error?) throws -> AsyncThrowingStream< + HTTPSCallableResult, Error + + > { + let processedData = + try processResponseDataForStreamableContent( + from: data, + error: error + ) + + return processedData + } + + private func makeRequestForStreamableContent(url: URL, + data: Any?, + options: HTTPSCallableOptions?, + timeout: TimeInterval, + context: FunctionsContext) throws + -> URLRequest { + var urlRequest = URLRequest( + url: url, + cachePolicy: .useProtocolCachePolicy, + timeoutInterval: timeout + ) + + let data = data ?? NSNull() + let encoded = try serializer.encode(data) + let body = ["data": encoded] + let payload = try JSONSerialization.data(withJSONObject: body, options: [.fragmentsAllowed]) + urlRequest.httpBody = payload + + // Set the headers for starting a streaming session. + urlRequest.setValue("application/json", forHTTPHeaderField: "Content-Type") + urlRequest.setValue("text/event-stream", forHTTPHeaderField: "Accept") + urlRequest.httpMethod = "POST" + + if let authToken = context.authToken { + let value = "Bearer \(authToken)" + urlRequest.setValue(value, forHTTPHeaderField: "Authorization") + } + + if let fcmToken = context.fcmToken { + urlRequest.setValue(fcmToken, forHTTPHeaderField: Constants.fcmTokenHeader) + } + + if options?.requireLimitedUseAppCheckTokens == true { + if let appCheckToken = context.limitedUseAppCheckToken { + urlRequest.setValue( + appCheckToken, + forHTTPHeaderField: Constants.appCheckTokenHeader + ) + } + } else if let appCheckToken = context.appCheckToken { + urlRequest.setValue( + appCheckToken, + forHTTPHeaderField: Constants.appCheckTokenHeader + ) + } + + return urlRequest + } + private func makeFetcher(url: URL, data: Any?, options: HTTPSCallableOptions?, @@ -556,6 +719,58 @@ enum FunctionsConstants { return data } + @available(iOS 13, macCatalyst 13, macOS 10.15, tvOS 13, watchOS 7, *) + private func processResponseDataForStreamableContent(from data: Data?, + error: Error?) throws + -> AsyncThrowingStream< + HTTPSCallableResult, + Error + > { + return AsyncThrowingStream { continuation in + Task { + var resultArray = [String]() + do { + if let error = error { + throw error + } + + guard let data = data else { + throw NSError(domain: FunctionsErrorDomain.description, code: -1, userInfo: nil) + } + + if let dataChunk = String(data: data, encoding: .utf8) { + // We remove the "data :" field so it can be safely parsed to Json. + let dataChunkToJson = dataChunk.split(separator: "\n").map { + String($0.dropFirst(6)) + } + resultArray.append(contentsOf: dataChunkToJson) + } else { + throw NSError(domain: FunctionsErrorDomain.description, code: -1, userInfo: nil) + } + + for dataChunk in resultArray { + let json = try callableResult( + fromResponseData: dataChunk.data( + using: .utf8, + allowLossyConversion: true + ) ?? Data() + ) + continuation.yield(HTTPSCallableResult(data: json.data)) + } + + continuation.onTermination = { @Sendable _ in + // Callback for cancelling the stream + continuation.finish() + } + // Close the stream once it's done + continuation.finish() + } catch { + continuation.finish(throwing: error) + } + } + } + } + private func responseDataJSON(from data: Data) throws -> Any { let responseJSONObject = try JSONSerialization.jsonObject(with: data) @@ -564,8 +779,10 @@ enum FunctionsConstants { throw FunctionsError(.internal, userInfo: userInfo) } - // `result` is checked for backwards compatibility: - guard let dataJSON = responseJSON["data"] ?? responseJSON["result"] else { + // `result` is checked for backwards compatibility, + // `message` is checked for StramableContent: + guard let dataJSON = responseJSON["data"] ?? responseJSON["result"] ?? responseJSON["message"] + else { let userInfo = [NSLocalizedDescriptionKey: "Response is missing data field."] throw FunctionsError(.internal, userInfo: userInfo) } diff --git a/FirebaseFunctions/Sources/HTTPSCallable.swift b/FirebaseFunctions/Sources/HTTPSCallable.swift index c2281e54866..a15179d4cf9 100644 --- a/FirebaseFunctions/Sources/HTTPSCallable.swift +++ b/FirebaseFunctions/Sources/HTTPSCallable.swift @@ -39,7 +39,7 @@ open class HTTPSCallable: NSObject { // The functions client to use for making calls. private let functions: Functions - private let url: URL + let url: URL private let options: HTTPSCallableOptions? @@ -143,4 +143,9 @@ open class HTTPSCallable: NSObject { try await functions .callFunction(at: url, withObject: data, options: options, timeout: timeoutInterval) } + + @available(iOS 15, *) + func stream(_ data: Any? = nil) -> AsyncThrowingStream { + functions.stream(at: url, withObject: data, options: options, timeout: timeoutInterval) + } } diff --git a/FirebaseFunctions/Tests/Integration/IntegrationTests.swift b/FirebaseFunctions/Tests/Integration/IntegrationTests.swift index 5260bd10b2b..60f15b4247a 100644 --- a/FirebaseFunctions/Tests/Integration/IntegrationTests.swift +++ b/FirebaseFunctions/Tests/Integration/IntegrationTests.swift @@ -866,6 +866,78 @@ class IntegrationTests: XCTestCase { XCTAssertEqual(response, expected) } } + + @available(iOS 15, *) + func testGenerateStreamContent() async throws { + let options = HTTPSCallableOptions(requireLimitedUseAppCheckTokens: true) + + let input: [String: Any] = ["data": "Why is the sky blue"] + + let stream = functions.stream( + at: emulatorURL("genStream"), + withObject: input, + options: options, + timeout: 4.0 + ) + let result = try await response(from: stream) + XCTAssertEqual( + result, + [ + "chunk hello", + "chunk world", + "chunk this", + "chunk is", + "chunk cool", + "hello world this is cool", + ] + ) + } + + @available(iOS 15, *) + func testGenerateStreamContentCanceled() async { + let options = HTTPSCallableOptions(requireLimitedUseAppCheckTokens: true) + let input: [String: Any] = ["data": "Why is the sky blue"] + + let task = Task.detached { [self] in + let stream = functions.stream( + at: emulatorURL("genStream"), + withObject: input, + options: options, + timeout: 4.0 + ) + + let result = try await response(from: stream) + // Since we cancel the call we are expecting an empty array. + XCTAssertEqual( + result, + [] + ) + } + // We cancel the task and we expect a null response even if the stream was initiated. + task.cancel() + let respone = await task.result + XCTAssertNotNil(respone) + } + + private func response(from stream: AsyncThrowingStream) async throws -> [String] { + var response = [String]() + for try await result in stream { + // First chunk of the stream comes as NSDictionary + if let dataChunk = result.data as? NSDictionary { + for (key, value) in dataChunk { + response.append("\(key) \(value)") + } + } else { + // Last chunk is the concatenated result so we have to parse it as String else will + // fail. + if let dataString = result.data as? String { + response.append(dataString) + } + } + } + return response + } } private class AuthTokenProvider: AuthInterop { diff --git a/FirebaseFunctions/Tests/Unit/FunctionsTests.swift b/FirebaseFunctions/Tests/Unit/FunctionsTests.swift index 42e684cdf1a..24141b9157b 100644 --- a/FirebaseFunctions/Tests/Unit/FunctionsTests.swift +++ b/FirebaseFunctions/Tests/Unit/FunctionsTests.swift @@ -22,7 +22,9 @@ import FirebaseCore import GTMSessionFetcherCore #endif -import SharedTestUtilities +#if SWIFT_PACKAGE + import SharedTestUtilities +#endif import XCTest class FunctionsTests: XCTestCase { @@ -358,4 +360,8 @@ class FunctionsTests: XCTestCase { } waitForExpectations(timeout: 1.5) } + + // TODO: Implement unit test variants. + func testGenerateStreamContent() async throws {} + func testGenerateStreamContentCanceled() async {} }