Skip to content

Commit a0f5398

Browse files
Todd AndersonTodd Anderson
authored andcommitted
feat: adds polling synchronizer support
1 parent 6904c99 commit a0f5398

File tree

11 files changed

+362
-120
lines changed

11 files changed

+362
-120
lines changed

contract-tests/testharness-suppressions.txt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
1-
polling/requests
2-
polling/payload/large payloads
3-
41
streaming/validation/drop and reconnect if stream event has malformed JSON
52
streaming/validation/drop and reconnect if stream event has well-formed JSON not matching schema
63

7-
wrapper/poll requests/wrapper name and version
8-
94
streaming/fdv2/reconnection state management/initializes from polling initializer
105
streaming/fdv2/reconnection state management/initializes from 2 polling initializers
116

packages/shared/common/__tests__/internal/fdv2/PayloadReader.test.ts renamed to packages/shared/common/__tests__/internal/fdv2/PayloadStreamReader.test.ts

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { EventListener, EventName, LDLogger } from '../../../src/api';
2-
import { EventStream, Payload, PayloadReader } from '../../../src/internal/fdv2/payloadReader';
2+
import { Payload } from '../../../src/internal/fdv2/payloadProcessor';
3+
import { EventStream, PayloadStreamReader } from '../../../src/internal/fdv2/payloadStreamReader';
4+
35

