Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion .github/workflows/server-node.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,19 @@ jobs:
run: yarn contract-test-service-build
- name: Launch the test service in the background
run: yarn contract-test-service 2>&1 &
- name: Clone and run contract tests from feat/fdv2 branch
run: |
mkdir -p /tmp/sdk-test-harness
git clone https://github.com/launchdarkly/sdk-test-harness.git /tmp/sdk-test-harness
cp ./contract-tests/testharness-suppressions-fdv2.txt /tmp/sdk-test-harness/testharness-suppressions-fdv2.txt
cd /tmp/sdk-test-harness
git checkout feat/fdv2
go build -o test-harness .
./test-harness -url http://localhost:8000 -debug --skip-from=testharness-suppressions-fdv2.txt
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- uses: launchdarkly/gh-actions/actions/[email protected]
with:
test_service_port: 8000
token: ${{ secrets.GITHUB_TOKEN }}
extra_params: '--skip-from=./contract-tests/testharness-suppressions.txt'
extra_params: '--skip-from=./contract-tests/testharness-suppressions.txt -stop-service-at-end'
2 changes: 1 addition & 1 deletion contract-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"license": "Apache-2.0",
"private": true,
"dependencies": {
"@launchdarkly/node-server-sdk": "9.8.0",
"@launchdarkly/node-server-sdk": "*",
"body-parser": "^1.19.0",
"express": "^4.17.1",
"got": "14.4.7"
Expand Down
95 changes: 91 additions & 4 deletions contract-tests/src/sdkClientEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import got from 'got';

import ld, {
createMigration,
DataSourceOptions,
LDClient,
LDConcurrentExecution,
LDContext,
Expand Down Expand Up @@ -33,6 +34,11 @@ interface SdkConfigOptions {
pollIntervalMs: number;
filter?: string;
};
dataSystem?: {
initializers?: SDKDataSystemInitializerParams[];
synchronizers?: SDKDataSystemSynchronizerParams;
payloadFilter?: string;
};
events?: {
allAttributesPrivate?: boolean;
baseUri: string;
Expand Down Expand Up @@ -67,6 +73,31 @@ interface SdkConfigOptions {
};
}

export interface SDKDataSystemSynchronizerParams {
primary?: {
streaming?: SDKDataSourceStreamingParams;
polling?: SDKDataSourcePollingParams;
};
secondary?: {
streaming?: SDKDataSourceStreamingParams;
polling?: SDKDataSourcePollingParams;
};
}

export interface SDKDataSystemInitializerParams {
polling?: SDKDataSourcePollingParams;
}

export interface SDKDataSourceStreamingParams {
baseUri?: string;
initialRetryDelayMs?: number;
}

export interface SDKDataSourcePollingParams {
baseUri?: string;
pollIntervalMs?: number;
}

interface CommandParams {
command: string;
evaluate?: {
Expand Down Expand Up @@ -128,8 +159,7 @@ export function makeSdkConfig(options: SdkConfigOptions, tag: string): LDOptions
cf.streamUri = options.streaming.baseUri;
cf.streamInitialReconnectDelay = maybeTime(options.streaming.initialRetryDelayMs);
if (options.streaming.filter) {
cf.application = cf.application || {};
cf.application.payloadFilterKey = options.streaming.filter;
cf.payloadFilterKey = options.streaming.filter;
}
}

Expand All @@ -138,8 +168,7 @@ export function makeSdkConfig(options: SdkConfigOptions, tag: string): LDOptions
cf.baseUri = options.polling.baseUri;
cf.pollInterval = options.polling.pollIntervalMs / 1000;
if (options.polling.filter) {
cf.application = cf.application || {};
cf.application.payloadFilterKey = options.polling.filter;
cf.payloadFilterKey = options.polling.filter;
}
}

Expand Down Expand Up @@ -192,6 +221,64 @@ export function makeSdkConfig(options: SdkConfigOptions, tag: string): LDOptions
}
}

if (options.dataSystem) {
const dataSourceStreamingOptions: SDKDataSourceStreamingParams | undefined =
options.dataSystem.synchronizers?.primary?.streaming ??
options.dataSystem.synchronizers?.secondary?.streaming;
const dataSourcePollingOptions: SDKDataSourcePollingParams | undefined =
options.dataSystem.initializers?.[0]?.polling ??
options.dataSystem.synchronizers?.primary?.polling ??
options.dataSystem.synchronizers?.secondary?.polling;

if (dataSourceStreamingOptions) {
cf.streamUri = dataSourceStreamingOptions.baseUri;
cf.streamInitialReconnectDelay = maybeTime(dataSourceStreamingOptions.initialRetryDelayMs);
}
if (dataSourcePollingOptions) {
cf.stream = false;
cf.baseUri = dataSourcePollingOptions.baseUri;
cf.pollInterval = maybeTime(dataSourcePollingOptions.pollIntervalMs);
}

let dataSourceOptions: DataSourceOptions | undefined;
if (dataSourceStreamingOptions && dataSourcePollingOptions) {
dataSourceOptions = {
dataSourceOptionsType: 'standard',
...(dataSourceStreamingOptions.initialRetryDelayMs != null && {
streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs),
}),
...(dataSourcePollingOptions.pollIntervalMs != null && {
pollInterval: dataSourcePollingOptions.pollIntervalMs,
}),
};
} else if (dataSourceStreamingOptions) {
dataSourceOptions = {
dataSourceOptionsType: 'streamingOnly',
...(dataSourceStreamingOptions.initialRetryDelayMs != null && {
streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs),
}),
};
} else if (dataSourcePollingOptions) {
dataSourceOptions = {
dataSourceOptionsType: 'pollingOnly',
...(dataSourcePollingOptions.pollIntervalMs != null && {
pollInterval: dataSourcePollingOptions.pollIntervalMs,
}),
};
} else {
// No data source options were specified
dataSourceOptions = undefined;
}

