diff --git a/.github/workflows/server-node.yml b/.github/workflows/server-node.yml index 7966c7aafa..e8100f4a56 100644 --- a/.github/workflows/server-node.yml +++ b/.github/workflows/server-node.yml @@ -37,8 +37,19 @@ jobs: run: yarn contract-test-service-build - name: Launch the test service in the background run: yarn contract-test-service 2>&1 & + - name: Clone and run contract tests from feat/fdv2 branch + run: | + mkdir -p /tmp/sdk-test-harness + git clone https://github.com/launchdarkly/sdk-test-harness.git /tmp/sdk-test-harness + cp ./contract-tests/testharness-suppressions-fdv2.txt /tmp/sdk-test-harness/testharness-suppressions-fdv2.txt + cd /tmp/sdk-test-harness + git checkout feat/fdv2 + go build -o test-harness . + ./test-harness -url http://localhost:8000 -debug --skip-from=testharness-suppressions-fdv2.txt + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - uses: launchdarkly/gh-actions/actions/contract-tests@contract-tests-v1.0.2 with: test_service_port: 8000 token: ${{ secrets.GITHUB_TOKEN }} - extra_params: '--skip-from=./contract-tests/testharness-suppressions.txt' + extra_params: '--skip-from=./contract-tests/testharness-suppressions.txt -stop-service-at-end' diff --git a/contract-tests/package.json b/contract-tests/package.json index 7b3f880d64..7b6b9b4e45 100644 --- a/contract-tests/package.json +++ b/contract-tests/package.json @@ -12,7 +12,7 @@ "license": "Apache-2.0", "private": true, "dependencies": { - "@launchdarkly/node-server-sdk": "9.8.0", + "@launchdarkly/node-server-sdk": "*", "body-parser": "^1.19.0", "express": "^4.17.1", "got": "14.4.7" diff --git a/contract-tests/src/sdkClientEntity.ts b/contract-tests/src/sdkClientEntity.ts index ad0606b1d1..01c4d26891 100644 --- a/contract-tests/src/sdkClientEntity.ts +++ b/contract-tests/src/sdkClientEntity.ts @@ -2,6 +2,7 @@ import got from 'got'; import ld, { createMigration, + DataSourceOptions, LDClient, LDConcurrentExecution, LDContext, @@ -33,6 +34,11 @@ interface SdkConfigOptions { pollIntervalMs: number; filter?: string; }; + dataSystem?: { + initializers?: SDKDataSystemInitializerParams[]; + synchronizers?: SDKDataSystemSynchronizerParams; + payloadFilter?: string; + }; events?: { allAttributesPrivate?: boolean; baseUri: string; @@ -67,6 +73,31 @@ interface SdkConfigOptions { }; } +export interface SDKDataSystemSynchronizerParams { + primary?: { + streaming?: SDKDataSourceStreamingParams; + polling?: SDKDataSourcePollingParams; + }; + secondary?: { + streaming?: SDKDataSourceStreamingParams; + polling?: SDKDataSourcePollingParams; + }; +} + +export interface SDKDataSystemInitializerParams { + polling?: SDKDataSourcePollingParams; +} + +export interface SDKDataSourceStreamingParams { + baseUri?: string; + initialRetryDelayMs?: number; +} + +export interface SDKDataSourcePollingParams { + baseUri?: string; + pollIntervalMs?: number; +} + interface CommandParams { command: string; evaluate?: { @@ -128,8 +159,7 @@ export function makeSdkConfig(options: SdkConfigOptions, tag: string): LDOptions cf.streamUri = options.streaming.baseUri; cf.streamInitialReconnectDelay = maybeTime(options.streaming.initialRetryDelayMs); if (options.streaming.filter) { - cf.application = cf.application || {}; - cf.application.payloadFilterKey = options.streaming.filter; + cf.payloadFilterKey = options.streaming.filter; } } @@ -138,8 +168,7 @@ export function makeSdkConfig(options: SdkConfigOptions, tag: string): LDOptions cf.baseUri = options.polling.baseUri; cf.pollInterval = options.polling.pollIntervalMs / 1000; if (options.polling.filter) { - cf.application = cf.application || {}; - cf.application.payloadFilterKey = options.polling.filter; + cf.payloadFilterKey = options.polling.filter; } } @@ -192,6 +221,64 @@ export function makeSdkConfig(options: SdkConfigOptions, tag: string): LDOptions } } + if (options.dataSystem) { + const dataSourceStreamingOptions: SDKDataSourceStreamingParams | undefined = + options.dataSystem.synchronizers?.primary?.streaming ?? + options.dataSystem.synchronizers?.secondary?.streaming; + const dataSourcePollingOptions: SDKDataSourcePollingParams | undefined = + options.dataSystem.initializers?.[0]?.polling ?? + options.dataSystem.synchronizers?.primary?.polling ?? + options.dataSystem.synchronizers?.secondary?.polling; + + if (dataSourceStreamingOptions) { + cf.streamUri = dataSourceStreamingOptions.baseUri; + cf.streamInitialReconnectDelay = maybeTime(dataSourceStreamingOptions.initialRetryDelayMs); + } + if (dataSourcePollingOptions) { + cf.stream = false; + cf.baseUri = dataSourcePollingOptions.baseUri; + cf.pollInterval = maybeTime(dataSourcePollingOptions.pollIntervalMs); + } + + let dataSourceOptions: DataSourceOptions | undefined; + if (dataSourceStreamingOptions && dataSourcePollingOptions) { + dataSourceOptions = { + dataSourceOptionsType: 'standard', + ...(dataSourceStreamingOptions.initialRetryDelayMs != null && { + streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs), + }), + ...(dataSourcePollingOptions.pollIntervalMs != null && { + pollInterval: dataSourcePollingOptions.pollIntervalMs, + }), + }; + } else if (dataSourceStreamingOptions) { + dataSourceOptions = { + dataSourceOptionsType: 'streamingOnly', + ...(dataSourceStreamingOptions.initialRetryDelayMs != null && { + streamInitialReconnectDelay: maybeTime(dataSourceStreamingOptions.initialRetryDelayMs), + }), + }; + } else if (dataSourcePollingOptions) { + dataSourceOptions = { + dataSourceOptionsType: 'pollingOnly', + ...(dataSourcePollingOptions.pollIntervalMs != null && { + pollInterval: dataSourcePollingOptions.pollIntervalMs, + }), + }; + } else { + // No data source options were specified + dataSourceOptions = undefined; + } + + if (options.dataSystem.payloadFilter) { + cf.payloadFilterKey = options.dataSystem.payloadFilter; + } + + cf.dataSystem = { + dataSource: dataSourceOptions, + }; + } + return cf; } diff --git a/contract-tests/testharness-suppressions-fdv2.txt b/contract-tests/testharness-suppressions-fdv2.txt new file mode 100644 index 0000000000..173fb7d30d --- /dev/null +++ b/contract-tests/testharness-suppressions-fdv2.txt @@ -0,0 +1,14 @@ +streaming/validation/drop and reconnect if stream event has malformed JSON +streaming/validation/drop and reconnect if stream event has well-formed JSON not matching schema +streaming/requests/URL path is computed correctly/environment_filter_key="encoding_not_necessary"/base URI has no trailing slash/GET +streaming/requests/URL path is computed correctly/environment_filter_key="encoding_not_necessary"/base URI has a trailing slash/GET +polling/requests/URL path is computed correctly/environment_filter_key="encoding_not_necessary"/base URI has no trailing slash/GET +polling/requests/URL path is computed correctly/environment_filter_key="encoding_not_necessary"/base URI has a trailing slash/GET + +streaming/fdv2/reconnection state management/initializes from polling initializer +streaming/fdv2/reconnection state management/initializes from 2 polling initializers +streaming/fdv2/reconnection state management/saves previously known state +streaming/fdv2/reconnection state management/replaces previously known state +streaming/fdv2/reconnection state management/updates previously known state +streaming/fdv2/ignores model version +streaming/fdv2/can discard partial events on errors \ No newline at end of file diff --git a/packages/shared/common/src/datasource/CompositeDataSource.ts b/packages/shared/common/src/datasource/CompositeDataSource.ts index 63aae519f6..8b39225786 100644 --- a/packages/shared/common/src/datasource/CompositeDataSource.ts +++ b/packages/shared/common/src/datasource/CompositeDataSource.ts @@ -207,8 +207,8 @@ export class CompositeDataSource implements DataSource { currentDS?.stop(); if (transitionRequest.err && transitionRequest.transition !== 'stop') { - // if the transition was due to an error, throttle the transition - const delay = this._backoff.fail(); + // if the transition was due to an error we're not in the initializer phase, throttle the transition. Fallback between initializers is not throttled. + const delay = this._initPhaseActive ? 0 : this._backoff.fail(); const { promise, cancel: cancelDelay } = this._cancellableDelay(delay); this._cancelTokens.push(cancelDelay); const delayedTransition = promise.then(() => { diff --git a/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts b/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts index 440912b9de..a130218a02 100644 --- a/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/OneShotInitializerFDv2.test.ts @@ -60,18 +60,21 @@ describe('given a one shot initializer', () => { requestor.requestAllData = jest.fn((cb) => cb(undefined, jsonData)); initializer.start(mockDataCallback, mockStatusCallback); expect(mockDataCallback).toHaveBeenNthCalledWith(1, true, { - basis: true, - id: `mockId`, - state: `mockState`, - updates: [ - { - kind: `flag`, - key: `flagA`, - version: 123, - object: { objectFieldA: 'objectValueA' }, - }, - ], - version: 1, + initMetadata: undefined, + payload: { + basis: true, + id: `mockId`, + state: `mockState`, + updates: [ + { + kind: `flag`, + key: `flagA`, + version: 123, + object: { objectFieldA: 'objectValueA' }, + }, + ], + version: 1, + }, }); }); }); diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index c7655564f8..a4f324ac99 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -324,14 +324,16 @@ function constructFDv2( // make the FDv2 composite datasource with initializers/synchronizers const initializers: subsystem.LDDataSourceFactory[] = []; - // use one shot initializer for performance and cost - initializers.push( - () => - new OneShotInitializerFDv2( - new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger), - config.logger, - ), - ); + // use one shot initializer for performance and cost if we can do a combination of polling and streaming + if (isStandardOptions(dataSystem.dataSource)) { + initializers.push( + () => + new OneShotInitializerFDv2( + new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger), + config.logger, + ), + ); + } const synchronizers: subsystem.LDDataSourceFactory[] = []; // if streaming is configured, add streaming synchronizer @@ -368,7 +370,7 @@ function constructFDv2( const fdv1FallbackSynchronizers = [ () => new PollingProcessorFDv2( - new Requestor(config, platform.requests, baseHeaders, '/sdk/poll', config.logger), + new Requestor(config, platform.requests, baseHeaders, '/sdk/latest-all', config.logger), pollingInterval, config.logger, true, diff --git a/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts b/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts index 9d454a7a65..f9cd097742 100644 --- a/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts +++ b/packages/shared/sdk-server/src/data_sources/OneShotInitializerFDv2.ts @@ -30,7 +30,7 @@ export default class OneShotInitializerFDv2 implements subsystemCommon.DataSourc statusCallback(subsystemCommon.DataSourceState.Initializing); this._logger?.debug('Performing initialization request to LaunchDarkly for feature flag data.'); - this._requestor.requestAllData((err, body) => { + this._requestor.requestAllData((err, body, headers) => { if (this._stopped) { return; } @@ -57,6 +57,8 @@ export default class OneShotInitializerFDv2 implements subsystemCommon.DataSourc return; } + const initMetadata = internal.initMetadataFromHeaders(headers); + try { const parsed = JSON.parse(body) as internal.FDv2EventsCollection; const payloadProcessor = new internal.PayloadProcessor( @@ -82,7 +84,7 @@ export default class OneShotInitializerFDv2 implements subsystemCommon.DataSourc statusCallback(subsystemCommon.DataSourceState.Valid); payloadProcessor.addPayloadListener((payload) => { - dataCallback(payload.basis, payload); + dataCallback(payload.basis, { initMetadata, payload }); }); payloadProcessor.processEvents(parsed.events);