46
class MockEventStream implements EventStream {
57
private _listeners: Record<EventName, EventListener> = {};
@@ -16,7 +18,7 @@ class MockEventStream implements EventStream {
1618
it('it sets basis to true when intent code is xfer-full', () => {
1719
const mockStream = new MockEventStream();
1820
const receivedPayloads: Payload[] = [];
19-
const readerUnderTest = new PayloadReader(mockStream, {
21+
const readerUnderTest = new PayloadStreamReader(mockStream, {
2022
mockKind: (it) => it, // obj processor that just returns the same obj
2123
});
2224
readerUnderTest.addPayloadListener((it) => {
@@ -38,7 +40,7 @@ it('it sets basis to true when intent code is xfer-full', () => {
3840
it('it sets basis to false when intent code is xfer-changes', () => {
3941
const mockStream = new MockEventStream();
4042
const receivedPayloads: Payload[] = [];
41-
const readerUnderTest = new PayloadReader(mockStream, {
43+
const readerUnderTest = new PayloadStreamReader(mockStream, {
4244
mockKind: (it) => it, // obj processor that just returns the same obj
4345
});
4446
readerUnderTest.addPayloadListener((it) => {
@@ -60,7 +62,7 @@ it('it sets basis to false when intent code is xfer-changes', () => {
6062
it('it handles xfer-full then xfer-changes', () => {
6163
const mockStream = new MockEventStream();
6264
const receivedPayloads: Payload[] = [];
63-
const readerUnderTest = new PayloadReader(mockStream, {
65+
const readerUnderTest = new PayloadStreamReader(mockStream, {
6466
mockKind: (it) => it, // obj processor that just returns the same obj
6567
});
6668
readerUnderTest.addPayloadListener((it) => {
@@ -102,7 +104,7 @@ it('it handles xfer-full then xfer-changes', () => {
102104
it('it includes multiple types of updates in payload', () => {
103105
const mockStream = new MockEventStream();
104106
const receivedPayloads: Payload[] = [];
105-
const readerUnderTest = new PayloadReader(mockStream, {
107+
const readerUnderTest = new PayloadStreamReader(mockStream, {
106108
mockKind: (it) => it, // obj processor that just returns the same obj
107109
});
108110
readerUnderTest.addPayloadListener((it) => {
@@ -140,7 +142,7 @@ it('it includes multiple types of updates in payload', () => {
140142
it('it does not include messages thats are not between server-intent and payloader-transferred', () => {
141143
const mockStream = new MockEventStream();
142144
const receivedPayloads: Payload[] = [];
143-
const readerUnderTest = new PayloadReader(mockStream, {
145+
const readerUnderTest = new PayloadStreamReader(mockStream, {
144146
mockKind: (it) => it, // obj processor that just returns the same obj
145147
});
146148
readerUnderTest.addPayloadListener((it) => {
@@ -173,7 +175,7 @@ it('logs prescribed message when goodbye event is encountered', () => {
173175
};
174176
const mockStream = new MockEventStream();
175177
const receivedPayloads: Payload[] = [];
176-
const readerUnderTest = new PayloadReader(
178+
const readerUnderTest = new PayloadStreamReader(
177179
mockStream,
178180
{
179181
mockKind: (it) => it, // obj processor that just returns the same obj
@@ -204,7 +206,7 @@ it('logs prescribed message when error event is encountered', () => {
204206
};
205207
const mockStream = new MockEventStream();
206208
const receivedPayloads: Payload[] = [];
207-
const readerUnderTest = new PayloadReader(
209+
const readerUnderTest = new PayloadStreamReader(
208210
mockStream,
209211
{
210212
mockKind: (it) => it, // obj processor that just returns the same obj
@@ -246,7 +248,7 @@ it('discards partially transferred data when an error is encountered', () => {
246248
};
247249
const mockStream = new MockEventStream();
248250
const receivedPayloads: Payload[] = [];
249-
const readerUnderTest = new PayloadReader(
251+
const readerUnderTest = new PayloadStreamReader(
250252
mockStream,
251253
{
252254
mockKind: (it) => it, // obj processor that just returns the same obj
@@ -310,7 +312,7 @@ it('discards partially transferred data when an error is encountered', () => {
310312
it('silently ignores unrecognized kinds', () => {
311313
const mockStream = new MockEventStream();
312314
const receivedPayloads: Payload[] = [];
313-
const readerUnderTest = new PayloadReader(mockStream, {
315+
const readerUnderTest = new PayloadStreamReader(mockStream, {
314316
mockKind: (it) => it, // obj processor that just returns the same obj
315317
});
316318
readerUnderTest.addPayloadListener((it) => {
@@ -340,7 +342,7 @@ it('silently ignores unrecognized kinds', () => {
340342
it('ignores additional payloads beyond the first payload in the server-intent message', () => {
341343
const mockStream = new MockEventStream();
342344
const receivedPayloads: Payload[] = [];
343-
const readerUnderTest = new PayloadReader(mockStream, {
345+
const readerUnderTest = new PayloadStreamReader(mockStream, {
344346
mockKind: (it) => it, // obj processor that just returns the same obj
345347
});
346348
readerUnderTest.addPayloadListener((it) => {
Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1-
import { Payload, PayloadListener, PayloadReader, Update } from './payloadReader';
1+
import {
2+
EventsSummary,
3+
Payload,
4+
PayloadListener,
5+
PayloadProcessor,
6+
Update,
7+
} from './payloadProcessor';
8+
import { PayloadStreamReader } from './payloadStreamReader';
29

3-
export { Payload, PayloadListener, PayloadReader, Update };
10+
export { EventsSummary, Payload, PayloadListener, PayloadProcessor, PayloadStreamReader, Update };

packages/shared/common/src/internal/fdv2/payloadReader.ts renamed to packages/shared/common/src/internal/fdv2/payloadProcessor.ts

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
11
/* eslint-disable no-underscore-dangle */
2-
import { EventListener, EventName, LDLogger } from '../../api';
2+
import { LDLogger } from '../../api';
33
import { DataSourceErrorKind } from '../../datasource';
44
import { DeleteObject, PayloadTransferred, PutObject, ServerIntentData } from './proto';
55

6-
// Facade interface to contain only ability to add event listeners
7-
export interface EventStream {
8-
addEventListener(type: EventName, listener: EventListener): void;
9-
}
10-
116
// Used to define object processing between deserialization and payload listener invocation. This can be
127
// used provide object sanitization logic.
138
export interface ObjProcessors {
149
[kind: string]: (object: any) => any;
1510
}
1611

12+
export interface EventsSummary {
13+
events: Event[];
14+
}
15+
16+
export interface Event {
17+
event: string;
18+
data: any;
19+
}
20+
1721
// Represents information for one keyed object.
1822
export interface Update {
1923
kind: string;
@@ -40,7 +44,7 @@ export type PayloadListener = (payload: Payload) => void;
4044
* to the PayloadListeners as the payloads are received. Invalid series of events may be dropped silently,
4145
* but the payload reader will continue to operate.
4246
*/
43-
export class PayloadReader {
47+
export class PayloadProcessor {
4448
private _listeners: PayloadListener[] = [];
4549

4650
private _tempId?: string = undefined;
@@ -50,24 +54,15 @@ export class PayloadReader {
5054
/**
5155
* Creates a PayloadReader
5256
*
53-
* @param eventStream event stream of FDv2 events
5457
* @param _objProcessors defines object processors for each object kind.
55-
* @param _errorHandler that will be called with errors as they are encountered
58+
* @param _errorHandler that will be called with parsing errors as they are encountered
5659
* @param _logger for logging
5760
*/
5861
constructor(
59-
eventStream: EventStream,
6062
private readonly _objProcessors: ObjProcessors,
6163
private readonly _errorHandler?: (errorKind: DataSourceErrorKind, message: string) => void,
6264
private readonly _logger?: LDLogger,
63-
) {
64-
this._attachHandler(eventStream, 'server-intent', this._processServerIntent);
65-
this._attachHandler(eventStream, 'put-object', this._processPutObject);
66-
this._attachHandler(eventStream, 'delete-object', this._processDeleteObject);
67-
this._attachHandler(eventStream, 'payload-transferred', this._processPayloadTransferred);
68-
this._attachHandler(eventStream, 'goodbye', this._processGoodbye);
69-
this._attachHandler(eventStream, 'error', this._processError);
70-
}
65+
) {}
7166

7267
addPayloadListener(listener: PayloadListener) {
7368
this._listeners.push(listener);
@@ -80,21 +75,41 @@ export class PayloadReader {
8075
}
8176
}
8277

83-
private _attachHandler(stream: EventStream, eventName: string, processor: (obj: any) => void) {
84-
stream.addEventListener(eventName, async (event?: { data?: string }) => {
85-
if (event?.data) {
86-
this._logger?.debug(`Received ${eventName} event. Data is ${event.data}`);
87-
try {
88-
processor(JSON.parse(event.data));
89-
} catch {
90-
this._logger?.error(
91-
`Stream received data that was unable to be processed in "${eventName}" message`,
92-
);
93-
this._logger?.debug(`Data follows: ${event.data}`);
94-
this._errorHandler?.(DataSourceErrorKind.InvalidData, 'Malformed data in event stream');
78+
/**
79+
* Gives the {@link PayloadProcessor} a series of events that it will statefully, incrementally processed.
80+
* This may lead to listeners being invoked as necessary.
81+
* @param events to be processed (can be a single element)
82+
*/
83+
processEvents(events: Event[]) {
84+
events.forEach((event) => {
85+
switch (event.event) {
86+
case 'server-intent': {
87+
this._processServerIntent(event.data);
88+
break;
89+
}
90+
case 'put-object': {
91+
this._processPutObject(event.data);
92+
break;
93+
}
94+
case 'delete-object': {
95+
this._processDeleteObject(event.data);
96+
break;
97+
}
98+
case 'payload-transferred': {
99+
this._processPayloadTransferred(event.data);
100+
break;
101+
}
102+
case 'goodbye': {
103+
this._processGoodbye(event.data);
104+
break;
105+
}
106+
case 'error': {
107+
this._processError(event.data);
108+
break;
109+
}
110+
default: {
111+
// no-op, unrecognized
95112
}
96-
} else {
97-
this._errorHandler?.(DataSourceErrorKind.Unknown, 'Unexpected message from event stream');
98113
}
99114
});
100115
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/* eslint-disable no-underscore-dangle */
2+
import { EventListener, EventName, LDLogger } from '../../api';
3+
import { DataSourceErrorKind } from '../../datasource';
4+
import { ObjProcessors, PayloadListener, PayloadProcessor } from './payloadProcessor';
5+
6+
// Facade interface to contain only ability to add event listeners
7+
export interface EventStream {
8+
addEventListener(type: EventName, listener: EventListener): void;
9+
}
10+
11+
/**
12+
* A FDv2 PayloadStreamReader can be used to parse payloads from a stream of FDv2 events. See {@link PayloadProcessor}
13+
* for more details.
14+
*/
15+
export class PayloadStreamReader {
16+
private _payloadProcessor: PayloadProcessor;
17+
18+
/**
19+
* Creates a PayloadReader
20+
*
21+
* @param eventStream event stream of FDv2 events
22+
* @param _objProcessors defines object processors for each object kind.
23+
* @param _errorHandler that will be called with parsing errors as they are encountered
24+
* @param _logger for logging
25+
*/
26+
constructor(
27+
eventStream: EventStream,
28+
_objProcessors: ObjProcessors,
29+
private readonly _errorHandler?: (errorKind: DataSourceErrorKind, message: string) => void,
30+
private readonly _logger?: LDLogger,
31+
) {
32+
this._attachHandler(eventStream, 'server-intent');
33+
this._attachHandler(eventStream, 'put-object');
34+
this._attachHandler(eventStream, 'delete-object');
35+
this._attachHandler(eventStream, 'payload-transferred');
36+
this._attachHandler(eventStream, 'goodbye');
37+
this._attachHandler(eventStream, 'error');
38+
this._payloadProcessor = new PayloadProcessor(_objProcessors, _errorHandler, _logger);
39+
}
40+
41+
addPayloadListener(listener: PayloadListener) {
42+
this._payloadProcessor.addPayloadListener(listener);
43+
}
44+
45+
removePayloadListener(listener: PayloadListener) {
46+
this._payloadProcessor.removePayloadListener(listener);
47+
}
48+
49+
private _attachHandler(stream: EventStream, eventName: string) {
50+
stream.addEventListener(eventName, async (event?: { data?: string }) => {
51+
if (event?.data) {
52+
this._logger?.debug(`Received ${eventName} event. Data is ${event.data}`);
53+
try {
54+
this._payloadProcessor.processEvents([
55+
{ event: eventName, data: JSON.parse(event.data) },
56+
]);
57+
} catch {
58+
this._logger?.error(
59+
`Stream received data that was unable to be processed in "${eventName}" message`,
60+
);
61+
this._logger?.debug(`Data follows: ${event.data}`);
62+
this._errorHandler?.(DataSourceErrorKind.InvalidData, 'Malformed data in EventStream.');
63+
}
64+
} else {
65+
this._errorHandler?.(DataSourceErrorKind.Unknown, 'Event from EventStream missing data.');
66+
}
67+
});
68+
}
69+
}

0 commit comments

Comments
 (0)