Skip to content

Commit 39c522f

Browse files
committed
checkpoint before connecting to callbacks
1 parent b7e0248 commit 39c522f

File tree

3 files changed

+81
-23
lines changed

3 files changed

+81
-23
lines changed

common/api-review/data-connect.api.md

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,9 @@ export interface DataConnectOptions extends ConnectorConfig, DataConnectSettings
129129
}
130130

131131
// @public (undocumented)
132-
export interface DataConnectResponse<T> {
132+
export interface DataConnectResponse<Data> {
133133
// (undocumented)
134-
data: T;
134+
data: Data;
135135
// (undocumented)
136136
errors: Error[];
137137
// (undocumented)
@@ -156,6 +156,20 @@ export interface DataConnectSingleEntity {
156156
entityId: string;
157157
}
158158

159+
// @public (undocumented)
160+
export interface DataConnectStreamResponse<Data> {
161+
// (undocumented)
162+
cancelled: boolean;
163+
// (undocumented)
164+
data: Data;
165+
// (undocumented)
166+
dataEtag: string;
167+
// (undocumented)
168+
errors: Error[];
169+
// (undocumented)
170+
requestId: string;
171+
}
172+
159173
// @public
160174
export interface DataConnectSubscription<Data, Variables> {
161175
// (undocumented)
@@ -366,6 +380,8 @@ export interface TransportOptions {
366380
port?: number;
367381
// (undocumented)
368382
sslEnabled?: boolean;
383+
// (undocumented)
384+
streamEnabled?: boolean;
369385
}
370386

371387

packages/data-connect/src/network/transport/index.ts

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,22 +58,18 @@ export interface DataConnectExtensions {
5858
dataConnect?: DataConnectExtension[];
5959
}
6060

61-
export interface DataConnectResponse<T> {
62-
data: T;
61+
export interface DataConnectResponse<Data> {
62+
data: Data;
6363
errors: Error[];
6464
extensions: DataConnectExtensions;
6565
}
6666

67-
/**
68-
* Represents a single stream of communication over a physical connection.
69-
* Example: A single operations execution, or a query subscription
70-
* @internal
71-
*/
72-
export interface LogicalStream<Data, Variables> {
67+
export interface DataConnectStreamResponse<Data> {
7368
requestId: string;
74-
operationName: string;
75-
variables?: Variables;
76-
lastResponse?: DataConnectResponse<Data>;
69+
data: Data;
70+
dataEtag: string; // TODO: actually a hash
71+
errors: Error[];
72+
cancelled: boolean;
7773
}
7874

7975
/**

packages/data-connect/src/network/transport/stream.ts

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,13 @@
1717

1818
import { DataConnectOptions, TransportOptions } from '../../api/DataConnect';
1919
import { AppCheckTokenProvider } from '../../core/AppCheckTokenProvider';
20-
import { Code, DataConnectError } from '../../core/error';
20+
import {
21+
Code,
22+
DataConnectError,
23+
DataConnectOperationError,
24+
DataConnectOperationFailureResponse,
25+
DataConnectOperationFailureResponseErrorInfo
26+
} from '../../core/error';
2127
import { AuthTokenProvider } from '../../core/FirebaseAuthProvider';
2228
import { encoderImpl } from '../../util/encoder';
2329

@@ -29,7 +35,8 @@ import {
2935
DataConnectTransportClass,
3036
ExecuteStreamRequest,
3137
SubscribeStreamRequest,
32-
CancelStreamRequest
38+
CancelStreamRequest,
39+
DataConnectStreamResponse
3340
} from '.';
3441

3542
/**
@@ -195,10 +202,11 @@ export class StreamTransport extends DataConnectTransportClass {
195202
variables: Variables
196203
): string {
197204
// TODO: should this be simpler? maybe it should be similar to the execution request ID...? the only reason we need a unique AND identifying ID is so the SDK can lookup for an existing request when we want to unsubscribe, so maybe let's keep the complexity on the SDK side only
198-
return encoderImpl({
205+
const queryKey = encoderImpl({
199206
name,
200207
variables
201208
});
209+
return `subscribe-${queryKey}`;
202210
}
203211

204212
/**
@@ -259,17 +267,53 @@ export class StreamTransport extends DataConnectTransportClass {
259267
try {
260268
// eslint-disable-next-line no-console
261269
console.log('MESSAGE RECEIVED:', ev.data); // DEBUGGING
262-
const msg = JSON.parse(ev.data);
263-
const requestId = msg.requestId; // TODO: figure out what type of message this is based on the request ID
264-
if (requestId && this._executeRequests.has(requestId)) {
270+
const response = this._parseStreamResponse(ev.data);
271+
const requestId = response.requestId;
272+
273+
if (response.errors) {
274+
const stringified = JSON.stringify(response.errors);
275+
const failureResponse: DataConnectOperationFailureResponse = {
276+
errors:
277+
response.errors as unknown as DataConnectOperationFailureResponseErrorInfo[], // TODO: type this properly, or return a different type of error
278+
data: response.data as Record<string, unknown>
279+
};
280+
throw new DataConnectOperationError(
281+
'DataConnect error while performing request: ' + stringified,
282+
failureResponse
283+
);
284+
}
285+
286+
if (this._executeRequests.has(requestId)) {
265287
// eslint-disable-next-line @typescript-eslint/no-unused-vars
266-
const { resolve, reject } = this._executeRequests.get(requestId)!; // TODO: might not exist... remove "!"
267-
resolve(msg); // TODO: do something with the message other than just pass it along, depending on message type - we should be resolving to DataConnectResponse<Data> if it succeeds
268-
this._executeRequests.delete(requestId); // TODO: not necessarily an execute. only delete if this an execute
288+
const { resolve, reject } = this._executeRequests.get(requestId)!;
289+
resolve(response.data); // TODO: do something with the message other than just pass it along, depending on message type - we should be resolving to DataConnectResponse<Data> if it succeeds
290+
this._executeRequests.delete(requestId);
291+
}
292+
if (this._subscribeRequests.has(requestId)) {
293+
// TODO: call callbacks
269294
}
270295
} catch (e) {
271-
console.error('Error handling WebSocket message', e);
296+
throw new DataConnectError(Code.OTHER, 'error receiving message');
297+
}
298+
}
299+
300+
/**
301+
* Parse a response from the server. Assert that it has a requestId.
302+
* @param msg the message from the server to be parsed
303+
* @returns the parsed message as a DataConnectStreamResponse
304+
*/
305+
private _parseStreamResponse<Data>(
306+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
307+
msg: any
308+
): DataConnectStreamResponse<Data> {
309+
const response = JSON.parse(msg.data);
310+
if (!response.requestId) {
311+
throw new DataConnectError(
312+
Code.OTHER,
313+
'message from stream did not include requestId'
314+
);
272315
}
316+
return response as DataConnectStreamResponse<Data>;
273317
}
274318

275319
/**
@@ -308,6 +352,7 @@ export class StreamTransport extends DataConnectTransportClass {
308352
variables?: Variables
309353
): Promise<DataConnectResponse<Data>> {
310354
const requestId = this._makeExecuteRequestId();
355+
// TODO: "To save bandwidth, the Data Connect SDK should include data_etag of cached data in subsequent requests, so the backend can avoid sending redundant data already in SDK cache.
311356
const body: ExecuteStreamRequest<Variables> = {
312357
'name': this.connectorResourcePath,
313358
requestId,
@@ -329,6 +374,7 @@ export class StreamTransport extends DataConnectTransportClass {
329374
*/
330375
invokeSubscription<Variables>(queryName: string, variables: Variables): void {
331376
const requestId = this._makeSubscribeRequestId(queryName, variables);
377+
// TODO: "To save bandwidth, the Data Connect SDK should include data_etag of cached data in subsequent requests, so the backend can avoid sending redundant data already in SDK cache.
332378
const body: SubscribeStreamRequest<Variables> = {
333379
'name': this.connectorResourcePath,
334380
requestId,

0 commit comments

Comments
 (0)