Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
db1b38a
feat: Add support for transactional data stores.
kinyoklion Dec 1, 2025
1757212
chore: Add changeset sorting.
kinyoklion Dec 2, 2025
5c499c9
Remove version collapsing.
kinyoklion Dec 3, 2025
d73c5f9
Change data to property
kinyoklion Dec 3, 2025
9ffcf50
chore: Add support for transactional data source updates.
kinyoklion Dec 2, 2025
edde5c3
Merge branch 'rlamb/add-changeset-sorting' into rlamb/sdk-1583/transa…
kinyoklion Dec 3, 2025
8805426
Basic transactional apply routing.
kinyoklion Dec 3, 2025
9c37607
Support non-transactional data stores.
kinyoklion Dec 3, 2025
5d16f31
Organize code by region.
kinyoklion Dec 3, 2025
94184a3
Only find environment ID once per connection.
kinyoklion Dec 3, 2025
94cdabf
chore: Add FDv2 polling data source.
kinyoklion Dec 4, 2025
8c14faa
Error handling improvements.
kinyoklion Dec 4, 2025
5b2e4d3
Merge branch 'rlamb/sdk-1583/transactional-data-source-updates' into …
kinyoklion Dec 4, 2025
90e87a0
Error handling improvements.
kinyoklion Dec 4, 2025
9655e3a
Merge branch 'rlamb/sdk-1583/transactional-data-source-updates' into …
kinyoklion Dec 4, 2025
5844a66
Refactor application to transactional versus non-transactional stores.
kinyoklion Dec 4, 2025
891ad0a
Sort and add comments.
kinyoklion Dec 4, 2025
ff7c34d
Merge branch 'rlamb/sdk-1583/transactional-data-source-updates' into …
kinyoklion Dec 4, 2025
aee3377
Merge branch 'main' into rlamb/sdk-1584/fdv2-polling-data-source
kinyoklion Dec 5, 2025
17a2bea
chore: Add support for FDv2 configuration.
kinyoklion Dec 8, 2025
30bec0e
More robust JSON parsing.
kinyoklion Dec 8, 2025
1f76c9c
Merge branch 'rlamb/sdk-1584/fdv2-polling-data-source' into rlamb/sdk…
kinyoklion Dec 8, 2025
6c5765a
Better 304 handling.
kinyoklion Dec 8, 2025
9464684
Merge branch 'rlamb/sdk-1584/fdv2-polling-data-source' into rlamb/sdk…
kinyoklion Dec 8, 2025
69b18d1
More JSON error handling.
kinyoklion Dec 8, 2025
70061b7
Merge branch 'rlamb/sdk-1584/fdv2-polling-data-source' into rlamb/sdk…
kinyoklion Dec 8, 2025
296bf6f
chore: Data system configuration.
kinyoklion Dec 8, 2025
7f97022
Add support for per-data source endpoints.
kinyoklion Dec 9, 2025
d00a437
Merge branch 'main' into rlamb/sdk-1589/data-system-configurtion
kinyoklion Dec 10, 2025
60c2df4
Address PR feedback.
kinyoklion Dec 10, 2025
3208b92
Merge branch 'main' into rlamb/sdk-1589/data-system-configurtion
kinyoklion Dec 10, 2025
ff420a6
Builder consistency.
kinyoklion Dec 10, 2025
b0d07a9
chore: Add FDv1 DataSystem.
kinyoklion Dec 10, 2025
0adf19f
Merge branch 'main' into rlamb/sdk-1586/fdv1-data-system
kinyoklion Dec 10, 2025
c5f65c9
Remove scaffolded FDv2DataSystem.
kinyoklion Dec 10, 2025
334635e
Undo formatting changes.
kinyoklion Dec 10, 2025
52f8d84
Initialization of source, not store.
kinyoklion Dec 10, 2025
8783956
Implement disposing pattern for FDv1 data system.
kinyoklion Dec 10, 2025
1811386
Fix initialized call.
kinyoklion Dec 10, 2025
afc00f8
chore: FDv2 Data System.
kinyoklion Dec 10, 2025
48d351e
Able to connect and get a payload.
kinyoklion Dec 11, 2025
ceb4701
Selector passing.
kinyoklion Dec 11, 2025
26f4b58
Test updates and remove outdated comment.
kinyoklion Dec 12, 2025
3b8e1da
Remove test app.
kinyoklion Dec 12, 2025
e50b1dc
Add IDataSourceUpdatesV2
kinyoklion Dec 12, 2025
8d25d29
Merge branch 'main' into rlamb/sdk1586/fdv2-data-system
kinyoklion Dec 12, 2025
ad29712
Test updates.
kinyoklion Dec 12, 2025
5d2306d
Test updates and temporary data source updates change
kinyoklion Dec 12, 2025
429f50c
chore: tweaking FDv2 composite datasource logic from testing
tanderson-ld Dec 12, 2025
9aa4a7a
Merge composite changes. Refactor data source interactions.
kinyoklion Dec 12, 2025
30bf8bc
Updated suppression list.
kinyoklion Dec 12, 2025
d895e5c
Minor refactorings.
kinyoklion Dec 12, 2025
4549dc0
Fix dispose
kinyoklion Dec 12, 2025
403f2e4
Bug fixes in WriteThroughStore.
kinyoklion Dec 12, 2025
959eddf
Update pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs
kinyoklion Dec 15, 2025
19c2c65
Address writethrough store feedback.
kinyoklion Dec 15, 2025
e6f12ab
Only move out of selectors when there is a selector.
kinyoklion Dec 15, 2025
60d9dbf
Add another supression.
kinyoklion Dec 15, 2025
9d9bb83
More suppressions.
kinyoklion Dec 15, 2025
7af16ce
Only transition on non-empty selector.
kinyoklion Dec 15, 2025
48f4568
Handle selector logic in FDv2 Data System transitions.
kinyoklion Dec 15, 2025
a293eb6
Update test to account for selector.
kinyoklion Dec 15, 2025
ce40826
Ignore unknown event types.
kinyoklion Dec 15, 2025
dd9ec86
Update test for logs for unknown events.
kinyoklion Dec 15, 2025
386a8db
Merge remote-tracking branches 'origin' and 'origin/rlamb/sdk1586/fdv…
tanderson-ld Dec 16, 2025
33e4fe8
chore: adds FDv1 Fallback support to FDv2 Data Source
tanderson-ld Dec 16, 2025
bfef7aa
Merge branch 'main' into ta/SDK-1591/fdv1-fallback
kinyoklion Dec 16, 2025
e2f3453
Upgrade internal and event source dependencies.
kinyoklion Dec 16, 2025
15ba2e7
fdv1 fallback checking for true on header, fixing fallback bug when f…
tanderson-ld Dec 17, 2025
25910ad
enabling the fdv1 fallback contract test, of course we want that
tanderson-ld Dec 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions pkgs/sdk/server/contract-tests/SdkClientEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,45 @@ private static Configuration BuildSdkConfig(SdkConfigParams sdkParams, ILogAdapt
if (synchronizers.Count > 0)
{
dataSystemBuilder.Synchronizers(synchronizers.ToArray());

// Find the best synchronizer to use for FDv1 fallback configuration
// Prefer polling synchronizers since FDv1 fallback is polling-based
SdkConfigSynchronizerParams synchronizerForFallback = null;

// First, try to find a polling synchronizer (check secondary first, then primary)
if (sdkParams.DataSystem.Synchronizers.Secondary != null &&
sdkParams.DataSystem.Synchronizers.Secondary.Polling != null)
{
synchronizerForFallback = sdkParams.DataSystem.Synchronizers.Secondary;
}
else if (sdkParams.DataSystem.Synchronizers.Primary != null &&
sdkParams.DataSystem.Synchronizers.Primary.Polling != null)
{
synchronizerForFallback = sdkParams.DataSystem.Synchronizers.Primary;
}
// If no polling synchronizer found, use primary synchronizer (could be streaming)
else if (sdkParams.DataSystem.Synchronizers.Primary != null)
{
synchronizerForFallback = sdkParams.DataSystem.Synchronizers.Primary;
}

if (synchronizerForFallback != null)
{
// Only configure global polling endpoints if we have a polling synchronizer with a custom base URI
// This ensures the FDv1 fallback synchronizer uses the same base URI without overwriting
// existing polling endpoint configuration
if (synchronizerForFallback.Polling != null &&
synchronizerForFallback.Polling.BaseUri != null)
{
endpoints.Polling(synchronizerForFallback.Polling.BaseUri);
}

var fdv1Fallback = CreateFDv1FallbackSynchronizer(synchronizerForFallback);
if (fdv1Fallback != null)
{
dataSystemBuilder.FDv1FallbackSynchronizer(fdv1Fallback);
}
}
}
}

