diff --git a/.github/workflows/functions.yml b/.github/workflows/functions.yml index e89bccc8ff5..4da67e2c200 100644 --- a/.github/workflows/functions.yml +++ b/.github/workflows/functions.yml @@ -31,8 +31,6 @@ jobs: matrix: target: [ios, tvos, macos, watchos] build-env: - - os: macos-14 - xcode: Xcode_15.2 - os: macos-15 xcode: Xcode_16.2 runs-on: ${{ matrix.build-env.os }} @@ -43,14 +41,12 @@ jobs: run: sudo xcode-select -s /Applications/${{ matrix.build-env.xcode }}.app/Contents/Developer - name: Setup Bundler run: scripts/setup_bundler.sh - # The integration tests are flaky on Xcode 15 so only run the unit tests. The integration tests still run with SPM. - # - name: Integration Test Server - # run: FirebaseFunctions/Backend/start.sh synchronous + - name: Integration Test Server + run: FirebaseFunctions/Backend/start.sh synchronous - name: Build and test run: | scripts/third_party/travis/retry.sh scripts/pod_lib_lint.rb FirebaseFunctions.podspec \ - --test-specs=unit --platforms=${{ matrix.target }} - + --platforms=${{ matrix.target }} spm-package-resolved: runs-on: macos-14 @@ -145,6 +141,9 @@ jobs: key: ${{needs.spm-package-resolved.outputs.cache_key}} - name: Xcode run: sudo xcode-select -s /Applications/${{ matrix.xcode }}.app/Contents/Developer + - name: Install visionOS, if needed. + if: matrix.target == 'visionOS' + run: xcodebuild -downloadPlatform visionOS - name: Initialize xcodebuild run: scripts/setup_spm_tests.sh - name: Unit Tests diff --git a/FirebaseFunctions/Backend/index.js b/FirebaseFunctions/Backend/index.js index 3bfd6f31328..bebcc0f421b 100644 --- a/FirebaseFunctions/Backend/index.js +++ b/FirebaseFunctions/Backend/index.js @@ -16,6 +16,14 @@ const assert = require('assert'); const functionsV1 = require('firebase-functions/v1'); const functionsV2 = require('firebase-functions/v2'); +// MARK: - Utilities + +function sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); +}; + +// MARK: - Callable Functions + exports.dataTest = functionsV1.https.onRequest((request, response) => { assert.deepEqual(request.body, { data: { @@ -121,14 +129,10 @@ exports.timeoutTest = functionsV1.https.onRequest((request, response) => { const streamData = ["hello", "world", "this", "is", "cool"] -function sleep(ms) { - return new Promise(resolve => setTimeout(resolve, ms)); -}; - async function* generateText() { for (const chunk of streamData) { yield chunk; - await sleep(1000); + await sleep(100); } }; @@ -136,7 +140,7 @@ exports.genStream = functionsV2.https.onCall( async (request, response) => { if (request.acceptsStreaming) { for await (const chunk of generateText()) { - response.sendChunk({ chunk }); + response.sendChunk(chunk); } } return streamData.join(" "); @@ -145,11 +149,81 @@ exports.genStream = functionsV2.https.onCall( exports.genStreamError = functionsV2.https.onCall( async (request, response) => { + // Note: The functions backend does not pass the error message to the + // client at this time. + throw Error("BOOM") + } +); + +const weatherForecasts = { + Toronto: { conditions: 'snowy', temperature: 25 }, + London: { conditions: 'rainy', temperature: 50 }, + Dubai: { conditions: 'sunny', temperature: 75 } +}; + +async function* generateForecast(locations) { + for (const location of locations) { + yield { 'location': location, ...weatherForecasts[location.name] }; + await sleep(100); + } +}; + +exports.genStreamWeather = functionsV2.https.onCall( + async (request, response) => { + const forecasts = []; if (request.acceptsStreaming) { - for await (const chunk of generateText()) { - response.write({ chunk }); + for await (const chunk of generateForecast(request.data)) { + forecasts.push(chunk) + response.sendChunk(chunk); + } + } + return { forecasts }; + } +); + +exports.genStreamWeatherError = functionsV2.https.onCall( + async (request, response) => { + if (request.acceptsStreaming) { + for await (const chunk of generateForecast(request.data)) { + // Remove the location field, since the SDK cannot decode the message + // if it's there. + delete chunk.location; + response.sendChunk(chunk); + } + } + return "Number of forecasts generated: " + request.data.length; + } +); + +exports.genStreamEmpty = functionsV2.https.onCall( + async (request, response) => { + if (request.acceptsStreaming) { + // Send no chunks + } + // Implicitly return null. + } +); + +exports.genStreamResultOnly = functionsV2.https.onCall( + async (request, response) => { + if (request.acceptsStreaming) { + // Do not send any chunks. + } + return "Only a result"; + } +); + +exports.genStreamLargeData = functionsV2.https.onCall( + async (request, response) => { + if (request.acceptsStreaming) { + const largeString = 'A'.repeat(10000); + const chunkSize = 1024; + for (let i = 0; i < largeString.length; i += chunkSize) { + const chunk = largeString.substring(i, i + chunkSize); + response.sendChunk(chunk); + await sleep(100); } - throw Error("BOOM") } + return "Stream Completed"; } ); diff --git a/FirebaseFunctions/Backend/start.sh b/FirebaseFunctions/Backend/start.sh index 1ee3777cdc8..8afecf1c387 100755 --- a/FirebaseFunctions/Backend/start.sh +++ b/FirebaseFunctions/Backend/start.sh @@ -57,6 +57,11 @@ FUNCTIONS_BIN="./node_modules/.bin/functions" "${FUNCTIONS_BIN}" deploy timeoutTest --trigger-http "${FUNCTIONS_BIN}" deploy genStream --trigger-http "${FUNCTIONS_BIN}" deploy genStreamError --trigger-http +"${FUNCTIONS_BIN}" deploy genStreamWeather --trigger-http +"${FUNCTIONS_BIN}" deploy genStreamWeatherError --trigger-http +"${FUNCTIONS_BIN}" deploy genStreamEmpty --trigger-http +"${FUNCTIONS_BIN}" deploy genStreamResultOnly --trigger-http +"${FUNCTIONS_BIN}" deploy genStreamLargeData --trigger-http if [ "$1" != "synchronous" ]; then # Wait for the user to tell us to stop the server. diff --git a/FirebaseFunctions/CHANGELOG.md b/FirebaseFunctions/CHANGELOG.md index 89a663ec718..c0354c5ecea 100644 --- a/FirebaseFunctions/CHANGELOG.md +++ b/FirebaseFunctions/CHANGELOG.md @@ -1,3 +1,6 @@ +# Unreleased +- [added] Streaming callable functions are now supported. + # 11.9.0 - [fixed] Fixed App Check token reporting to enable differentiating outdated (`MISSING`) and inauthentic (`INVALID`) clients; see [Monitor App Check diff --git a/FirebaseFunctions/Sources/Callable+Codable.swift b/FirebaseFunctions/Sources/Callable+Codable.swift index 489433a0a7e..287eff55ebb 100644 --- a/FirebaseFunctions/Sources/Callable+Codable.swift +++ b/FirebaseFunctions/Sources/Callable+Codable.swift @@ -15,7 +15,11 @@ import FirebaseSharedSwift import Foundation -/// A `Callable` is reference to a particular Callable HTTPS trigger in Cloud Functions. +/// A `Callable` is a reference to a particular Callable HTTPS trigger in Cloud Functions. +/// +/// - Note: If the Callable HTTPS trigger accepts no parameters, ``Never`` can be used for +/// iOS 17.0+. Otherwise, a simple encodable placeholder type (e.g., +/// `struct EmptyRequest: Encodable {}`) can be used. public struct Callable { /// The timeout to use when calling the function. Defaults to 70 seconds. public var timeoutInterval: TimeInterval { @@ -160,3 +164,175 @@ public struct Callable { return try await call(data) } } + +/// Used to determine when a `StreamResponse<_, _>` is being decoded. +private protocol StreamResponseProtocol {} + +/// A convenience type used to receive both the streaming callable function's yielded messages and +/// its return value. +/// +/// This can be used as the generic `Response` parameter to ``Callable`` to receive both the +/// yielded messages and final return value of the streaming callable function. +@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) +public enum StreamResponse: Decodable, + StreamResponseProtocol { + /// The message yielded by the callable function. + case message(Message) + /// The final result returned by the callable function. + case result(Result) + + private enum CodingKeys: String, CodingKey { + case message + case result + } + + public init(from decoder: any Decoder) throws { + do { + let container = try decoder + .container(keyedBy: Self.CodingKeys.self) + guard let onlyKey = container.allKeys.first, container.allKeys.count == 1 else { + throw DecodingError + .typeMismatch( + Self.self, + DecodingError.Context( + codingPath: container.codingPath, + debugDescription: "Invalid number of keys found, expected one.", + underlyingError: nil + ) + ) + } + + switch onlyKey { + case .message: + self = try Self + .message(container.decode(Message.self, forKey: .message)) + case .result: + self = try Self + .result(container.decode(Result.self, forKey: .result)) + } + } catch { + throw FunctionsError(.dataLoss, userInfo: [NSUnderlyingErrorKey: error]) + } + } +} + +@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) +public extension Callable where Request: Sendable, Response: Sendable { + /// Creates a stream that yields responses from the streaming callable function. + /// + /// The request to the Cloud Functions backend made by this method automatically includes a FCM + /// token to identify the app instance. If a user is logged in with Firebase Auth, an auth ID + /// token for the user is included. If App Check is integrated, an app check token is included. + /// + /// Firebase Cloud Messaging sends data to the Firebase backend periodically to collect + /// information regarding the app instance. To stop this, see `Messaging.deleteData()`. It + /// resumes with a new FCM Token the next time you call this method. + /// + /// - Important: The final result returned by the callable function is only accessible when + /// using `StreamResponse` as the `Response` generic type. + /// + /// Example of using `stream` _without_ `StreamResponse`: + /// ```swift + /// let callable: Callable = // ... + /// let request: MyRequest = // ... + /// let stream = try callable.stream(request) + /// for try await response in stream { + /// // Process each `MyResponse` message + /// print(response) + /// } + /// ``` + /// + /// Example of using `stream` _with_ `StreamResponse`: + /// ```swift + /// let callable: Callable> = // ... + /// let request: MyRequest = // ... + /// let stream = try callable.stream(request) + /// for try await response in stream { + /// switch response { + /// case .message(let message): + /// // Process each `MyMessage` + /// print(message) + /// case .result(let result): + /// // Process the final `MyResult` + /// print(result) + /// } + /// } + /// ``` + /// + /// - Parameter data: The `Request` data to pass to the callable function. + /// - Throws: A ``FunctionsError`` if the parameter `data` cannot be encoded. + /// - Returns: A stream wrapping responses yielded by the streaming callable function or + /// a ``FunctionsError`` if an error occurred. + func stream(_ data: Request? = nil) throws -> AsyncThrowingStream { + let encoded: Any + do { + encoded = try encoder.encode(data) + } catch { + throw FunctionsError(.invalidArgument, userInfo: [NSUnderlyingErrorKey: error]) + } + + return AsyncThrowingStream { continuation in + Task { + do { + for try await response in callable.stream(encoded) { + do { + // This response JSON should only be able to be decoded to an `StreamResponse<_, _>` + // instance. If the decoding succeeds and the decoded response conforms to + // `StreamResponseProtocol`, we know the `Response` generic argument + // is `StreamResponse<_, _>`. + let responseJSON = switch response { + case .message(let json), .result(let json): json + } + let response = try decoder.decode(Response.self, from: responseJSON) + if response is StreamResponseProtocol { + continuation.yield(response) + } else { + // `Response` is a custom type that matched the decoding logic as the + // `StreamResponse<_, _>` type. Only the `StreamResponse<_, _>` type should decode + // successfully here to avoid exposing the `result` value in a custom type. + throw FunctionsError(.internal) + } + } catch let error as FunctionsError where error.code == .dataLoss { + // `Response` is of type `StreamResponse<_, _>`, but failed to decode. Rethrow. + throw error + } catch { + // `Response` is *not* of type `StreamResponse<_, _>`, and needs to be unboxed and + // decoded. + guard case let .message(messageJSON) = response else { + // Since `Response` is not a `StreamResponse<_, _>`, only messages should be + // decoded. + continue + } + + do { + let boxedMessage = try decoder.decode( + StreamResponseMessage.self, + from: messageJSON + ) + continuation.yield(boxedMessage.message) + } catch { + throw FunctionsError(.dataLoss, userInfo: [NSUnderlyingErrorKey: error]) + } + } + } + } catch { + continuation.finish(throwing: error) + } + continuation.finish() + } + } + } + + /// A container type for the type-safe decoding of the message object from the generic `Response` + /// type. + private struct StreamResponseMessage: Decodable { + let message: Response + } +} + +/// A container type for differentiating between message and result responses. +enum JSONStreamResponse { + case message([String: Any]) + case result([String: Any]) +} diff --git a/FirebaseFunctions/Sources/Functions.swift b/FirebaseFunctions/Sources/Functions.swift index ce189579c87..d9e00afb34a 100644 --- a/FirebaseFunctions/Sources/Functions.swift +++ b/FirebaseFunctions/Sources/Functions.swift @@ -471,6 +471,201 @@ enum FunctionsConstants { } } + @available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) + func stream(at url: URL, + data: Any?, + options: HTTPSCallableOptions?, + timeout: TimeInterval) + -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + Task { + let urlRequest: URLRequest + do { + let context = try await contextProvider.context(options: options) + urlRequest = try makeRequestForStreamableContent( + url: url, + data: data, + options: options, + timeout: timeout, + context: context + ) + } catch { + continuation.finish(throwing: FunctionsError( + .invalidArgument, + userInfo: [NSUnderlyingErrorKey: error] + )) + return + } + + let stream: URLSession.AsyncBytes + let rawResponse: URLResponse + do { + (stream, rawResponse) = try await URLSession.shared.bytes(for: urlRequest) + } catch { + continuation.finish(throwing: FunctionsError( + .unavailable, + userInfo: [NSUnderlyingErrorKey: error] + )) + return + } + + // Verify the status code is an HTTP response. + guard let response = rawResponse as? HTTPURLResponse else { + continuation.finish( + throwing: FunctionsError( + .unavailable, + userInfo: [NSLocalizedDescriptionKey: "Response was not an HTTP response."] + ) + ) + return + } + + // Verify the status code is a 200. + guard response.statusCode == 200 else { + continuation.finish( + throwing: FunctionsError( + httpStatusCode: response.statusCode, + region: region, + url: url, + body: nil, + serializer: serializer + ) + ) + return + } + + do { + for try await line in stream.lines { + guard line.hasPrefix("data:") else { + continuation.finish( + throwing: FunctionsError( + .dataLoss, + userInfo: [NSLocalizedDescriptionKey: "Unexpected format for streamed response."] + ) + ) + return + } + + do { + // We can assume 5 characters since it's utf-8 encoded, removing `data:`. + let jsonText = String(line.dropFirst(5)) + let data = try jsonData(jsonText: jsonText) + // Handle the content and parse it. + let content = try callableStreamResult(fromResponseData: data, endpointURL: url) + continuation.yield(content) + } catch { + continuation.finish(throwing: error) + return + } + } + } catch { + continuation.finish( + throwing: FunctionsError( + .dataLoss, + userInfo: [ + NSLocalizedDescriptionKey: "Unexpected format for streamed response.", + NSUnderlyingErrorKey: error, + ] + ) + ) + return + } + + continuation.finish() + } + } + } + + @available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) + private func callableStreamResult(fromResponseData data: Data, + endpointURL url: URL) throws -> JSONStreamResponse { + let data = try processedData(fromResponseData: data, endpointURL: url) + + let responseJSONObject: Any + do { + responseJSONObject = try JSONSerialization.jsonObject(with: data) + } catch { + throw FunctionsError(.dataLoss, userInfo: [NSUnderlyingErrorKey: error]) + } + + guard let responseJSON = responseJSONObject as? [String: Any] else { + let userInfo = [NSLocalizedDescriptionKey: "Response was not a dictionary."] + throw FunctionsError(.dataLoss, userInfo: userInfo) + } + + if let _ = responseJSON["result"] { + return .result(responseJSON) + } else if let _ = responseJSON["message"] { + return .message(responseJSON) + } else { + throw FunctionsError( + .dataLoss, + userInfo: [NSLocalizedDescriptionKey: "Response is missing result or message field."] + ) + } + } + + private func jsonData(jsonText: String) throws -> Data { + guard let data = jsonText.data(using: .utf8) else { + throw FunctionsError(.dataLoss, userInfo: [ + NSUnderlyingErrorKey: DecodingError.dataCorrupted(DecodingError.Context( + codingPath: [], + debugDescription: "Could not parse response as UTF8." + )), + ]) + } + return data + } + + private func makeRequestForStreamableContent(url: URL, + data: Any?, + options: HTTPSCallableOptions?, + timeout: TimeInterval, + context: FunctionsContext) throws + -> URLRequest { + var urlRequest = URLRequest( + url: url, + cachePolicy: .useProtocolCachePolicy, + timeoutInterval: timeout + ) + + let data = data ?? NSNull() + let encoded = try serializer.encode(data) + let body = ["data": encoded] + let payload = try JSONSerialization.data(withJSONObject: body, options: [.fragmentsAllowed]) + urlRequest.httpBody = payload + + // Set the headers for starting a streaming session. + urlRequest.setValue("application/json", forHTTPHeaderField: "Content-Type") + urlRequest.setValue("text/event-stream", forHTTPHeaderField: "Accept") + urlRequest.httpMethod = "POST" + + if let authToken = context.authToken { + let value = "Bearer \(authToken)" + urlRequest.setValue(value, forHTTPHeaderField: "Authorization") + } + + if let fcmToken = context.fcmToken { + urlRequest.setValue(fcmToken, forHTTPHeaderField: Constants.fcmTokenHeader) + } + + if options?.requireLimitedUseAppCheckTokens == true { + if let appCheckToken = context.limitedUseAppCheckToken { + urlRequest.setValue( + appCheckToken, + forHTTPHeaderField: Constants.appCheckTokenHeader + ) + } + } else if let appCheckToken = context.appCheckToken { + urlRequest.setValue( + appCheckToken, + forHTTPHeaderField: Constants.appCheckTokenHeader + ) + } + + return urlRequest + } + private func makeFetcher(url: URL, data: Any?, options: HTTPSCallableOptions?, diff --git a/FirebaseFunctions/Sources/HTTPSCallable.swift b/FirebaseFunctions/Sources/HTTPSCallable.swift index c2281e54866..b423ac4195a 100644 --- a/FirebaseFunctions/Sources/HTTPSCallable.swift +++ b/FirebaseFunctions/Sources/HTTPSCallable.swift @@ -143,4 +143,9 @@ open class HTTPSCallable: NSObject { try await functions .callFunction(at: url, withObject: data, options: options, timeout: timeoutInterval) } + + @available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) + func stream(_ data: Any? = nil) -> AsyncThrowingStream { + functions.stream(at: url, data: data, options: options, timeout: timeoutInterval) + } } diff --git a/FirebaseFunctions/Tests/Integration/IntegrationTests.swift b/FirebaseFunctions/Tests/Integration/IntegrationTests.swift index 5260bd10b2b..878cec1c9a0 100644 --- a/FirebaseFunctions/Tests/Integration/IntegrationTests.swift +++ b/FirebaseFunctions/Tests/Integration/IntegrationTests.swift @@ -65,6 +65,7 @@ struct DataTestResponse: Decodable, Equatable { var code: Int32 } +/// - Important: These tests require the emulator. Run `./FirebaseFunctions/Backend/start.sh` class IntegrationTests: XCTestCase { let functions = Functions(projectID: "functions-integration-test", region: "us-central1", @@ -868,6 +869,427 @@ class IntegrationTests: XCTestCase { } } +// MARK: - Streaming + +/// A convenience type used to represent that a callable function does not +/// accept parameters. +/// +/// This can be used as the generic `Request` parameter to ``Callable`` to +/// indicate the callable function does not accept parameters. +private struct EmptyRequest: Encodable {} + +@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) +extension IntegrationTests { + func testStream_NoArgs() async throws { + // 1. Custom `EmptyRequest` struct is passed as a placeholder generic arg. + let callable: Callable = functions.httpsCallable("genStream") + // 2. No request data is passed when creating stream. + let stream = try callable.stream() + var streamContents: [String] = [] + for try await response in stream { + streamContents.append(response) + } + XCTAssertEqual( + streamContents, + ["hello", "world", "this", "is", "cool"] + ) + } + + @available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *) + func testStream_NoArgs_UeeNever() async throws { + let callable: Callable = functions.httpsCallable("genStream") + let stream = try callable.stream() + var streamContents: [String] = [] + for try await response in stream { + streamContents.append(response) + } + XCTAssertEqual( + streamContents, + ["hello", "world", "this", "is", "cool"] + ) + } + + func testStream_SimpleStreamResponse() async throws { + let callable: Callable> = functions + .httpsCallable("genStream") + let stream = try callable.stream() + var streamContents: [String] = [] + for try await response in stream { + switch response { + case let .message(message): + streamContents.append(message) + case let .result(result): + streamContents.append(result) + } + } + XCTAssertEqual( + streamContents, + ["hello", "world", "this", "is", "cool", "hello world this is cool"] + ) + } + + func testStream_CodableString() async throws { + let byName: Callable = functions.httpsCallable("genStream") + let stream = try byName.stream() + let result: [String] = try await stream.reduce([]) { $0 + [$1] } + XCTAssertEqual(result, ["hello", "world", "this", "is", "cool"]) + } + + private struct Location: Codable, Equatable { + let name: String + } + + private struct WeatherForecast: Decodable, Equatable { + enum Conditions: String, Decodable { + case sunny + case rainy + case snowy + } + + let location: Location + let temperature: Int + let conditions: Conditions + } + + private struct WeatherForecastReport: Decodable, Equatable { + let forecasts: [WeatherForecast] + } + + func testStream_CodableObject() async throws { + let callable: Callable<[Location], WeatherForecast> = functions + .httpsCallable("genStreamWeather") + let stream = try callable.stream([ + Location(name: "Toronto"), + Location(name: "London"), + Location(name: "Dubai"), + ]) + let result: [WeatherForecast] = try await stream.reduce([]) { $0 + [$1] } + XCTAssertEqual( + result, + [ + WeatherForecast(location: Location(name: "Toronto"), temperature: 25, conditions: .snowy), + WeatherForecast(location: Location(name: "London"), temperature: 50, conditions: .rainy), + WeatherForecast(location: Location(name: "Dubai"), temperature: 75, conditions: .sunny), + ] + ) + } + + func testStream_ResponseMessageDecodingFailure() async throws { + let callable: Callable<[Location], StreamResponse> = + functions + .httpsCallable("genStreamWeatherError") + let stream = try callable.stream([Location(name: "Toronto")]) + do { + for try await _ in stream { + XCTFail("Expected error to be thrown from stream.") + } + } catch let error as FunctionsError where error.code == .dataLoss { + XCTAssertNotNil(error.errorUserInfo[NSUnderlyingErrorKey] as? DecodingError) + } + } + + func testStream_ResponseResultDecodingFailure() async throws { + let callable: Callable<[Location], StreamResponse> = functions + .httpsCallable("genStreamWeather") + let stream = try callable.stream([Location(name: "Toronto")]) + do { + for try await response in stream { + if case .result = response { + XCTFail("Expected error to be thrown from stream.") + } + } + } catch let error as FunctionsError where error.code == .dataLoss { + XCTAssertNotNil(error.errorUserInfo[NSUnderlyingErrorKey] as? DecodingError) + } + } + + func testStream_ComplexStreamResponse() async throws { + let callable: Callable<[Location], StreamResponse> = + functions + .httpsCallable("genStreamWeather") + let stream = try callable.stream([ + Location(name: "Toronto"), + Location(name: "London"), + Location(name: "Dubai"), + ]) + var streamContents: [WeatherForecast] = [] + var streamResult: WeatherForecastReport? + for try await response in stream { + switch response { + case let .message(message): + streamContents.append(message) + case let .result(result): + streamResult = result + } + } + XCTAssertEqual( + streamContents, + [ + WeatherForecast(location: Location(name: "Toronto"), temperature: 25, conditions: .snowy), + WeatherForecast(location: Location(name: "London"), temperature: 50, conditions: .rainy), + WeatherForecast(location: Location(name: "Dubai"), temperature: 75, conditions: .sunny), + ] + ) + + try XCTAssertEqual( + XCTUnwrap(streamResult), WeatherForecastReport(forecasts: streamContents) + ) + } + + func testStream_ComplexStreamResponse_Functional() async throws { + let callable: Callable<[Location], StreamResponse> = + functions + .httpsCallable("genStreamWeather") + let stream = try callable.stream([ + Location(name: "Toronto"), + Location(name: "London"), + Location(name: "Dubai"), + ]) + let result: (accumulatedMessages: [WeatherForecast], result: WeatherForecastReport?) = + try await stream.reduce(([], nil)) { partialResult, streamResponse in + switch streamResponse { + case let .message(message): + (partialResult.accumulatedMessages + [message], partialResult.result) + case let .result(result): + (partialResult.accumulatedMessages, result) + } + } + XCTAssertEqual( + result.accumulatedMessages, + [ + WeatherForecast(location: Location(name: "Toronto"), temperature: 25, conditions: .snowy), + WeatherForecast(location: Location(name: "London"), temperature: 50, conditions: .rainy), + WeatherForecast(location: Location(name: "Dubai"), temperature: 75, conditions: .sunny), + ] + ) + + try XCTAssertEqual( + XCTUnwrap(result.result), WeatherForecastReport(forecasts: result.accumulatedMessages) + ) + } + + func testStream_Canceled() async throws { + let task = Task.detached { [self] in + let callable: Callable = functions.httpsCallable("genStream") + let stream = try callable.stream() + // Since we cancel the call we are expecting an empty array. + return try await stream.reduce([]) { $0 + [$1] } as [String] + } + // We cancel the task and we expect a null response even if the stream was initiated. + task.cancel() + let respone = try await task.value + XCTAssertEqual(respone, []) + } + + func testStream_NonexistentFunction() async throws { + let callable: Callable = functions.httpsCallable( + "nonexistentFunction" + ) + let stream = try callable.stream() + do { + for try await _ in stream { + XCTFail("Expected error to be thrown from stream.") + } + } catch let error as FunctionsError where error.code == .notFound { + XCTAssertEqual(error.localizedDescription, "NOT FOUND") + } + } + + func testStream_StreamError() async throws { + let callable: Callable = functions.httpsCallable("genStreamError") + let stream = try callable.stream() + do { + for try await _ in stream { + XCTFail("Expected error to be thrown from stream.") + } + } catch let error as FunctionsError where error.code == .internal { + XCTAssertEqual(error.localizedDescription, "INTERNAL") + } + } + + func testStream_RequestEncodingFailure() async throws { + struct Foo: Encodable { + enum CodingKeys: CodingKey {} + + func encode(to encoder: any Encoder) throws { + throw EncodingError + .invalidValue("", EncodingError.Context(codingPath: [], debugDescription: "")) + } + } + let callable: Callable = functions + .httpsCallable("genStream") + do { + _ = try callable.stream(Foo()) + } catch let error as FunctionsError where error.code == .invalidArgument { + _ = try XCTUnwrap(error.errorUserInfo[NSUnderlyingErrorKey] as? EncodingError) + } + } + + /// This tests an edge case to assert that if a custom `Response` is used + /// that matches the decoding logic of `StreamResponse`, the custom + /// `Response` does not decode successfully. + func testStream_ResultIsOnlyExposedInStreamResponse() async throws { + // The implementation is copied from `StreamResponse`. The only difference is the do-catch is + // removed from the decoding initializer. + enum MyStreamResponse: Decodable { + /// The message yielded by the callable function. + case message(Message) + /// The final result returned by the callable function. + case result(Result) + + private enum CodingKeys: String, CodingKey { + case message + case result + } + + public init(from decoder: any Decoder) throws { + let container = try decoder + .container(keyedBy: Self.CodingKeys.self) + var allKeys = ArraySlice(container.allKeys) + guard let onlyKey = allKeys.popFirst(), allKeys.isEmpty else { + throw DecodingError + .typeMismatch( + Self.self, + DecodingError.Context( + codingPath: container.codingPath, + debugDescription: "Invalid number of keys found, expected one.", + underlyingError: nil + ) + ) + } + + switch onlyKey { + case .message: + self = try Self + .message(container.decode(Message.self, forKey: .message)) + case .result: + self = try Self + .result(container.decode(Result.self, forKey: .result)) + } + } + } + + let callable: Callable<[Location], MyStreamResponse> = + functions + .httpsCallable("genStreamWeather") + let stream = try callable.stream([Location(name: "Toronto")]) + do { + for try await _ in stream { + XCTFail("Expected error to be thrown from stream.") + } + } catch let error as FunctionsError where error.code == .dataLoss { + XCTAssertNotNil(error.errorUserInfo[NSUnderlyingErrorKey] as? DecodingError) + } + } + + func testStream_ForNonStreamingCF3() async throws { + let callable: Callable = functions.httpsCallable("scalarTest") + let stream = try callable.stream(17) + do { + for try await _ in stream { + XCTFail("Expected error to be thrown from stream.") + } + } catch let error as FunctionsError where error.code == .dataLoss { + XCTAssertEqual(error.localizedDescription, "Unexpected format for streamed response.") + } + } + + func testStream_EmptyStream() async throws { + let callable: Callable = functions.httpsCallable("genStreamEmpty") + var streamContents: [String] = [] + for try await response in try callable.stream() { + streamContents.append(response) + } + XCTAssertEqual(streamContents, []) + } + + func testStream_ResultOnly() async throws { + let callable: Callable = functions.httpsCallable("genStreamResultOnly") + let stream = try callable.stream() + for try await _ in stream { + // The stream should not yield anything, so this should not be reached. + XCTFail("Stream should not yield any messages") + } + // Because StreamResponse was not used, the result is not accessible, + // but the message should not throw. + } + + func testStream_ResultOnly_StreamResponse() async throws { + struct EmptyResponse: Decodable {} + let callable: Callable> = functions + .httpsCallable( + "genStreamResultOnly" + ) + let stream = try callable.stream() + var streamResult = "" + for try await response in stream { + switch response { + case .message: + XCTFail("Stream should not yield any messages") + case let .result(result): + streamResult = result + } + } + // The hardcoded string matches the CF3's return value. + XCTAssertEqual(streamResult, "Only a result") + } + + func testStream_UnexpectedType() async throws { + // This function yields strings, not integers. + let callable: Callable = functions.httpsCallable("genStream") + let stream = try callable.stream() + do { + for try await _ in stream { + XCTFail("Expected error to be thrown from stream.") + } + } catch let error as FunctionsError where error.code == .dataLoss { + XCTAssertNotNil(error.errorUserInfo[NSUnderlyingErrorKey] as? DecodingError) + } + } + + func testStream_Timeout() async throws { + var callable: Callable = functions.httpsCallable("timeoutTest") + // Set a short timeout + callable.timeoutInterval = 0.01 // 10 milliseconds + + let stream = try callable.stream() + + do { + for try await _ in stream { + XCTFail("Expected error to be thrown from stream.") + } + } catch let error as FunctionsError where error.code == .unavailable { + // This should be a timeout error. + XCTAssertEqual( + error.localizedDescription, + "The operation couldn’t be completed. (com.firebase.functions error 14.)" + ) + XCTAssertNotNil(error.errorUserInfo[NSUnderlyingErrorKey] as? URLError) + } + } + + func testStream_LargeData() async throws { + func generateLargeString() -> String { + var largeString = "" + for _ in 0 ..< 10000 { + largeString += "A" + } + return largeString + } + let callable: Callable = functions.httpsCallable("genStreamLargeData") + let stream = try callable.stream() + var concatenatedData = "" + for try await response in stream { + concatenatedData += response + } + // Assert that the concatenated data matches the expected large data. + XCTAssertEqual(concatenatedData, generateLargeString()) + } +} + +// MARK: - Helpers + private class AuthTokenProvider: AuthInterop { func getUserID() -> String? { return "fake user"