Skip to content

Commit 84dd3dd

Browse files
chore: Add FDv2 Data System (#196)
<!-- CURSOR_SUMMARY --> > [!NOTE] > Adds the FDv2 data system with a new transactional update API, composite source orchestration, write-through store, and integrated streaming/polling with selector support, plus extensive tests. > > - **FDv2 Data System (core)**: > - Introduces `IDataSourceUpdatesV2` (transactional updates API) and adapts legacy sources via `DataSourceUpdatesV2ToV1Adapter`. > - Adds `FDv2DataSystem`, `WriteThroughStore` (memory+persistent, selector-aware), and `SelectorSourceFacade`. > - Wires `LdClient` to use FDv2 when `Configuration.DataSystem` is set. > - **Composite source orchestration**: > - Refactors to `CompositeSource` using `IDataSourceObserver`, `ObservableDataSourceUpdates`, and `DataSourceUpdatesSanitizer` (V2). > - Replaces fan-out/`IActionApplier` with observer pattern and disableable sinks; updates `SourceFactory`/`ActionApplierFactory` signatures. > - **FDv2 data sources**: > - Builds FDv2 streaming/polling with selector support and transactional applies. > - `FDv2DataSource` composes initializers/synchronizers with fallback/blacklist logic and timed recovery. > - Streaming: ignore unknown events (debug log), exposes handled event list. > - **Tests & support**: > - Extensive new/updated unit tests for composite, FDv2 streaming/polling, and data flow; contract-test suppressions added. > - Adds test visibility in `AssemblyInfo`. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit dd9ec86. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY --> --------- Co-authored-by: Todd Anderson <[email protected]> Co-authored-by: Todd Anderson <[email protected]>
1 parent 75f51c6 commit 84dd3dd

33 files changed

+1142
-437
lines changed
Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
streaming/
2-
evaluation/
3-
hooks/
4-
migrations/
5-
secure mode hash/
6-
context type/
7-
tags/
8-
big segments/
9-
events/
10-
service endpoints/
1+
streaming/retry behavior/do not retry after unrecoverable HTTP error on initial connect/error 401
2+
streaming/retry behavior/do not retry after unrecoverable HTTP error on initial connect/error 403
3+
streaming/retry behavior/do not retry after unrecoverable HTTP error on initial connect/error 405
4+
streaming/retry behavior/do not retry after unrecoverable HTTP error on reconnect/error 401
5+
streaming/retry behavior/do not retry after unrecoverable HTTP error on reconnect/error 403
6+
streaming/retry behavior/do not retry after unrecoverable HTTP error on reconnect/error 405
7+
streaming/fdv2/reconnection state management/initializes from 2 polling initializers
8+
streaming/fdv2/fallback to FDv1 handling
9+
streaming/fdv2/disconnects on goodbye
10+
streaming/fdv2/reconnection state management/initializes from polling initializer

pkgs/sdk/server/src/Integrations/FDv2PollingDataSourceBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public IDataSource Build(LdClientContext context)
8787
context.DataSourceUpdates,
8888
requestor,
8989
_pollInterval,
90-
() => Selector.Empty // TODO: Implement
90+
() => context.SelectorSource?.Selector ?? Selector.Empty
9191
);
9292
}
9393

pkgs/sdk/server/src/Integrations/FDv2StreamingDataSourceBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public IDataSource Build(LdClientContext context)
8484
context.DataSourceUpdates,
8585
configuredBaseUri,
8686
_initialReconnectDelay,
87-
() => Selector.Empty // TODO: Implement.
87+
() => context.SelectorSource?.Selector ?? Selector.Empty
8888
);
8989
}
9090

pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ActionApplierFactory.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
namespace LaunchDarkly.Sdk.Server.Internal.DataSources
44
{
55
/// <summary>
6-
/// A function type that creates a new <see cref="IActionApplier"/> instance.
6+
/// A function type that creates a new <see cref="IDataSourceObserver"/> instance that is capable of applying actions to a composite data source.
77
/// </summary>
88
/// <param name="actionable">the <see cref="ICompositeSourceActionable"/> is the entity that actions should be applied to</param>
9-
/// <returns>a new <see cref="IActionApplier"/> instance</returns>
10-
internal delegate IActionApplier ActionApplierFactory(ICompositeSourceActionable actionable);
9+
/// <returns>a new <see cref="IDataSourceObserver"/> instance</returns>
10+
internal delegate IDataSourceObserver ActionApplierFactory(ICompositeSourceActionable actionable);
1111
}
1212

pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
using LaunchDarkly.Sdk.Server.Interfaces;
55
using LaunchDarkly.Sdk.Server.Subsystems;
66

7+
using static LaunchDarkly.Sdk.Server.Subsystems.DataStoreTypes;
8+
79
namespace LaunchDarkly.Sdk.Server.Internal.DataSources
810
{
911
/// <summary>
@@ -16,15 +18,15 @@ internal sealed class CompositeSource : IDataSource, ICompositeSourceActionable
1618
{
1719
// All mutable state and the internal action queue are protected by this lock.
1820
// We also use a small, non-recursive action queue so that any re-entrant calls
19-
// from IActionApplier logic are serialized and processed iteratively instead
21+
// from action applier logic is serialized and processed iteratively instead
2022
// of recursively, avoiding the risk of stack overflows.
2123
private readonly object _lock = new object();
2224
private readonly Queue<Action> _pendingActions = new Queue<Action>();
2325
private bool _isProcessingActions;
2426
private bool _disposed;
2527

26-
private readonly IDataSourceUpdates _originalUpdateSink;
27-
private readonly IDataSourceUpdates _sanitizedUpdateSink;
28+
private readonly IDataSourceUpdatesV2 _originalUpdateSink;
29+
private readonly IDataSourceUpdatesV2 _sanitizedUpdateSink;
2830
private readonly SourcesList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> _sourcesList;
2931
private readonly DisableableDataSourceUpdatesTracker _disableableTracker;
3032

@@ -41,7 +43,7 @@ internal sealed class CompositeSource : IDataSource, ICompositeSourceActionable
4143
/// <param name="factoryTuples">the ordered list of source factories and their associated action applier factories</param>
4244
/// <param name="circular">whether to loop off the end of the list back to the start when fallback occurs</param>
4345
public CompositeSource(
44-
IDataSourceUpdates updatesSink,
46+
IDataSourceUpdatesV2 updatesSink,
4547
IList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> factoryTuples,
4648
bool circular = true)
4749
{
@@ -206,19 +208,25 @@ private void TryFindNextUnderLock()
206208
InternalDispose(errorInfo);
207209
return;
208210
}
209-
210-
// Build the list of update sinks conditionally based on whether we have an action applier factory
211-
var updateSinks = new List<IDataSourceUpdates>();
211+
212+
IReadOnlyList<IDataSourceObserver> observers;
213+
// Conditionally add the action applier if we have one
212214
if (entry.ActionApplierFactory != null)
213215
{
216+
var observersList = new List<IDataSourceObserver>();
214217
var actionApplier = entry.ActionApplierFactory(this);
215-
updateSinks.Add(actionApplier);
218+
observersList.Add(actionApplier);
219+
observers = observersList;
220+
}
221+
else
222+
{
223+
observers = Array.Empty<IDataSourceObserver>();
216224
}
217-
updateSinks.Add(_sanitizedUpdateSink);
218225

219-
// here we make a fanout so that we can trigger actions as well as forward calls to the sanitized sink (order matters here)
220-
var fanout = new FanOutDataSourceUpdates(updateSinks);
221-
var disableableUpdates = _disableableTracker.WrapAndTrack(fanout);
226+
// here we wrap the sink in observability so that we can trigger actions, the sanitized sink is
227+
// invoked before the observers to ensure actions don't trigger before data can propagate
228+
var observableUpdates = new ObservableDataSourceUpdates(_sanitizedUpdateSink, observers);
229+
var disableableUpdates = _disableableTracker.WrapAndTrack(observableUpdates);
222230

223231
_currentEntry = entry;
224232
_currentDataSource = entry.Factory(disableableUpdates);
@@ -395,5 +403,3 @@ public void BlacklistCurrent()
395403
#endregion
396404
}
397405
}
398-
399-

pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/DataSourceUpdatesSanitizer.cs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
namespace LaunchDarkly.Sdk.Server.Internal.DataSources
99
{
1010
/// <summary>
11-
/// An <see cref="IDataSourceUpdates"/> implementation that decorates an underlying
12-
/// <see cref="IDataSourceUpdates"/> and sanitizes status updates.
11+
/// An <see cref="IDataSourceUpdatesV2"/> implementation that decorates an underlying
12+
/// <see cref="IDataSourceUpdatesV2"/> and sanitizes status updates.
1313
/// </summary>
1414
/// <remarks>
1515
/// This wrapper ensures the following:
@@ -19,9 +19,9 @@ namespace LaunchDarkly.Sdk.Server.Internal.DataSources
1919
/// <item><description>Does not report the same combination of state and error twice in a row.</description></item>
2020
/// </list>
2121
/// </remarks>
22-
internal sealed class DataSourceUpdatesSanitizer : IDataSourceUpdates
22+
internal sealed class DataSourceUpdatesSanitizer : IDataSourceUpdatesV2
2323
{
24-
private readonly IDataSourceUpdates _inner;
24+
private readonly IDataSourceUpdatesV2 _inner;
2525
private readonly object _lock = new object();
2626

2727
private bool _alreadyReportedInitializing;
@@ -32,7 +32,7 @@ internal sealed class DataSourceUpdatesSanitizer : IDataSourceUpdates
3232
/// Creates a new <see cref="DataSourceUpdatesSanitizer"/>.
3333
/// </summary>
3434
/// <param name="inner">the underlying updates sink to delegate to</param>
35-
public DataSourceUpdatesSanitizer(IDataSourceUpdates inner)
35+
public DataSourceUpdatesSanitizer(IDataSourceUpdatesV2 inner)
3636
{
3737
_inner = inner ?? throw new ArgumentNullException(nameof(inner));
3838
}
@@ -41,17 +41,7 @@ public DataSourceUpdatesSanitizer(IDataSourceUpdates inner)
4141
/// The underlying <see cref="IDataStoreStatusProvider"/> from the decorated sink.
4242
/// </summary>
4343
public IDataStoreStatusProvider DataStoreStatusProvider => _inner.DataStoreStatusProvider;
44-
45-
public bool Init(FullDataSet<ItemDescriptor> allData)
46-
{
47-
return _inner.Init(allData);
48-
}
49-
50-
public bool Upsert(DataKind kind, string key, ItemDescriptor item)
51-
{
52-
return _inner.Upsert(kind, key, item);
53-
}
54-
44+
5545
public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError)
5646
{
5747
lock (_lock)
@@ -87,6 +77,11 @@ public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? n
8777
_inner.UpdateStatus(sanitized, newError);
8878
}
8979
}
80+
81+
public bool Apply(ChangeSet<ItemDescriptor> changeSet)
82+
{
83+
return _inner.Apply(changeSet);
84+
}
9085
}
9186
}
9287

pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/DisableableDataSourceUpdatesTracker.cs

Lines changed: 9 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ public DisableableDataSourceUpdatesTracker()
2121
}
2222