Expand Down Expand Up @@ -568,6 +607,33 @@ private static IComponentConfigurer<IDataSource> CreateSynchronizer(
return null;
}

private static IComponentConfigurer<IDataSource> CreateFDv1FallbackSynchronizer(
SdkConfigSynchronizerParams synchronizer)
{
// FDv1 fallback synchronizer is always polling-based
var fdv1PollingBuilder = DataSystemComponents.FDv1Polling();

// Configure polling interval if the synchronizer has polling configuration
if (synchronizer.Polling != null)
{
if (synchronizer.Polling.PollIntervalMs.HasValue)
{
fdv1PollingBuilder.PollInterval(TimeSpan.FromMilliseconds(synchronizer.Polling.PollIntervalMs.Value));
}
// Note: FDv1 polling doesn't support ServiceEndpointsOverride, so base URI
// will use the global service endpoints configuration
}
else if (synchronizer.Streaming != null)
{
// For streaming synchronizers, we still create a polling fallback
// Use default polling interval since streaming doesn't have a poll interval
// Note: FDv1 polling doesn't support ServiceEndpointsOverride, so base URI
// will use the global service endpoints configuration
}

return fdv1PollingBuilder;
}

private MigrationVariationResponse DoMigrationVariation(MigrationVariationParams migrationVariation)
{
var defaultStage = MigrationStageExtensions.FromDataModelString(migrationVariation.DefaultStage);
Expand Down
1 change: 0 additions & 1 deletion pkgs/sdk/server/contract-tests/test-supressions-fdv2.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,5 @@ streaming/retry behavior/do not retry after unrecoverable HTTP error on reconnec
streaming/retry behavior/do not retry after unrecoverable HTTP error on reconnect/error 403
streaming/retry behavior/do not retry after unrecoverable HTTP error on reconnect/error 405
streaming/fdv2/reconnection state management/initializes from 2 polling initializers
streaming/fdv2/fallback to FDv1 handling
streaming/fdv2/disconnects on goodbye
streaming/fdv2/reconnection state management/initializes from polling initializer
6 changes: 6 additions & 0 deletions pkgs/sdk/server/src/Interfaces/DataSourceStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ public struct ErrorInfo
/// </summary>
public DateTime Time { get; set; }

/// <summary>
/// The error indicates to fall back to FDv1. (At the time of writing this, this was indicated
/// via the x-ld-fd-fallback header, but this may change in the future. This is just info for posterity.)
/// </summary>
public bool FDv1Fallback { get; set; }

/// <summary>
/// Constructs an instance based on an exception.
/// </summary>
Expand Down
56 changes: 51 additions & 5 deletions pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading.Tasks;
using LaunchDarkly.Sdk.Server.Interfaces;
using LaunchDarkly.Sdk.Server.Subsystems;

using static LaunchDarkly.Sdk.Server.Subsystems.DataStoreTypes;

namespace LaunchDarkly.Sdk.Server.Internal.DataSources
Expand All @@ -16,7 +17,7 @@ internal static class FDv2DataSource
/// <param name="updatesSink">the sink that receives updates from the active source</param>
/// <param name="initializers">List of data source factories used for initialization</param>
/// <param name="synchronizers">List of data source factories used for synchronization</param>
/// <param name="fdv1Synchronizers">List of data source factories used for FDv1 synchronization</param>
/// <param name="fdv1Synchronizers">List of data source factories used for FDv1 synchronization if fallback to FDv1 occurs</param>
/// <returns>a new data source instance</returns>
public static IDataSource CreateFDv2DataSource(
IDataSourceUpdatesV2 updatesSink,
Expand All @@ -31,7 +32,7 @@ public static IDataSource CreateFDv2DataSource(
ActionApplierFactory fastFallbackApplierFactory = (actionable) => new ActionApplierFastFallback(actionable);
ActionApplierFactory timedFallbackAndRecoveryApplierFactory =
(actionable) => new ActionApplierTimedFallbackAndRecovery(actionable);

ActionApplierFactory fdv1FallbackApplierFactory = (actionable) => new FDv1FallbackActionApplier(actionable);
var underlyingComposites = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)>();

// Only create the initializers composite if initializers are provided
Expand Down Expand Up @@ -71,13 +72,32 @@ public static IDataSource CreateFDv2DataSource(

return new CompositeSource(sink as IDataSourceUpdatesV2, synchronizersFactoryTuples);
},
null // TODO: add fallback to FDv1 logic, null for the moment as once we're on the synchronizers, we stay there
// Only attach FDv1 fallback applier if FDv1 synchronizers are actually provided
(fdv1Synchronizers != null && fdv1Synchronizers.Count > 0) ? fdv1FallbackApplierFactory : null
));
}

