Skip to content

Commit 5fe3ca7

Browse files
authored
chore: updates datasources to report recoverability in errors (#208)
**Requirements** - [x] I have added test coverage for new or changed functionality - [x] I have followed the repository's [pull request submission guidelines](../blob/main/CONTRIBUTING.md#submitting-pull-requests) - [ ] I have validated my changes against all supported platform versions Didn't do yet, going to do as part of overall benchtesting before release. Adds tracking of Recover-ability on data source ErrorInfo so that can be communicated outward by datasources. Then the FDv2 datasource can blacklist conditionally on the recover-ability of the error. This now allows data sources to report Off (intentional shutdown) without getting blacklisted. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Add `Recoverable` to `DataSourceStatus.ErrorInfo` and propagate it across all data sources to drive `Interrupted` vs `Off`, conditional FDv2 blacklisting/fallback, and improved shutdown handling; update tests accordingly. > > - **Interfaces** > - Add `bool Recoverable` to `DataSourceStatus.ErrorInfo` and include it in `FromException`/`FromHttpError`. > - **Data Sources** > - **Polling/Streaming (FDv1 & FDv2)**: > - Compute recoverability via `HttpErrors.IsRecoverable` and populate `ErrorInfo` (set `Recoverable=true` for `InvalidData`/`StoreError`). > - Route status: `Interrupted` for recoverable, `Off` for unrecoverable; pass `ErrorInfo` to status updates. > - Add `Shutdown(ErrorInfo?)` with atomic guard; update `Dispose` to call shutdown and avoid duplicate work; ensure `Off` status on shutdown. > - For unrecoverable init errors, complete init with `false`. > - **FDv2**: > - Honor `FDv1Fallback` header; blacklist current synchronizer only when error is unrecoverable. > - **Tests** > - Update/extend tests to assert `Recoverable` semantics, `FDv1Fallback`, status transitions (`Interrupted` vs `Off`), init task results, and shutdown behavior; add waits for async status updates. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit c56e407. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 3b7e68e commit 5fe3ca7

15 files changed

+404
-89
lines changed

pkgs/sdk/server/src/Interfaces/DataSourceStatus.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ public struct ErrorInfo
6262
/// </summary>
6363
public ErrorKind Kind { get; set; }
6464

65+
/// <summary>
66+
/// Whether the error is recoverable. Recoverable errors are those that can be retried, such as network errors. Unrecoverable
67+
/// errors are those that cannot be retried, such as invalid SDK key errors.
68+
/// </summary>
69+
public bool Recoverable { get; set; }
70+
6571
/// <summary>
6672
/// The HTTP status code if the error was <see cref="ErrorKind.ErrorResponse"/>, or zero otherwise.
6773
/// </summary>
@@ -90,24 +96,28 @@ public struct ErrorInfo
9096
/// Constructs an instance based on an exception.
9197
/// </summary>
9298
/// <param name="e">the exception</param>
99+
/// <param name="recoverable">whether the error is recoverable</param>
93100
/// <returns>an ErrorInfo</returns>
94-
public static ErrorInfo FromException(Exception e) => new ErrorInfo
101+
public static ErrorInfo FromException(Exception e, bool recoverable) => new ErrorInfo
95102
{
96103
Kind = e is IOException ? ErrorKind.NetworkError : ErrorKind.Unknown,
97104
Message = e.Message,
98-
Time = DateTime.Now
105+
Time = DateTime.Now,
106+
Recoverable = recoverable
99107
};
100108

101109
/// <summary>
102110
/// Constructs an instance based on an HTTP error status.
103111
/// </summary>
104112
/// <param name="statusCode">the status code</param>
113+
/// <param name="recoverable">whether the error is recoverable</param>
105114
/// <returns>an ErrorInfo</returns>
106-
public static ErrorInfo FromHttpError(int statusCode) => new ErrorInfo
115+
public static ErrorInfo FromHttpError(int statusCode, bool recoverable) => new ErrorInfo
107116
{
108117
Kind = ErrorKind.ErrorResponse,
109118
StatusCode = statusCode,
110-
Time = DateTime.Now
119+
Time = DateTime.Now,
120+
Recoverable = recoverable
111121
};
112122

113123
/// <inheritdoc/>

pkgs/sdk/server/src/Internal/DataSources/PollingDataSource.cs

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ internal sealed class PollingDataSource : IDataSource
2323
private readonly Logger _log;
2424
private CancellationTokenSource _canceller;
2525

26+
private bool _disposed = false;
27+
private readonly AtomicBoolean _shuttingDown = new AtomicBoolean(false);
28+
2629
internal PollingDataSource(
2730
LdClientContext context,
2831
IFeatureRequestor featureRequestor,
@@ -81,62 +84,80 @@ private async Task UpdateTaskAsync()
8184
}
8285
catch (UnsuccessfulResponseException ex)
8386
{
84-
var errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(ex.StatusCode);
87+
var recoverable = HttpErrors.IsRecoverable(ex.StatusCode);
88+
var errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(ex.StatusCode, recoverable);
8589

86-
if (HttpErrors.IsRecoverable(ex.StatusCode))
90+
if (errorInfo.Recoverable)
8791
{
8892
_log.Warn(HttpErrors.ErrorMessage(ex.StatusCode, "polling request", "will retry"));
8993
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
9094
}
9195
else
9296
{
9397
_log.Error(HttpErrors.ErrorMessage(ex.StatusCode, "polling request", ""));
94-
_dataSourceUpdates.UpdateStatus(DataSourceState.Off, errorInfo);
9598
try
9699
{
97100
// if client is initializing, make it stop waiting
98-
_initTask.SetResult(true);
101+
_initTask.SetResult(false);
99102
}
100103
catch (InvalidOperationException)
101104
{
102105
// the task was already set - nothing more to do
103106
}
104-
((IDisposable)this).Dispose();
107+
Shutdown(errorInfo);
105108
}
106109
}
107110
catch (JsonException ex)
108111
{
109112
_log.Error("Polling request received malformed data: {0}", LogValues.ExceptionSummary(ex));
110-
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted,
111-
new DataSourceStatus.ErrorInfo
112-
{
113-
Kind = DataSourceStatus.ErrorKind.InvalidData,
114-
Time = DateTime.Now
115-
});
113+
var errorInfo = new DataSourceStatus.ErrorInfo
114+
{
115+
Kind = DataSourceStatus.ErrorKind.InvalidData,
116+
Message = ex.Message,
117+
Time = DateTime.Now,
118+
Recoverable = true
119+
};
120+
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
116121
}
117122
catch (Exception ex)
118123
{
119124
Exception realEx = (ex is AggregateException ae) ? ae.Flatten() : ex;
120125
_log.Warn("Polling for feature flag updates failed: {0}", LogValues.ExceptionSummary(ex));
121126
_log.Debug(LogValues.ExceptionTrace(ex));
122-
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted,
123-
DataSourceStatus.ErrorInfo.FromException(realEx));
127+
var errorInfo = DataSourceStatus.ErrorInfo.FromException(realEx, true); // default to recoverable
128+
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
124129
}
125130
}
126131

