Skip to content

Commit baf60ed

Browse files
ncooke3eBlenderrlazo
authored
[Functions] Add support for streamable cloud functions (#14395)
Co-authored-by: Eblen Macari <[email protected]> Co-authored-by: Eblen M <[email protected]> Co-authored-by: Rodrigo Lazo <[email protected]>
1 parent 37a1a32 commit baf60ed

File tree

8 files changed

+896
-17
lines changed

8 files changed

+896
-17
lines changed

.github/workflows/functions.yml

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ jobs:
3131
matrix:
3232
target: [ios, tvos, macos, watchos]
3333
build-env:
34-
- os: macos-14
35-
xcode: Xcode_15.2
3634
- os: macos-15
3735
xcode: Xcode_16.2
3836
runs-on: ${{ matrix.build-env.os }}
@@ -43,14 +41,12 @@ jobs:
4341
run: sudo xcode-select -s /Applications/${{ matrix.build-env.xcode }}.app/Contents/Developer
4442
- name: Setup Bundler
4543
run: scripts/setup_bundler.sh
46-
# The integration tests are flaky on Xcode 15 so only run the unit tests. The integration tests still run with SPM.
47-
# - name: Integration Test Server
48-
# run: FirebaseFunctions/Backend/start.sh synchronous
44+
- name: Integration Test Server
45+
run: FirebaseFunctions/Backend/start.sh synchronous
4946
- name: Build and test
5047
run: |
5148
scripts/third_party/travis/retry.sh scripts/pod_lib_lint.rb FirebaseFunctions.podspec \
52-
--test-specs=unit --platforms=${{ matrix.target }}
53-
49+
--platforms=${{ matrix.target }}
5450
5551
spm-package-resolved:
5652
runs-on: macos-14
@@ -145,6 +141,9 @@ jobs:
145141
key: ${{needs.spm-package-resolved.outputs.cache_key}}
146142
- name: Xcode
147143
run: sudo xcode-select -s /Applications/${{ matrix.xcode }}.app/Contents/Developer
144+
- name: Install visionOS, if needed.
145+
if: matrix.target == 'visionOS'
146+
run: xcodebuild -downloadPlatform visionOS
148147
- name: Initialize xcodebuild
149148
run: scripts/setup_spm_tests.sh
150149
- name: Unit Tests

FirebaseFunctions/Backend/index.js

Lines changed: 83 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ const assert = require('assert');
1616
const functionsV1 = require('firebase-functions/v1');
1717
const functionsV2 = require('firebase-functions/v2');
1818

19+
// MARK: - Utilities
20+
21+
function sleep(ms) {
22+
return new Promise(resolve => setTimeout(resolve, ms));
23+
};
24+
25+
// MARK: - Callable Functions
26+
1927
exports.dataTest = functionsV1.https.onRequest((request, response) => {
2028
assert.deepEqual(request.body, {
2129
data: {
@@ -121,22 +129,18 @@ exports.timeoutTest = functionsV1.https.onRequest((request, response) => {
121129

122130
const streamData = ["hello", "world", "this", "is", "cool"]
123131

124-
function sleep(ms) {
125-
return new Promise(resolve => setTimeout(resolve, ms));
126-
};
127-
128132
async function* generateText() {
129133
for (const chunk of streamData) {
130134
yield chunk;
131-
await sleep(1000);
135+
await sleep(100);
132136
}
133137
};
134138

135139
exports.genStream = functionsV2.https.onCall(
136140
async (request, response) => {
137141
if (request.acceptsStreaming) {
138142
for await (const chunk of generateText()) {
139-
response.sendChunk({ chunk });
143+
response.sendChunk(chunk);
140144
}
141145
}
142146
return streamData.join(" ");
@@ -145,11 +149,81 @@ exports.genStream = functionsV2.https.onCall(
145149

146150
exports.genStreamError = functionsV2.https.onCall(
147151
async (request, response) => {
152+
// Note: The functions backend does not pass the error message to the
153+
// client at this time.
154+
throw Error("BOOM")
155+
}
156+
);
157+
158+
const weatherForecasts = {
159+
Toronto: { conditions: 'snowy', temperature: 25 },
160+
London: { conditions: 'rainy', temperature: 50 },
161+
Dubai: { conditions: 'sunny', temperature: 75 }
162+
};
163+
164+
async function* generateForecast(locations) {
165+
for (const location of locations) {
166+
yield { 'location': location, ...weatherForecasts[location.name] };
167+
await sleep(100);
168+
}
169+
};
170+
171+
exports.genStreamWeather = functionsV2.https.onCall(
172+
async (request, response) => {
173+
const forecasts = [];
148174
if (request.acceptsStreaming) {
149-
for await (const chunk of generateText()) {
150-
response.write({ chunk });
175+
for await (const chunk of generateForecast(request.data)) {
176+
forecasts.push(chunk)
177+
response.sendChunk(chunk);
178+
}
179+
}
180+
return { forecasts };
181+
}
182+
);
183+
184+
exports.genStreamWeatherError = functionsV2.https.onCall(
185+
async (request, response) => {
186+
if (request.acceptsStreaming) {
187+
for await (const chunk of generateForecast(request.data)) {
188+
// Remove the location field, since the SDK cannot decode the message
189+
// if it's there.
190+
delete chunk.location;
191+
response.sendChunk(chunk);
192+
}
193+
}
194+
return "Number of forecasts generated: " + request.data.length;
195+
}
196+
);
197+
198+
exports.genStreamEmpty = functionsV2.https.onCall(
199+
async (request, response) => {
200+
if (request.acceptsStreaming) {
201+
// Send no chunks
202+
}
203+
// Implicitly return null.
204+
}
205+
);
206+
207+
exports.genStreamResultOnly = functionsV2.https.onCall(
208+
async (request, response) => {
209+
if (request.acceptsStreaming) {
210+
// Do not send any chunks.
211+
}
212+
return "Only a result";
213+
}
214+
);
215+
216+
exports.genStreamLargeData = functionsV2.https.onCall(
217+
async (request, response) => {
218+
if (request.acceptsStreaming) {
219+
const largeString = 'A'.repeat(10000);
220+
const chunkSize = 1024;
221+
for (let i = 0; i < largeString.length; i += chunkSize) {
222+
const chunk = largeString.substring(i, i + chunkSize);
223+
response.sendChunk(chunk);
224+
await sleep(100);
151225
}
152-
throw Error("BOOM")
153226
}
227+
return "Stream Completed";
154228
}
155229
);

FirebaseFunctions/Backend/start.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ FUNCTIONS_BIN="./node_modules/.bin/functions"
5757
"${FUNCTIONS_BIN}" deploy timeoutTest --trigger-http
5858
"${FUNCTIONS_BIN}" deploy genStream --trigger-http
5959
"${FUNCTIONS_BIN}" deploy genStreamError --trigger-http
60+
"${FUNCTIONS_BIN}" deploy genStreamWeather --trigger-http
61+
"${FUNCTIONS_BIN}" deploy genStreamWeatherError --trigger-http
62+
"${FUNCTIONS_BIN}" deploy genStreamEmpty --trigger-http
63+
"${FUNCTIONS_BIN}" deploy genStreamResultOnly --trigger-http
64+
"${FUNCTIONS_BIN}" deploy genStreamLargeData --trigger-http
6065

6166
if [ "$1" != "synchronous" ]; then
6267
# Wait for the user to tell us to stop the server.

FirebaseFunctions/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
# Unreleased
2+
- [added] Streaming callable functions are now supported.
3+
14
# 11.9.0
25
- [fixed] Fixed App Check token reporting to enable differentiating outdated
36
(`MISSING`) and inauthentic (`INVALID`) clients; see [Monitor App Check

FirebaseFunctions/Sources/Callable+Codable.swift

Lines changed: 177 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
import FirebaseSharedSwift
1616
import Foundation
1717

18-
/// A `Callable` is reference to a particular Callable HTTPS trigger in Cloud Functions.
18+
/// A `Callable` is a reference to a particular Callable HTTPS trigger in Cloud Functions.
19+
///
20+
/// - Note: If the Callable HTTPS trigger accepts no parameters, ``Never`` can be used for
21+
/// iOS 17.0+. Otherwise, a simple encodable placeholder type (e.g.,
22+
/// `struct EmptyRequest: Encodable {}`) can be used.
1923
public struct Callable<Request: Encodable, Response: Decodable> {
2024
/// The timeout to use when calling the function. Defaults to 70 seconds.
2125
public var timeoutInterval: TimeInterval {
@@ -160,3 +164,175 @@ public struct Callable<Request: Encodable, Response: Decodable> {
160164
return try await call(data)
161165
}
162166
}
167+
168+
/// Used to determine when a `StreamResponse<_, _>` is being decoded.
169+
private protocol StreamResponseProtocol {}
170+
171+
/// A convenience type used to receive both the streaming callable function's yielded messages and
172+
/// its return value.
173+
///
174+
/// This can be used as the generic `Response` parameter to ``Callable`` to receive both the
175+
/// yielded messages and final return value of the streaming callable function.
176+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
177+
public enum StreamResponse<Message: Decodable, Result: Decodable>: Decodable,
178+
StreamResponseProtocol {
179+
/// The message yielded by the callable function.
180+
case message(Message)
181+
/// The final result returned by the callable function.
182+
case result(Result)
183+
184+
private enum CodingKeys: String, CodingKey {
185+
case message
186+
case result
187+
}
188+
189+
public init(from decoder: any Decoder) throws {
190+
do {
191+
let container = try decoder
192+
.container(keyedBy: Self<Message, Result>.CodingKeys.self)
193+
guard let onlyKey = container.allKeys.first, container.allKeys.count == 1 else {
194+
throw DecodingError
195+
.typeMismatch(
196+
Self<Message,
197+
Result>.self,
198+
DecodingError.Context(
199+
codingPath: container.codingPath,
200+
debugDescription: "Invalid number of keys found, expected one.",
201+
underlyingError: nil
202+
)
203+
)
204+
}
205+
206+
switch onlyKey {
207+
case .message:
208+
self = try Self
209+
.message(container.decode(Message.self, forKey: .message))
210+
case .result:
211+
self = try Self
212+
.result(container.decode(Result.self, forKey: .result))
213+
}
214+
} catch {
215+
throw FunctionsError(.dataLoss, userInfo: [NSUnderlyingErrorKey: error])
216+
}
217+
}
218+
}
219+
220+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
221+
public extension Callable where Request: Sendable, Response: Sendable {
222+
/// Creates a stream that yields responses from the streaming callable function.
223+
///
224+
/// The request to the Cloud Functions backend made by this method automatically includes a FCM
225+
/// token to identify the app instance. If a user is logged in with Firebase Auth, an auth ID
226+
/// token for the user is included. If App Check is integrated, an app check token is included.
227+
///
228+
/// Firebase Cloud Messaging sends data to the Firebase backend periodically to collect
229+
/// information regarding the app instance. To stop this, see `Messaging.deleteData()`. It
230+
/// resumes with a new FCM Token the next time you call this method.
231+
///
232+
/// - Important: The final result returned by the callable function is only accessible when
233+
/// using `StreamResponse` as the `Response` generic type.
234+
///
235+
/// Example of using `stream` _without_ `StreamResponse`:
236+
/// ```swift
237+
/// let callable: Callable<MyRequest, MyResponse> = // ...
238+
/// let request: MyRequest = // ...
239+
/// let stream = try callable.stream(request)
240+
/// for try await response in stream {
241+
/// // Process each `MyResponse` message
242+
/// print(response)
243+
/// }
244+
/// ```
245+
///
246+
/// Example of using `stream` _with_ `StreamResponse`:
247+
/// ```swift
248+
/// let callable: Callable<MyRequest, StreamResponse<MyMessage, MyResult>> = // ...
249+
/// let request: MyRequest = // ...
250+
/// let stream = try callable.stream(request)
251+
/// for try await response in stream {
252+
/// switch response {
253+
/// case .message(let message):
254+
/// // Process each `MyMessage`
255+
/// print(message)
256+
/// case .result(let result):
257+
/// // Process the final `MyResult`
258+
/// print(result)
259+
/// }
260+
/// }
261+
/// ```
262+
///
263+
/// - Parameter data: The `Request` data to pass to the callable function.
264+
/// - Throws: A ``FunctionsError`` if the parameter `data` cannot be encoded.
265+
/// - Returns: A stream wrapping responses yielded by the streaming callable function or
266+
/// a ``FunctionsError`` if an error occurred.
267+
func stream(_ data: Request? = nil) throws -> AsyncThrowingStream<Response, Error> {
268+
let encoded: Any
269+
do {
270+
encoded = try encoder.encode(data)
271+
} catch {
272+
throw FunctionsError(.invalidArgument, userInfo: [NSUnderlyingErrorKey: error])
273+
}
274+
275+
return AsyncThrowingStream { continuation in
276+
Task {
277+
do {
278+
for try await response in callable.stream(encoded) {
279+
do {
280+
// This response JSON should only be able to be decoded to an `StreamResponse<_, _>`
281+
// instance. If the decoding succeeds and the decoded response conforms to
282+
// `StreamResponseProtocol`, we know the `Response` generic argument
283+
// is `StreamResponse<_, _>`.
284+
let responseJSON = switch response {
285+
case .message(let json), .result(let json): json
286+
}
287+
let response = try decoder.decode(Response.self, from: responseJSON)
288+
if response is StreamResponseProtocol {
289+
continuation.yield(response)
290+
} else {
291+
// `Response` is a custom type that matched the decoding logic as the
292+
// `StreamResponse<_, _>` type. Only the `StreamResponse<_, _>` type should decode
293+
// successfully here to avoid exposing the `result` value in a custom type.
294+
throw FunctionsError(.internal)
295+
}
296+
} catch let error as FunctionsError where error.code == .dataLoss {
297+
// `Response` is of type `StreamResponse<_, _>`, but failed to decode. Rethrow.
298+
throw error
299+
} catch {
300+
// `Response` is *not* of type `StreamResponse<_, _>`, and needs to be unboxed and
301+
// decoded.
302+
guard case let .message(messageJSON) = response else {
303+
// Since `Response` is not a `StreamResponse<_, _>`, only messages should be
304+
// decoded.
305+
continue
306+
}
307+
308+
do {
309+
let boxedMessage = try decoder.decode(
310+
StreamResponseMessage.self,
311+
from: messageJSON
312+
)
313+
continuation.yield(boxedMessage.message)
314+
} catch {
315+
throw FunctionsError(.dataLoss, userInfo: [NSUnderlyingErrorKey: error])
316+
}
317+
}
318+
}
319+
} catch {
320+
continuation.finish(throwing: error)
321+
}
322+
continuation.finish()
323+
}
324+
}
325+
}
326+
327+
/// A container type for the type-safe decoding of the message object from the generic `Response`
328+
/// type.
329+
private struct StreamResponseMessage: Decodable {
330+
let message: Response
331+
}
332+
}
333+
334+
/// A container type for differentiating between message and result responses.
335+
enum JSONStreamResponse {
336+
case message([String: Any])
337+
case result([String: Any])
338+
}

0 commit comments

Comments
 (0)