var combinedCompositeSource = new CompositeSource(updatesSink, underlyingComposites, circular: false);
// Add the FDv1 fallback synchronizers composite if provided
if (fdv1Synchronizers != null && fdv1Synchronizers.Count > 0)
{
underlyingComposites.Add((
// Create fdv1SynchronizersCompositeSource with action logic unique to fdv1Synchronizers
(sink) =>
{
var fdv1SynchronizersFactoryTuples =
new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)>();
for (int i = 0; i < fdv1Synchronizers.Count; i++)
{
fdv1SynchronizersFactoryTuples.Add((fdv1Synchronizers[i], timedFallbackAndRecoveryApplierFactory)); // fdv1 synchronizers behave same as synchronizers
}

return new CompositeSource(sink as IDataSourceUpdatesV2, fdv1SynchronizersFactoryTuples);
},
null // no action applier for fdv1Synchronizers as a whole
));
}

// TODO: add fallback to FDv1 logic
var combinedCompositeSource = new CompositeSource(updatesSink, underlyingComposites, circular: false);

return combinedCompositeSource;
}
Expand Down Expand Up @@ -265,5 +285,31 @@ public void Apply(ChangeSet<ItemDescriptor> changeSet)
_actionable.StartCurrent();
}
}

private class FDv1FallbackActionApplier : IDataSourceObserver
{
private readonly ICompositeSourceActionable _actionable;

public FDv1FallbackActionApplier(ICompositeSourceActionable actionable)
{
_actionable = actionable ?? throw new ArgumentNullException(nameof(actionable));
}

public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError)
{
if (newError != null && newError.Value.FDv1Fallback)
{
_actionable.BlacklistCurrent(); // blacklist the synchronizers altogether
_actionable.DisposeCurrent(); // dispose the synchronizers
_actionable.GoToNext(); // go to the FDv1 fallback synchronizer
_actionable.StartCurrent(); // start the FDv1 fallback synchronizer
}
}

