Skip to content

Commit 3fa0a29

Browse files
chore: Add initialization tracking for FDv2 Data Source. (#205)
<!-- CURSOR_SUMMARY --> > [!NOTE] > Introduces initialization tracking for FDv2 with observer-driven composites, wraps the data source to complete on initialization, and updates polling/unrecoverable error behavior and tests. > > - **FDv2 core**: > - Add `InitializationTracker`, `InitializationObserver`, and `CompositeObserver` to aggregate signals from initializers/synchronizers and determine init completion. > - Return a new wrapper `CompletingDataSource` from `FDv2DataSource.CreateFDv2DataSource` that completes `Start()` based on aggregated initialization rather than inner source start. > - Refactor `FDv2DataSource` into `Internal/FDv2DataSources` namespace; replace direct applier wiring with observer composition; add `DataSourceCategory` classification. > - **Composite/Composition**: > - Integrate observers into `CompositeSource` usage; propagate status through `ObservableDataSourceUpdates`; continue to shut down with `Off` and exhaustion error when sources are depleted. > - **Polling data source**: > - On unrecoverable HTTP errors (e.g., 401), set init task result to `false` and shut down; continue to mark `Valid` on 304 responses. > - **Data system**: > - Wire `FDv2DataSystem` to new `Internal.FDv2DataSources` types. > - **Tests/contract**: > - Update unit tests to expect `Start()` result `false` on unrecoverable errors and `true` when no sources; adjust expectations for initialization/fallback flows. > - Contract test suppressions trimmed; keep `streaming/fdv2/disconnects on goodbye`. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit bba9848. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY --> --------- Co-authored-by: Todd Anderson <[email protected]> Co-authored-by: Todd Anderson <[email protected]>
1 parent ac41228 commit 3fa0a29

File tree

10 files changed

+461
-39
lines changed

10 files changed

+461
-39
lines changed
Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1 @@
1-
streaming/retry behavior/do not retry after unrecoverable HTTP error on initial connect/error 401
2-
streaming/retry behavior/do not retry after unrecoverable HTTP error on initial connect/error 403
3-
streaming/retry behavior/do not retry after unrecoverable HTTP error on initial connect/error 405
4-
streaming/retry behavior/do not retry after unrecoverable HTTP error on reconnect/error 401
5-
streaming/retry behavior/do not retry after unrecoverable HTTP error on reconnect/error 403
6-
streaming/retry behavior/do not retry after unrecoverable HTTP error on reconnect/error 405
7-
streaming/fdv2/reconnection state management/initializes from 2 polling initializers
81
streaming/fdv2/disconnects on goodbye
9-
streaming/fdv2/reconnection state management/initializes from polling initializer

pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
using LaunchDarkly.Sdk.Server.Interfaces;
55
using LaunchDarkly.Sdk.Server.Subsystems;
66

7-
using static LaunchDarkly.Sdk.Server.Subsystems.DataStoreTypes;
8-
97
namespace LaunchDarkly.Sdk.Server.Internal.DataSources
108
{
119
/// <summary>
@@ -196,6 +194,7 @@ private void TryFindNextUnderLock()
196194
}
197195

198196
var entry = _sourcesList.Next();
197+
199198
if (entry.Factory == null)
200199
{
201200
// Failed to find a next source, report error and shut down the composite source

pkgs/sdk/server/src/Internal/DataSystem/FDv2DataSystem.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using LaunchDarkly.Sdk.Server.Interfaces;
77
using LaunchDarkly.Sdk.Server.Internal.DataSources;
88
using LaunchDarkly.Sdk.Server.Internal.DataStores;
9+
using LaunchDarkly.Sdk.Server.Internal.FDv2DataSources;
910
using LaunchDarkly.Sdk.Server.Subsystems;
1011

1112
namespace LaunchDarkly.Sdk.Server.Internal.DataSystem
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
using System.Threading.Tasks;
2+
using LaunchDarkly.Sdk.Server.Subsystems;
3+
4+
namespace LaunchDarkly.Sdk.Server.Internal.FDv2DataSources
5+
{
6+
internal static partial class FDv2DataSource
7+
{
8+
/// <summary>
9+
/// This class wraps an underlying composite data source abstracting the handling of initialization completion.
10+
/// </summary>
11+
private class CompletingDataSource : IDataSource
12+
{
13+
private readonly IDataSource _inner;
14+
private readonly InitializationTracker _tracker;
15+
16+
public CompletingDataSource(IDataSource inner, InitializationTracker tracker)
17+
{
18+
_inner = inner;
19+
_tracker = tracker;
20+
}
21+
22+
public void Dispose()
23+
{
24+
_inner.Dispose();
25+
_tracker.Dispose();
26+
}
27+
28+
public Task<bool> Start()
29+
{
30+
_ = _inner.Start();
31+
return _tracker.Task;
32+
}
33+
34+
public bool Initialized => _tracker.Task.IsCompleted && _tracker.Task.Result;
35+
}
36+
}
37+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using System;
2+
using LaunchDarkly.Sdk.Server.Interfaces;
3+
using LaunchDarkly.Sdk.Server.Internal.DataSources;
4+
using LaunchDarkly.Sdk.Server.Subsystems;
5+
6+
namespace LaunchDarkly.Sdk.Server.Internal.FDv2DataSources
7+
{
8+
internal static partial class FDv2DataSource
9+
{
10+
/// <summary>
11+
/// Combines multiple data source observers into a single observer.
12+
/// </summary>
13+
private class CompositeObserver : IDataSourceObserver
14+
{
15+
private readonly IDataSourceObserver[] _observers;
16+
17+
public CompositeObserver(params IDataSourceObserver[] observers)
18+
{
19+
_observers = observers ?? throw new ArgumentNullException(nameof(observers));
20+
}
21+
22+
public void Apply(DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet)
23+
{
24+
foreach (var dataSourceObserver in _observers)
25+
{
26+
dataSourceObserver.Apply(changeSet);
27+
}
28+
}
29+
30+
public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError)
31+
{
32+
foreach (var dataSourceObserver in _observers)
33+
{
34+
dataSourceObserver.UpdateStatus(newState, newError);
35+
}
36+
}
37+
}
38+
}
39+
}
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using LaunchDarkly.Sdk.Server.Interfaces;
4+
using LaunchDarkly.Sdk.Server.Subsystems;
5+
6+
namespace LaunchDarkly.Sdk.Server.Internal.FDv2DataSources
7+
{
8+
internal static partial class FDv2DataSource
9+
{
10+
/// <summary>
11+
/// Ingests observations from several composite data sources and combines the observed state into an
12+
/// initialization state.
13+
/// <para>
14+
/// This tracker uses a strategy where it attempts to get the best possible data. We prioritize getting data
15+
/// that includes a selector over data which does not. If no data with a selector is available, but data
16+
/// without a selector is available, and we have exhausted all initializers, then we will consider ourselves
17+
/// initialized.
18+
/// </para>
19+
/// <para>
20+
/// In the case where there isn't any potential to get data to initialize, for instance, no initializers and
21+
/// no synchronizers, then we assume we are initialized. (offline or daemon mode).
22+
/// </para>
23+
/// </summary>
24+
private class InitializationTracker : IDisposable
25+
{
26+
private readonly TaskCompletionSource<bool> _taskCompletionSource = new TaskCompletionSource<bool>();
27+
private State _state = State.NoData;
28+
private readonly object _stateLock = new object();
29+
30+
private bool _initializersRemain;
31+
private bool _synchronizersRemain;
32+
33+
private enum State
34+
{
35+
/// <summary>
36+
/// The tracker has not received any data.
37+
/// </summary>
38+
NoData,
39+
40+
/// <summary>
41+
/// The tracker has received any data.
42+
/// </summary>
43+
Data,
44+
45+
/// <summary>
46+
/// The tracker has been informed that initializers are exhausted.
47+
/// </summary>
48+
InitializersExhausted,
49+
50+
/// <summary>
51+
/// State entered when we have been informed that we are falling back to FDv1.
52+
/// </summary>
53+
FallingBack,
54+
55+
/// <summary>
56+
/// The tracker is initialized and is no longer processing updates.
57+
/// </summary>
58+
Initialized,
59+
60+
/// <summary>
61+
/// The tracker has encountered a total failure.
62+
/// </summary>
63+
Failed,
64+
}
65+
66+
private enum Action
67+
{
68+
/// <summary>
69+
/// We have received some data.
70+
/// </summary>
71+
DataReceived,
72+
73+
/// <summary>
74+
/// We have received signals that indicate initializers are exhausted.
75+
/// </summary>
76+
InitializersExhausted,
77+
78+
/// <summary>
79+
/// We have received signals that indicate synchronizers are exhausted.
80+
/// </summary>
81+
SynchronizersExhausted,
82+
83+
/// <summary>
84+
/// We have received signals that indicate all fallbacks are exhausted.
85+
/// </summary>
86+
FallbackSynchronizersExhausted,
87+
88+
/// <summary>
89+
/// We have received a selector.
90+
/// </summary>
91+
SelectorReceived,
92+
93+
/// <summary>
94+
/// We are attempting to fallback to FDv1.
95+
/// </summary>
96+
FallingBack,
97+
}
98+
99+
public InitializationTracker(bool hasInitializers, bool hasSynchronizers)
100+
{
101+
if (!(hasInitializers || hasSynchronizers))
102+
{
103+
// If we have no data sources, then we are immediately initialized.
104+
_state = State.Initialized;
105+
_taskCompletionSource.TrySetResult(true);
106+
return;
107+
}
108+
109+
_initializersRemain = hasInitializers;
110+
_synchronizersRemain = hasSynchronizers;
111+
112+
if (!hasInitializers)
113+
{
114+
DetermineState(Action.InitializersExhausted);
115+
}
116+
}
117+
118+
public Task<bool> Task => _taskCompletionSource.Task;
119+
120+
private void HandleRemainingSources()
121+
{
122+
// We only consider fallback if we have transitioned to fallback.
123+
if (!_initializersRemain && !_synchronizersRemain)
124+
{
125+
_state = State.Failed;
126+
}
127+
}
128+
129+
private void DetermineState(Action action)
130+
{
131+
lock (_stateLock)
132+
{
133+
switch (_state)
134+
{
135+
// Terminal states, ignore subsequent actions.
136+
case State.Initialized:
137+
case State.Failed:
138+
break;
139+
case State.NoData:
140+
switch (action)
141+
{
142+
case Action.DataReceived:
143+
_state = State.Data;
144+
break;
145+
case Action.InitializersExhausted:
146+
_initializersRemain = false;
147+
_state = State.InitializersExhausted;
148+
HandleRemainingSources();
149+
break;
150+
case Action.SelectorReceived:
151+
_state = State.Initialized;
152+
break;
153+
case Action.SynchronizersExhausted:
154+
_synchronizersRemain = false;
155+
HandleRemainingSources();
156+
break;
157+
case Action.FallbackSynchronizersExhausted:
158+
// This would indicate that we skipped going through the fallback
159+
// process. So this generally shouldn't be an achievable state.
160+
_state = State.Failed;
161+
break;
162+
}
163+
164+
break;
165+
case State.Data:
166+
switch (action)
167+
{
168+
case Action.DataReceived:
169+
break;
170+
case Action.SynchronizersExhausted:
171+
case Action.FallbackSynchronizersExhausted:
172+
case Action.InitializersExhausted:
173+
case Action.SelectorReceived:
174+
case Action.FallingBack:
175+
_state = State.Initialized;
176+
break;
177+
}
178+
179+
break;
180+
case State.InitializersExhausted:
181+
switch (action)
182+
{
183+
case Action.InitializersExhausted:
184+
break;
185+
case Action.SynchronizersExhausted:
186+
_synchronizersRemain = false;
187+
HandleRemainingSources();
188+
break;
189+
case Action.FallbackSynchronizersExhausted:
190+
// This would indicate that we skipped going through the fallback
191+
// process. So this generally shouldn't be an achievable state.
192+
_state = State.Failed;
193+
break;
194+
195+
case Action.FallingBack:
196+
_state = State.FallingBack;
197+
break;
198+
case Action.DataReceived:
199+
case Action.SelectorReceived:
200+
_state = State.Initialized;
201+
break;
202+
}
203+
204+
break;
205+
case State.FallingBack:
206+
switch (action)
207+
{
208+
case Action.FallingBack:
209+
case Action.InitializersExhausted:
210+
case Action.SynchronizersExhausted:
211+
break;
212+
case Action.FallbackSynchronizersExhausted:
213+
_state = State.Failed;
214+
break;
215+
case Action.DataReceived:
216+
case Action.SelectorReceived:
217+
_state = State.Initialized;
218+
break;
219+
}
220+
221+
break;
222+
}
223+
224+
// After updating the state determine if we need to complete the task.
225+
switch (_state)
226+
{
227+
case State.Initialized:
228+
_taskCompletionSource.TrySetResult(true);
229+
break;
230+
case State.Failed:
231+
_taskCompletionSource.TrySetResult(false);
232+
break;
233+
}
234+
}
235+
}
236+
237+
public void Apply(DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet,
238+
DataSourceCategory category)
239+
{
240+
if (!changeSet.Selector.IsEmpty) DetermineState(Action.SelectorReceived);
241+
242+
DetermineState(Action.DataReceived);
243+
}
244+
245+
public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError,
246+
DataSourceCategory category)
247+
{
248+
switch (category)
249+
{
250+
case DataSourceCategory.Initializers when newState == DataSourceState.Off:
251+
{
252+
DetermineState(Action.InitializersExhausted);
253+
break;
254+
}
255+
case DataSourceCategory.Synchronizers when newState == DataSourceState.Off:
256+
{
257+
DetermineState(Action.SynchronizersExhausted);
258+
break;
259+
}
260+
case DataSourceCategory.FallbackSynchronizers when newState == DataSourceState.Off:
261+
{
262+
DetermineState(Action.FallbackSynchronizersExhausted);
263+
break;
264+
}
265+
case DataSourceCategory.FallbackSynchronizers when newState == DataSourceState.Initializing:
266+
{
267+
DetermineState(Action.FallingBack);
268+
break;
269+
}
270+
}
271+
}
272+
273+
public void Dispose()
274+
{
275+
_taskCompletionSource.TrySetResult(false);
276+
}
277+
}
278+
}
279+
}

0 commit comments

Comments
 (0)