if (options.dataSystem.payloadFilter) {
cf.payloadFilterKey = options.dataSystem.payloadFilter;
}

cf.dataSystem = {
dataSource: dataSourceOptions,
};
}

return cf;
}

Expand Down
14 changes: 14 additions & 0 deletions contract-tests/testharness-suppressions-fdv2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
streaming/validation/drop and reconnect if stream event has malformed JSON
streaming/validation/drop and reconnect if stream event has well-formed JSON not matching schema
streaming/requests/URL path is computed correctly/environment_filter_key="encoding_not_necessary"/base URI has no trailing slash/GET
streaming/requests/URL path is computed correctly/environment_filter_key="encoding_not_necessary"/base URI has a trailing slash/GET
polling/requests/URL path is computed correctly/environment_filter_key="encoding_not_necessary"/base URI has no trailing slash/GET
polling/requests/URL path is computed correctly/environment_filter_key="encoding_not_necessary"/base URI has a trailing slash/GET

streaming/fdv2/reconnection state management/initializes from polling initializer
streaming/fdv2/reconnection state management/initializes from 2 polling initializers
streaming/fdv2/reconnection state management/saves previously known state
streaming/fdv2/reconnection state management/replaces previously known state
streaming/fdv2/reconnection state management/updates previously known state
streaming/fdv2/ignores model version
streaming/fdv2/can discard partial events on errors
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ export class CompositeDataSource implements DataSource {
currentDS?.stop();

if (transitionRequest.err && transitionRequest.transition !== 'stop') {
// if the transition was due to an error, throttle the transition
const delay = this._backoff.fail();
// if the transition was due to an error we're not in the initializer phase, throttle the transition. Fallback between initializers is not throttled.
const delay = this._initPhaseActive ? 0 : this._backoff.fail();
const { promise, cancel: cancelDelay } = this._cancellableDelay(delay);
this._cancelTokens.push(cancelDelay);
const delayedTransition = promise.then(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,21 @@ describe('given a one shot initializer', () => {
requestor.requestAllData = jest.fn((cb) => cb(undefined, jsonData));
initializer.start(mockDataCallback, mockStatusCallback);
expect(mockDataCallback).toHaveBeenNthCalledWith(1, true, {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers: Realized env id wasn't being piped through in the one shot initializer, so fixed that.

basis: true,
id: `mockId`,
state: `mockState`,
updates: [
{
kind: `flag`,
key: `flagA`,
version: 123,
object: { objectFieldA: 'objectValueA' },
},
],
version: 1,
initMetadata: undefined,
payload: {
basis: true,
id: `mockId`,
state: `mockState`,
updates: [
{
kind: `flag`,
key: `flagA`,
version: 123,
object: { objectFieldA: 'objectValueA' },
},
],
version: 1,
},
});
});
});
20 changes: 11 additions & 9 deletions packages/shared/sdk-server/src/LDClientImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,16 @@ function constructFDv2(
// make the FDv2 composite datasource with initializers/synchronizers
const initializers: subsystem.LDDataSourceFactory[] = [];

// use one shot initializer for performance and cost
initializers.push(
() =>
new OneShotInitializerFDv2(
new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger),
config.logger,
),
);
// use one shot initializer for performance and cost if we can do a combination of polling and streaming
if (isStandardOptions(dataSystem.dataSource)) {
initializers.push(
() =>
new OneShotInitializerFDv2(
new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger),
config.logger,
),
);
}

const synchronizers: subsystem.LDDataSourceFactory[] = [];
// if streaming is configured, add streaming synchronizer
Expand Down Expand Up @@ -368,7 +370,7 @@ function constructFDv2(
const fdv1FallbackSynchronizers = [
() =>
new PollingProcessorFDv2(
new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger),
new Requestor(config, platform.requests, baseHeaders, '/sdk/latest-all', config.logger),
pollingInterval,
config.logger,
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export default class OneShotInitializerFDv2 implements subsystemCommon.DataSourc
statusCallback(subsystemCommon.DataSourceState.Initializing);

this._logger?.debug('Performing initialization request to LaunchDarkly for feature flag data.');
this._requestor.requestAllData((err, body) => {
this._requestor.requestAllData((err, body, headers) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers: Realized env id wasn't being piped through in the one shot initializer, so fixed that.

if (this._stopped) {
return;
}
Expand All @@ -57,6 +57,8 @@ export default class OneShotInitializerFDv2 implements subsystemCommon.DataSourc
return;
}

const initMetadata = internal.initMetadataFromHeaders(headers);

try {
const parsed = JSON.parse(body) as internal.FDv2EventsCollection;
const payloadProcessor = new internal.PayloadProcessor(
Expand All @@ -82,7 +84,7 @@ export default class OneShotInitializerFDv2 implements subsystemCommon.DataSourc
statusCallback(subsystemCommon.DataSourceState.Valid);

payloadProcessor.addPayloadListener((payload) => {
dataCallback(payload.basis, payload);
dataCallback(payload.basis, { initMetadata, payload });
});

payloadProcessor.processEvents(parsed.events);
Expand Down