Skip to content

Commit 7db08b3

Browse files
authored
chore: adds FDv2 source using composite datasource (#191)
**Requirements** - [x] I have added test coverage for new or changed functionality - [x] I have followed the repository's [pull request submission guidelines](../blob/main/CONTRIBUTING.md#submitting-pull-requests) - [ ] I have validated my changes against all supported platform versions Not trivially possible yet, still need to hook up more code to what @kinyoklion has written. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Introduces FDv2 data source built from composable sources; refactors CompositeSource to delegate-based factories with async action processing and exhaustion/error reporting; adds extensive tests and test helpers. > > - **FDv2 Data Source**: > - Add `FDv2DataSource` with initializer/synchronizer composition using `CompositeSource`. > - Implement action appliers: fast fallback for initializers, timed fallback/recovery for synchronizers, and blacklist-on-success/off. > - **CompositeSource Refactor**: > - Replace `ISourceFactory`/`IActionApplierFactory` with delegate types `SourceFactory` and `ActionApplierFactory`. > - Process enqueued actions on a background thread; allow disposal while actions are queued. > - Report `Off` directly with optional `ErrorInfo`; emit exhaustion error when no sources remain. > - Build sinks via `FanOutDataSourceUpdates` and disableable tracker; maintain current entry for blacklist/remove. > - **APIs**: > - Add delegates `SourceFactory` and `ActionApplierFactory`; remove corresponding interfaces. > - **Tests**: > - Expand `CompositeSourceTest` for fallback, blacklist, disposal semantics, exhaustion, and disabled-source isolation. > - Add comprehensive `FDv2DataSourceTest` covering initializer/synchronizer flows, failures, and edge cases. > - Enhance `SourcesListTest` with non-circular end-of-list behavior. > - **Test Utilities**: > - Extend `CapturingDataSourceUpdates` to record and wait for ordered status updates. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 370e94e. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 59616e3 commit 7db08b3

File tree

10 files changed

+1452
-138
lines changed

10 files changed

+1452
-138
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using LaunchDarkly.Sdk.Server.Subsystems;
2+
3+
namespace LaunchDarkly.Sdk.Server.Internal.DataSources
4+
{
5+
/// <summary>
6+
/// A function type that creates a new <see cref="IActionApplier"/> instance.
7+
/// </summary>
8+
/// <param name="actionable">the <see cref="ICompositeSourceActionable"/> is the entity that actions should be applied to</param>
9+
/// <returns>a new <see cref="IActionApplier"/> instance</returns>
10+
internal delegate IActionApplier ActionApplierFactory(ICompositeSourceActionable actionable);
11+
}
12+

pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ namespace LaunchDarkly.Sdk.Server.Internal.DataSources
88
{
99
/// <summary>
1010
/// A composite source is a source that can dynamically switch between sources with the
11-
/// help of a list of <see cref="ISourceFactory"/> instances and <see cref="IActionApplierFactory"/> instances.
12-
/// The ISourceFactory instances are used to create the data sources, and the IActionApplierFactory creates the action appliers that are used
11+
/// help of a list of <see cref="SourceFactory"/> delegates and <see cref="ActionApplierFactory"/> delegates.
12+
/// The SourceFactory delegates are used to create the data sources, and the ActionApplierFactory creates the action appliers that are used
1313
/// to apply actions to the composite source as updates are received from the data sources.
1414
/// </summary>
1515
internal sealed class CompositeSource : IDataSource, ICompositeSourceActionable
@@ -25,13 +25,13 @@ internal sealed class CompositeSource : IDataSource, ICompositeSourceActionable
2525

2626
private readonly IDataSourceUpdates _originalUpdateSink;
2727
private readonly IDataSourceUpdates _sanitizedUpdateSink;
28-
private readonly SourcesList<(ISourceFactory Factory, IActionApplierFactory ActionApplierFactory)> _sourcesList;
28+
private readonly SourcesList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> _sourcesList;
2929
private readonly DisableableDataSourceUpdatesTracker _disableableTracker;
3030

3131
// Tracks the entry from the sources list that was used to create the current
3232
// data source instance. This allows operations such as blacklist to remove
3333
// the correct factory/action-applier-factory tuple from the list.
34-
private (ISourceFactory Factory, IActionApplierFactory ActionApplierFactory) _currentEntry;
34+
private (SourceFactory Factory, ActionApplierFactory ActionApplierFactory) _currentEntry;
3535
private IDataSource _currentDataSource;
3636

3737
/// <summary>
@@ -42,7 +42,7 @@ internal sealed class CompositeSource : IDataSource, ICompositeSourceActionable
4242
/// <param name="circular">whether to loop off the end of the list back to the start when fallback occurs</param>
4343
public CompositeSource(
4444
IDataSourceUpdates updatesSink,
45-
IList<(ISourceFactory Factory, IActionApplierFactory ActionApplierFactory)> factoryTuples,
45+
IList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> factoryTuples,
4646
bool circular = true)
4747
{
4848
if (updatesSink is null)
@@ -60,7 +60,7 @@ public CompositeSource(
6060
// this tracker is used to disconnect the current source from the updates sink when it is no longer needed.
6161
_disableableTracker = new DisableableDataSourceUpdatesTracker();
6262

63-
_sourcesList = new SourcesList<(ISourceFactory SourceFactory, IActionApplierFactory ActionApplierFactory)>(
63+
_sourcesList = new SourcesList<(SourceFactory SourceFactory, ActionApplierFactory ActionApplierFactory)>(
6464
circular: circular,
6565
initialList: factoryTuples
6666
);
@@ -86,6 +86,11 @@ public Task<bool> Start()
8686
/// Disposes of the composite data source.
8787
/// </summary>
8888
public void Dispose()
89+
{
90+
InternalDispose();
91+
}
92+
93+
private void InternalDispose(DataSourceStatus.ErrorInfo? error = null)
8994
{
9095
// When disposing the whole composite, we bypass the action queue and tear
9196
// down the current data source immediately while still honoring the same
@@ -104,23 +109,23 @@ public void Dispose()
104109
// clear any queued actions and reset processing state
105110
_pendingActions.Clear();
106111
_isProcessingActions = false;
107-
_sourcesList.Reset();
108112
_currentEntry = default;
109-
110113
_disposed = true;
111114
}
112115

113116
// report state Off directly to the original sink, bypassing the sanitizer
114117
// which would map Off to Interrupted (that mapping is only for underlying sources)
115-
_originalUpdateSink.UpdateStatus(DataSourceState.Off, null);
118+
_originalUpdateSink.UpdateStatus(DataSourceState.Off, error);
116119
}
117120

118121
/// <summary>
119122
/// Enqueue a state-changing operation to be executed under the shared lock.
120-
/// If no other operation is currently running, this will synchronously process
121-
/// the queue in a simple loop on the current thread. Any re-entrant calls from
122-
/// within the operations will only enqueue more work; they will not trigger
123-
/// another processing loop, so the call stack does not grow with the queue length.
123+
/// If no other operation is currently running, this will asynchronously process
124+
/// the queue on a background thread. Any re-entrant calls from within the operations
125+
/// will only enqueue more work; they will not trigger another processing loop, so
126+
/// the call stack does not grow with the queue length. Processing actions on a
127+
/// background thread prevents blocking the calling thread and allows operations like
128+
/// disposal to proceed even when actions are continuously being enqueued.
124129
/// </summary>
125130
private void EnqueueAction(Action action)
126131
{
@@ -137,12 +142,14 @@ private void EnqueueAction(Action action)
137142

138143
if (shouldProcess)
139144
{
140-
ProcessQueuedActions();
145+
// Process actions on a background thread to prevent blocking the caller
146+
// and allow Start() to return even when actions are continuously enqueued
147+
_ = Task.Run(() => ProcessQueuedActions());
141148
}
142149
}
143150

144151
/// <summary>
145-
/// Processes the queued actions.
152+
/// Processes the queued actions on a background thread.
146153
/// </summary>
147154
private void ProcessQueuedActions()
148155
{
@@ -151,7 +158,8 @@ private void ProcessQueuedActions()
151158
Action action;
152159
lock (_lock)
153160
{
154-
if (_pendingActions.Count == 0)
161+
// Check if disposed to allow disposal to interrupt action processing
162+
if (_disposed || _pendingActions.Count == 0)
155163
{
156164
_isProcessingActions = false;
157165
return;
@@ -177,9 +185,9 @@ private void ProcessQueuedActions()
177185
}
178186
}
179187

188+
// This method must only be called while holding _lock.
180189
private void TryFindNextUnderLock()
181190
{
182-
// This method must only be called while holding _lock.
183191
if (_currentDataSource != null)
184192
{
185193
return;
@@ -188,14 +196,22 @@ private void TryFindNextUnderLock()
188196
var entry = _sourcesList.Next();
189197
if (entry.Factory == null)
190198
{
199+
// Failed to find a next source, report error and shut down the composite source
200+
var errorInfo = new DataSourceStatus.ErrorInfo
201+
{
202+
Kind = DataSourceStatus.ErrorKind.Unknown,
203+
Message = "CompositeDataSource has exhausted all available sources.",
204+
Time = DateTime.Now
205+
};
206+
InternalDispose(errorInfo);
191207
return;
192208
}
193209

194210
// Build the list of update sinks conditionally based on whether we have an action applier factory
195211
var updateSinks = new List<IDataSourceUpdates>();
196212
if (entry.ActionApplierFactory != null)
197213
{
198-
var actionApplier = entry.ActionApplierFactory.CreateActionApplier(this);
214+
var actionApplier = entry.ActionApplierFactory(this);
199215
updateSinks.Add(actionApplier);
200216
}
201217
updateSinks.Add(_sanitizedUpdateSink);
@@ -205,7 +221,7 @@ private void TryFindNextUnderLock()
205221
var disableableUpdates = _disableableTracker.WrapAndTrack(fanout);
206222

207223
_currentEntry = entry;
208-
_currentDataSource = entry.Factory.CreateSource(disableableUpdates);
224+
_currentDataSource = entry.Factory(disableableUpdates);
209225
}
210226

211227
#region ICompositeSourceActionable

pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/IActionApplierFactory.cs

Lines changed: 0 additions & 15 deletions
This file was deleted.

pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ISourceFactory.cs

Lines changed: 0 additions & 21 deletions
This file was deleted.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using LaunchDarkly.Sdk.Server.Subsystems;
2+
3+
namespace LaunchDarkly.Sdk.Server.Internal.DataSources
4+
{
5+
/// <summary>
6+
/// A function type that creates a new <see cref="IDataSource"/> instance.
7+
/// </summary>
8+
/// <param name="updatesSink">the updates sink for the new source</param>
9+
/// <returns>a new <see cref="IDataSource"/> instance</returns>
10+
internal delegate IDataSource SourceFactory(IDataSourceUpdates updatesSink);
11+
}
12+

0 commit comments

Comments
 (0)