Skip to content

Commit 14a6a25

Browse files
committed
wip
1 parent 4ffa4c9 commit 14a6a25

File tree

6 files changed

+184
-144
lines changed

6 files changed

+184
-144
lines changed

packages/shared/common/src/internal/fdv2/payloadReader.ts

Lines changed: 66 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
1-
import { EventListener, EventName } from '../../api';
1+
/* eslint-disable no-underscore-dangle */
2+
import { EventListener, EventName, LDLogger } from '../../api';
3+
import { DataSourceErrorKind } from '../../datasource';
24
import { DataObject, PayloadTransferred, ServerIntentData } from './proto';
35

46
// Facade interface to contain only ability to add event listeners
57
export interface EventStream {
68
addEventListener(type: EventName, listener: EventListener): void;
79
}
810

11+
export interface JsonObjConverters {
12+
[kind: string]: (object: any) => any;
13+
}
14+
915
export interface Update extends DataObject {
1016
deleted?: boolean;
1117
}
@@ -29,13 +35,16 @@ export class PayloadReader {
2935
tempBasis?: boolean = undefined;
3036
tempUpdates: Update[] = [];
3137

32-
constructor(eventSource: EventStream, listeners: PayloadListener[]) {
33-
this.listeners = listeners.concat(listeners);
34-
35-
eventSource.addEventListener('server-intent', this._processServerIntent);
36-
eventSource.addEventListener('put-object', this._processPutObject);
37-
eventSource.addEventListener('delete-object', this._processDeleteObject);
38-
eventSource.addEventListener('payload-transferred', this._processPayloadTransferred);
38+
constructor(
39+
eventSource: EventStream,
40+
private readonly _jsonObjConverters: JsonObjConverters,
41+
private readonly _errorHandler?: (errorKind: DataSourceErrorKind, message: string) => void,
42+
private readonly _logger?: LDLogger,
43+
) {
44+
this._attachHandler(eventSource, 'server-intent', this._processServerIntent);
45+
this._attachHandler(eventSource, 'put-object', this._processPutObject);
46+
this._attachHandler(eventSource, 'delete-object', this._processDeleteObject);
47+
this._attachHandler(eventSource, 'payload-transferred', this._processPayloadTransferred);
3948
}
4049

4150
addPayloadListener(listener: PayloadListener) {
@@ -49,6 +58,31 @@ export class PayloadReader {
4958
}
5059
}
5160

61+
private _attachHandler(stream: EventStream, eventName: string, processor: (obj: any) => void) {
62+
stream.addEventListener(eventName, async (event?: { data?: string }) => {
63+
if (event?.data) {
64+
this._logger?.debug(`Received ${eventName} event`);
65+
66+
try {
67+
processor(JSON.parse(event.data));
68+
} catch {
69+
this._logger?.error(`Stream received invalid data in "${eventName}" message`);
70+
this._logger?.debug(`Invalid JSON follows: ${event.data}`);
71+
this._errorHandler?.(
72+
DataSourceErrorKind.InvalidData,
73+
'Malformed JSON data in event stream',
74+
);
75+
}
76+
} else {
77+
this._errorHandler?.(DataSourceErrorKind.Unknown, 'Unexpected payload from event stream');
78+
}
79+
});
80+
}
81+
82+
private _convertJsonObj(jsonObj: any): any {
83+
return this._jsonObjConverters[jsonObj.kind]?.(jsonObj);
84+
}
85+
5286
// TODO: add valid state/reset handling if an invalid message is received part way through processing and to avoid starting prcessing put/deletes before server intent is received
5387
private _processServerIntent = (event?: { data?: ServerIntentData }) => {
5488
// clear state in prep for handling data
@@ -77,35 +111,46 @@ export class PayloadReader {
77111
this.tempId = payload?.id;
78112
};
79113

80-
private _processPutObject = (event?: { data?: DataObject }) => {
114+
private _processPutObject = (jsonObj: any) => {
81115
// if the following properties haven't been provided by now, we're in an invalid state
82-
if (!event?.data?.kind || !event.data.key || !event.data.version || !event.data.object) {
116+
if (!jsonObj.kind || !jsonObj.key || !jsonObj.version || !jsonObj.object) {
83117
this._resetState();
84118
return;
85119
}
86120

121+
const obj = this._convertJsonObj(jsonObj);
122+
if (!obj) {
123+
// ignore unrecognized kinds
124+
return;
125+
}
126+
87127
this.tempUpdates.push({
88-
kind: event.data.kind,
89-
key: event.data.key,
90-
object: event.data.object,
91-
version: event.data.version,
128+
kind: jsonObj.kind,
129+
key: jsonObj.key,
130+
version: jsonObj.version,
131+
object: obj,
92132
// intentionally omit deleted for this put
93133
});
94134
};
95135

96-
// TODO: consider merging put and delete and having param for delete logic
97-
private _processDeleteObject = (event?: { data?: DataObject }) => {
136+
private _processDeleteObject = (jsonObj: any) => {
98137
// if the following properties haven't been provided by now, we're in an invalid state
99-
if (!event?.data?.kind || !event.data.key || !event.data.version || !event.data.object) {
138+
if (!jsonObj.kind || !jsonObj.key || !jsonObj.version || !jsonObj.object) {
100139
this._resetState();
101140
return;
102141
}
103142

143+
const obj = this._convertJsonObj(jsonObj);
144+
if (!obj) {
145+
// ignore unrecognized kinds
146+
return;
147+
}
148+
104149
this.tempUpdates.push({
105-
kind: event.data.kind,
106-
key: event.data.key,
107-
object: event.data.object,
108-
version: event.data.version,
150+
kind: jsonObj.kind,
151+
key: jsonObj.key,
152+
version: jsonObj.version,
153+
object: obj,
109154
deleted: true,
110155
});
111156
};

packages/shared/sdk-server/src/LDClientImpl.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ import { BigSegmentStoreMembership } from './api/interfaces';
3232
import { LDWaitForInitializationOptions } from './api/LDWaitForInitializationOptions';
3333
import BigSegmentsManager from './BigSegmentsManager';
3434
import BigSegmentStoreStatusProvider from './BigSegmentStatusProviderImpl';
35-
import { createStreamListeners } from './data_sources/createStreamListeners';
35+
import { createPayloadListener } from './data_sources/createStreamListenersFDv2';
3636
import DataSourceUpdates from './data_sources/DataSourceUpdates';
3737
import PollingProcessor from './data_sources/PollingProcessor';
3838
import Requestor from './data_sources/Requestor';
39-
import StreamingProcessor from './data_sources/StreamingProcessor';
39+
import StreamingProcessorFDv2 from './data_sources/StreamingProcessorFDv2';
4040
import createDiagnosticsInitConfig from './diagnostics/createDiagnosticsInitConfig';
4141
import { allAsync } from './evaluation/collection';
4242
import { Flag } from './evaluation/data/Flag';
@@ -216,16 +216,17 @@ export default class LDClientImpl implements LDClient {
216216
};
217217
this._evaluator = new Evaluator(this._platform, queries);
218218

219-
const listeners = createStreamListeners(dataSourceUpdates, this._logger, {
220-
put: () => this._initSuccess(),
221-
});
219+
const payloadListener = createPayloadListener(dataSourceUpdates, this.logger, () =>
220+
this._initSuccess(),
221+
);
222+
222223
const makeDefaultProcessor = () =>
223224
config.stream
224-
? new StreamingProcessor(
225+
? new StreamingProcessorFDv2(
225226
clientContext,
226227
'/all',
227228
[],
228-
listeners,
229+
payloadListener,
229230
baseHeaders,
230231
this._diagnosticsManager,
231232
(e) => this._dataSourceErrorHandler(e),

packages/shared/sdk-server/src/data_sources/StreamingProcessor.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,6 @@ export default class StreamingProcessor implements subsystem.LDStreamProcessor {
117117
});
118118
this._eventSource = eventSource;
119119

120-
this._payloadReader = new internal.PayloadReader(eventSource, createPay);
121-
122120
eventSource.onclose = () => {
123121
this._logger?.info('Closed LaunchDarkly stream connection');
124122
};
@@ -139,14 +137,12 @@ export default class StreamingProcessor implements subsystem.LDStreamProcessor {
139137
eventSource.addEventListener(eventName, (event) => {
140138
this._logger?.debug(`Received ${eventName} event`);
141139

142-
// TODO: this deserialization logic has to live somewhere. I'm missing it right now.
143140
if (event?.data) {
144141
this._logConnectionResult(true);
145142
const { data } = event;
146143
const dataJson = deserializeData(data);
147144

148145
if (!dataJson) {
149-
// TODO: figure out JSON error handling
150146
reportJsonError(eventName, data, this._logger, this._errorHandler);
151147
return;
152148
}

packages/shared/sdk-server/src/data_sources/StreamingProcessorFDv2.ts

Lines changed: 20 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import {
22
ClientContext,
33
DataSourceErrorKind,
4-
EventName,
54
EventSource,
65
getStreamingUri,
76
httpErrorMessage,
@@ -10,28 +9,17 @@ import {
109
LDHeaders,
1110
LDLogger,
1211
LDStreamingError,
13-
ProcessStreamResponse,
1412
Requests,
1513
shouldRetry,
1614
StreamingErrorHandler,
1715
subsystem,
1816
} from '@launchdarkly/js-sdk-common';
17+
import { PayloadListener } from '@launchdarkly/js-sdk-common/dist/esm/internal';
1918

20-
const reportJsonError = (
21-
type: string,
22-
data: string,
23-
logger?: LDLogger,
24-
errorHandler?: StreamingErrorHandler,
25-
) => {
26-
logger?.error(`Stream received invalid data in "${type}" message`);
27-
logger?.debug(`Invalid JSON follows: ${data}`);
28-
errorHandler?.(
29-
new LDStreamingError(DataSourceErrorKind.InvalidData, 'Malformed JSON data in event stream'),
30-
);
31-
};
19+
import { processFlag, processSegment } from '../store/serialization';
3220

3321
// TODO: consider naming this StreamingDatasource
34-
export default class StreamingProcessor implements subsystem.LDStreamProcessor {
22+
export default class StreamingProcessorFDv2 implements subsystem.LDStreamProcessor {
3523
private readonly _headers: { [key: string]: string | string[] };
3624
private readonly _streamUri: string;
3725
private readonly _logger?: LDLogger;
@@ -44,7 +32,7 @@ export default class StreamingProcessor implements subsystem.LDStreamProcessor {
4432
clientContext: ClientContext,
4533
streamUriPath: string,
4634
parameters: { key: string; value: string }[],
47-
private readonly _listeners: Map<EventName, ProcessStreamResponse>,
35+
private readonly _payloadListener: PayloadListener,
4836
baseHeaders: LDHeaders,
4937
private readonly _diagnosticsManager?: internal.DiagnosticsManager,
5038
private readonly _errorHandler?: StreamingErrorHandler,
@@ -117,6 +105,22 @@ export default class StreamingProcessor implements subsystem.LDStreamProcessor {
117105
retryResetIntervalMillis: 60 * 1000,
118106
});
119107
this._eventSource = eventSource;
108+
const payloadReader = new internal.PayloadReader(
109+
eventSource,
110+
{
111+
flag: processFlag,
112+
segment: processSegment,
113+
},
114+
(errorKind: DataSourceErrorKind, message: string) => {
115+
this._errorHandler?.(new LDStreamingError(errorKind, message));
116+
},
117+
);
118+
payloadReader.addPayloadListener(() => {
119+
// TODO: discuss if it is satisfactory to switch from setting connection result on single event to getting a payload. Need
120+
// to double check the handling in the ServerIntent:none case. That may not trigger these payload listeners.
121+
this._logConnectionResult(true);
122+
});
123+
payloadReader.addPayloadListener(this._payloadListener);
120124

121125
eventSource.onclose = () => {
122126
this._logger?.info('Closed LaunchDarkly stream connection');
@@ -133,31 +137,6 @@ export default class StreamingProcessor implements subsystem.LDStreamProcessor {
133137
eventSource.onretrying = (e) => {
134138
this._logger?.info(`Will retry stream connection in ${e.delayMillis} milliseconds`);
135139
};
136-
137-
this._listeners.forEach(({ deserializeData, processJson }, eventName) => {
138-
eventSource.addEventListener(eventName, (event) => {
139-
this._logger?.debug(`Received ${eventName} event`);
140-
141-
if (event?.data) {
142-
this._logConnectionResult(true);
143-
const { data } = event;
144-
const dataJson = deserializeData(data);
145-
146-
if (!dataJson) {
147-
reportJsonError(eventName, data, this._logger, this._errorHandler);
148-
return;
149-
}
150-
processJson(dataJson);
151-
} else {
152-
this._errorHandler?.(
153-
new LDStreamingError(
154-
DataSourceErrorKind.Unknown,
155-
'Unexpected payload from event stream',
156-
),
157-
);
158-
}
159-
});
160-
});
161140
}
162141

163142
stop() {

0 commit comments

Comments
 (0)