public void Apply(ChangeSet<ItemDescriptor> changeSet)
{
// this FDv1 fallback action applier doesn't care about apply, it only looks for the FDv1Fallback flag in the errors
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ private async Task UpdateTaskAsync()
catch (UnsuccessfulResponseException ex)
{
var errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(ex.StatusCode);

// Check for LD fallback header
if (ex.Headers != null)
{
errorInfo.FDv1Fallback = ex.Headers
.Where(h => string.Equals(h.Key, "x-ld-fd-fallback", StringComparison.OrdinalIgnoreCase))
.SelectMany(h => h.Value)
.Any(v => string.Equals(v, "true", StringComparison.OrdinalIgnoreCase));
}

if (HttpErrors.IsRecoverable(ex.StatusCode))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ internal FDv2PollingRequestor(LdClientContext context, Uri baseUri)

if (!response.IsSuccessStatusCode)
{
throw new UnsuccessfulResponseException((int)response.StatusCode);
throw new UnsuccessfulResponseException((int)response.StatusCode, response.Headers);
}

lock (_etags)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,16 @@ private void OnError(object sender, ExceptionEventArgs e)
{
var status = respEx.StatusCode;
errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(status);

// Check for LD fallback header
if (respEx.Headers != null)
{
errorInfo.FDv1Fallback = respEx.Headers
.Where(h => string.Equals(h.Key, "x-ld-fd-fallback", StringComparison.OrdinalIgnoreCase))
.SelectMany(h => h.Value)
.Any(v => string.Equals(v, "true", StringComparison.OrdinalIgnoreCase));
}

RecordStreamInit(true);
if (!HttpErrors.IsRecoverable(status))
{
Expand Down
4 changes: 2 additions & 2 deletions pkgs/sdk/server/src/LaunchDarkly.ServerSdk.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
<ItemGroup>
<PackageReference Include="LaunchDarkly.Cache" Version="1.0.2" />
<PackageReference Include="LaunchDarkly.CommonSdk" Version="7.1.1" />
<PackageReference Include="LaunchDarkly.EventSource" Version="5.2.1" />
<PackageReference Include="LaunchDarkly.InternalSdk" Version="3.5.5" />
<PackageReference Include="LaunchDarkly.EventSource" Version="5.3.0" />
<PackageReference Include="LaunchDarkly.InternalSdk" Version="3.6.0" />
<PackageReference Include="LaunchDarkly.Logging" Version="2.0.0" />
</ItemGroup>

Expand Down
127 changes: 127 additions & 0 deletions pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,133 @@ public async Task CanDisposeWhenSynchronizersFallingBackUnthrottled()
// The result may be true or false depending on implementation, but the key is that disposal works
}

[Fact]
public async Task ErrorWithFDv1FallbackTriggersFallbackToFDv1Synchronizers()
{
// Create a capturing sink to observe all updates
var capturingSink = new CapturingDataSourceUpdatesWithHeaders();

// Track whether the synchronizer factory was invoked
bool synchronizerFactoryInvoked = false;
IDataSourceUpdatesV2 synchronizerUpdateSink = null;

// Create synchronizer factory: emits Initializing, then reports Interrupted with FDv1Fallback error
SourceFactory synchronizerFactory = (updatesSink) =>
{
synchronizerFactoryInvoked = true;
synchronizerUpdateSink = updatesSink;
var source = new MockDataSourceWithInit(
async () =>
{
// Emit Initializing
updatesSink.UpdateStatus(DataSourceState.Initializing, null);
await Task.Delay(10);

// Report Interrupted with error that has FDv1Fallback = true
var errorInfo = new DataSourceStatus.ErrorInfo
{
Kind = DataSourceStatus.ErrorKind.ErrorResponse,
StatusCode = 503,
FDv1Fallback = true,
Time = DateTime.Now
};
updatesSink.UpdateStatus(DataSourceState.Interrupted, errorInfo);
await Task.Delay(10);
}
);
return source;
};

// Track whether the fdv1Synchronizer factory was invoked
bool fdv1SynchronizerFactoryInvoked = false;
IDataSourceUpdatesV2 fdv1SynchronizerUpdateSink = null;

// Create dummy data for fdv1Synchronizer
var fdv1SynchronizerDummyData = new FullDataSet<ItemDescriptor>(new Dictionary<DataKind, KeyedItems<ItemDescriptor>>());

// Create fdv1Synchronizer factory: emits Initializing, calls init with dummy data, then reports Valid
SourceFactory fdv1SynchronizerFactory = (updatesSink) =>
{
fdv1SynchronizerFactoryInvoked = true;
fdv1SynchronizerUpdateSink = updatesSink;
var source = new MockDataSourceWithInit(
async () =>
{
// Emit Initializing
updatesSink.UpdateStatus(DataSourceState.Initializing, null);
await Task.Delay(10);

// Report Valid
updatesSink.UpdateStatus(DataSourceState.Valid, null);
await Task.Delay(10);

// Call Apply with dummy data
updatesSink.Apply(new ChangeSet<ItemDescriptor>(
ChangeSetType.Full,
Selector.Empty,
fdv1SynchronizerDummyData.Data,
null
));
await Task.Delay(10);
}
);
return source;
};

// Create FDv2DataSource with no initializers, one synchronizer, and one fdv1Synchronizer
var initializers = new List<SourceFactory>();
var synchronizers = new List<SourceFactory> { synchronizerFactory };
var fdv1Synchronizers = new List<SourceFactory> { fdv1SynchronizerFactory };

var dataSource = FDv2DataSource.CreateFDv2DataSource(
capturingSink,
initializers,
synchronizers,
fdv1Synchronizers
);

// Start the data source
var startTask = dataSource.Start();

// Wait for status updates - we expect:
// 1. Initializing (from synchronizer)
// 2. Interrupted (from synchronizer with FDv1Fallback error)
// 3. Interrupted (initializing from fdv1Synchronizer is mapped to interrupted by sanitizer after fallback)
// 4. Valid (from fdv1Synchronizer)
var statusUpdates = capturingSink.WaitForStatusUpdates(4, TimeSpan.FromSeconds(5));

// Verify that Start() completed successfully
var startResult = await startTask;
Assert.True(startResult);

// Verify status updates
Assert.True(statusUpdates.Count >= 4, $"Expected at least 4 status updates, got {statusUpdates.Count}");

// Position 0: Initializing (from synchronizer)
Assert.Equal(DataSourceState.Initializing, statusUpdates[0].State);

// Position 1: Interrupted (from synchronizer with FDv1Fallback error)
Assert.Equal(DataSourceState.Interrupted, statusUpdates[1].State);
Assert.NotNull(statusUpdates[1].LastError);
Assert.True(statusUpdates[1].LastError.Value.FDv1Fallback, "FDv1Fallback should be true in the error");

// Position 2: Interrupted (initializing from fdv1Synchronizer is mapped to interrupted by sanitizer after fallback)
Assert.Equal(DataSourceState.Interrupted, statusUpdates[2].State);

// Position 3: Valid (from fdv1Synchronizer)
Assert.Equal(DataSourceState.Valid, statusUpdates[3].State);

// Verify that both factories were invoked
Assert.True(synchronizerFactoryInvoked, "Synchronizer factory should have been invoked");
Assert.True(fdv1SynchronizerFactoryInvoked, "FDv1Synchronizer factory should have been invoked after fallback");

// Verify that Apply was called with fdv1Synchronizer dummy data
var changeSet = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(1));
Assert.Equal(ChangeSetType.Full, changeSet.Type);

dataSource.Dispose();
}


// Mock data source that executes an async action when started
private class MockDataSourceWithInit : IDataSource
Expand Down
Loading
Loading