127132
void IDisposable.Dispose()
128133
{
134+
// dispose is currently overloaded with shutdown responsibility, we handle this first
135+
Shutdown(null);
136+
129137
Dispose(true);
130138
GC.SuppressFinalize(this);
131139
}
132140

133141
private void Dispose(bool disposing)
134142
{
135-
if (disposing)
136-
{
137-
_canceller?.Cancel();
143+
if (_disposed) return;
144+
145+
if (disposing) {
146+
// dispose managed resources if any
138147
_featureRequestor.Dispose();
139148
}
149+
150+
_disposed = true;
151+
}
152+
153+
private void Shutdown(DataSourceStatus.ErrorInfo? errorInfo)
154+
{
155+
// Prevent concurrent shutdown calls - only allow the first call to proceed
156+
// GetAndSet returns the OLD value, so if it was already true, we return early
157+
if (_shuttingDown.GetAndSet(true)) return;
158+
159+
_canceller?.Cancel();
160+
_dataSourceUpdates.UpdateStatus(DataSourceState.Off, errorInfo);
140161
}
141162

142163
private bool InitWithHeaders(DataStoreTypes.FullDataSet<DataStoreTypes.ItemDescriptor> allData,

pkgs/sdk/server/src/Internal/DataSources/StreamingDataSource.cs

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ internal class StreamingDataSource : IDataSource
5252

5353
private IEnumerable<KeyValuePair<string, IEnumerable<string>>> _headers;
5454

55+
private bool _disposed = false;
56+
private readonly AtomicBoolean _shuttingDown = new AtomicBoolean(false);
57+
5558
internal delegate IEventSource EventSourceCreator(Uri streamUri,
5659
HttpConfiguration httpConfig);
5760

@@ -104,20 +107,36 @@ public Task<bool> Start()
104107

105108
public void Dispose()
106109
{
110+
// dispose is currently overloaded with shutdown responsibility, we handle this first
111+
Shutdown(null);
112+
107113
Dispose(true);
108114
GC.SuppressFinalize(this);
109115
}
110116

111117
private void Dispose(bool disposing)
112118
{
113-
if (disposing)
119+
if (_disposed) return;
120+
121+
if (disposing) {
122+
// dispose managed resources if any
123+
}
124+
125+
_disposed = true;
126+
}
127+
128+
private void Shutdown(DataSourceStatus.ErrorInfo? errorInfo)
129+
{
130+
// Prevent concurrent shutdown calls - only allow the first call to proceed
131+
// GetAndSet returns the OLD value, so if it was already true, we return early
132+
if (_shuttingDown.GetAndSet(true)) return;
133+
134+
_es.Close();
135+
if (_storeStatusMonitoringEnabled)
114136
{
115-
_es.Close();
116-
if (_storeStatusMonitoringEnabled)
117-
{
118-
_dataSourceUpdates.DataStoreStatusProvider.StatusChanged -= OnDataStoreStatusChanged;
119-
}
137+
_dataSourceUpdates.DataStoreStatusProvider.StatusChanged -= OnDataStoreStatusChanged;
120138
}
139+
_dataSourceUpdates.UpdateStatus(DataSourceState.Off, errorInfo);
121140
}
122141

123142
#endregion
@@ -175,7 +194,8 @@ private void OnMessage(object sender, EventSource.MessageReceivedEventArgs e)
175194
{
176195
Kind = DataSourceStatus.ErrorKind.InvalidData,
177196
Message = ex.Message,
178-
Time = DateTime.Now
197+
Time = DateTime.Now,
198+
Recoverable = true
179199
};
180200
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
181201

@@ -187,7 +207,8 @@ private void OnMessage(object sender, EventSource.MessageReceivedEventArgs e)
187207
{
188208
Kind = DataSourceStatus.ErrorKind.StoreError,
189209
Message = (ex.InnerException ?? ex).Message,
190-
Time = DateTime.Now
210+
Time = DateTime.Now,
211+
Recoverable = true
191212
};
192213
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
193214
if (!_storeStatusMonitoringEnabled)
@@ -210,17 +231,16 @@ private void OnMessage(object sender, EventSource.MessageReceivedEventArgs e)
210231
private void OnError(object sender, EventSource.ExceptionEventArgs e)
211232
{
212233
var ex = e.Exception;
213-
var recoverable = true;
214234
DataSourceStatus.ErrorInfo errorInfo;
215235

216236
if (ex is EventSourceServiceUnsuccessfulResponseException respEx)
217237
{
218238
int status = respEx.StatusCode;
219-
errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(status);
239+
var recoverable = HttpErrors.IsRecoverable(status);
240+
errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(status, recoverable);
220241
RecordStreamInit(true);
221-
if (!HttpErrors.IsRecoverable(status))
242+
if (!recoverable)
222243
{
223-
recoverable = false;
224244
_log.Error(HttpErrors.ErrorMessage(status, "streaming connection", ""));
225245
}
226246
else
@@ -230,21 +250,23 @@ private void OnError(object sender, EventSource.ExceptionEventArgs e)
230250
}
231251
else
232252
{
233-
errorInfo = DataSourceStatus.ErrorInfo.FromException(ex);
253+
errorInfo = DataSourceStatus.ErrorInfo.FromException(ex, true); // default to recoverable
234254
_log.Warn("Encountered EventSource error: {0}", LogValues.ExceptionSummary(ex));
235255
_log.Debug(LogValues.ExceptionTrace(ex));
236256
}
237257

238-
_dataSourceUpdates.UpdateStatus(recoverable ? DataSourceState.Interrupted : DataSourceState.Off,
239-
errorInfo);
240-
241-
if (!recoverable)
258+
if (errorInfo.Recoverable)
259+
{
260+
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
261+
return;
262+
}
263+
else
242264
{
243265
// Make _initTask complete to tell the client to stop waiting for initialization. We use
244266
// TrySetResult rather than SetResult here because it might have already been completed
245267
// (if for instance the stream started successfully, then restarted and got a 401).
246268
_initTask.TrySetResult(false);
247-
((IDisposable)this).Dispose();
269+
Shutdown(errorInfo);
248270
}
249271
}
250272

pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,10 @@ public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? n
194194
// When a synchronizer reports it is off, fall back immediately
195195
if (newState == DataSourceState.Off)
196196
{
197-
_actionable.BlacklistCurrent();
197+
if (newError != null && !newError.Value.Recoverable)
198+
{
199+
_actionable.BlacklistCurrent();
200+
}
198201
_actionable.DisposeCurrent();
199202
_actionable.GoToNext();
200203
_actionable.StartCurrent();

0 commit comments

Comments
 (0)