2323
/// <summary>
24-
/// Wraps the provided <see cref="IDataSourceUpdates"/> in a new instance that can be disabled by
24+
/// Wraps the provided <see cref="IDataSourceUpdatesV2"/> in a new instance that can be disabled by
2525
/// calling <see cref="DisablePreviouslyTracked"/>.
2626
/// </summary>
2727
/// <returns>a new proxy instance</returns>
28-
public IDataSourceUpdates WrapAndTrack(IDataSourceUpdates dsUpdates)
28+
public IDataSourceUpdatesV2 WrapAndTrack(IDataSourceUpdatesV2 dsUpdates)
2929
{
3030
var dis = new DisableableIDataSourceUpdates(dsUpdates);
3131
lock (_lock)
@@ -60,15 +60,15 @@ public void DisablePreviouslyTracked()
6060
}
6161

6262
/// <summary>
63-
/// A proxy for <see cref="IDataSourceUpdates"/> that can be disabled. When disabled,
63+
/// A proxy for <see cref="IDataSourceUpdatesV2"/> that can be disabled. When disabled,
6464
/// all calls to the proxy will be ignored.
6565
/// </summary>
66-
private sealed class DisableableIDataSourceUpdates : IDataSourceUpdates, IDataSourceUpdatesHeaders
66+
private sealed class DisableableIDataSourceUpdates : IDataSourceUpdatesV2
6767
{
68-
private readonly IDataSourceUpdates _updatesSink;
68+
private readonly IDataSourceUpdatesV2 _updatesSink;
6969
private volatile bool _disabled;
7070

71-
public DisableableIDataSourceUpdates(IDataSourceUpdates updatesSink)
71+
public DisableableIDataSourceUpdates(IDataSourceUpdatesV2 updatesSink)
7272
{
7373
_updatesSink = updatesSink ?? throw new ArgumentNullException(nameof(updatesSink));
7474
}
@@ -84,27 +84,7 @@ public void Disable()
8484
private bool IsDisabled => _disabled;
8585

8686
public IDataStoreStatusProvider DataStoreStatusProvider => _updatesSink.DataStoreStatusProvider;
87-
88-
public bool Init(FullDataSet<ItemDescriptor> allData)
89-
{
90-
if (IsDisabled)
91-
{
92-
return false;
93-
}
94-
95-
return _updatesSink.Init(allData);
96-
}
97-
98-
public bool Upsert(DataKind kind, string key, ItemDescriptor item)
99-
{
100-
if (IsDisabled)
101-
{
102-
return false;
103-
}
104-
105-
return _updatesSink.Upsert(kind, key, item);
106-
}
107-
87+
10888
public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError)
10989
{
11090
if (IsDisabled)
@@ -115,22 +95,9 @@ public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? n
11595
_updatesSink.UpdateStatus(newState, newError);
11696
}
11797

118-
public bool InitWithHeaders(
119-
FullDataSet<ItemDescriptor> allData,
120-
IEnumerable<KeyValuePair<string, IEnumerable<string>>> headers
121-
)
98+
public bool Apply(ChangeSet<ItemDescriptor> changeSet)
12299
{
123-
if (IsDisabled)
124-
{
125-
return false;
126-
}
127-
128-
if (_updatesSink is IDataSourceUpdatesHeaders headersSink)
129-
{
130-
return headersSink.InitWithHeaders(allData, headers);
131-
}
132-
133-
return _updatesSink.Init(allData);
100+
return !IsDisabled && _updatesSink.Apply(changeSet);
134101
}
135102
}
136103
}

pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/FanOutDataSourceUpdates.cs

Lines changed: 0 additions & 92 deletions
This file was deleted.

pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/IActionApplier.cs

Lines changed: 0 additions & 12 deletions
This file was deleted.

pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ICompositeSourceActionable.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
namespace LaunchDarkly.Sdk.Server.Internal.DataSources
44
{
55
/// <summary>
6-
/// Defines operations that can be performed by <see cref="IActionApplier"/>s
6+
/// Defines operations that can be performed by <see cref="IDataSourceObserver"/>s
77
/// on a composite data source.
88
/// </summary>
99
internal interface ICompositeSourceActionable

0 commit comments

Comments
 (0)